From aced424cad06c57e47d58a73a831187978855b50 Mon Sep 17 00:00:00 2001 From: Federico Lolli <federico.lolli@skywarder.eu> Date: Fri, 21 Mar 2025 09:54:39 +0100 Subject: [PATCH] Added support for multiple message id subscriptions --- src/mavlink.rs | 4 ++++ src/message_broker.rs | 19 +++++++-------- src/message_broker/message_bundle.rs | 36 +++++++--------------------- src/ui/app.rs | 8 +++---- src/ui/panes.rs | 12 +++++----- src/ui/panes/default.rs | 15 +++++++----- src/ui/panes/pid_drawing_tool.rs | 6 ++--- src/ui/panes/plot.rs | 6 ++--- 8 files changed, 45 insertions(+), 61 deletions(-) diff --git a/src/mavlink.rs b/src/mavlink.rs index f1517cc..9a05aa8 100644 --- a/src/mavlink.rs +++ b/src/mavlink.rs @@ -34,4 +34,8 @@ impl TimedMessage { time: Instant::now(), } } + + pub fn id(&self) -> u32 { + self.message.message_id() + } } diff --git a/src/message_broker.rs b/src/message_broker.rs index 7cf10e7..ddf1c17 100644 --- a/src/message_broker.rs +++ b/src/message_broker.rs @@ -11,7 +11,6 @@ pub use message_bundle::MessageBundle; use reception_queue::ReceptionQueue; use std::{ - collections::HashMap, sync::{Arc, Mutex}, time::Duration, }; @@ -21,7 +20,7 @@ use tracing::error; use crate::{ communication::{Connection, ConnectionError, TransceiverConfigExt}, error::ErrInstrument, - mavlink::{MavFrame, MavHeader, MavMessage, MavlinkVersion, Message, TimedMessage}, + mavlink::{MavFrame, MavHeader, MavMessage, MavlinkVersion, TimedMessage}, }; const RECEPTION_QUEUE_INTERVAL: Duration = Duration::from_secs(1); @@ -34,7 +33,7 @@ const SEGS_COMPONENT_ID: u8 = 1; /// dispatching them to the views that are interested in them. pub struct MessageBroker { /// A map of all messages received so far, indexed by message ID - messages: HashMap<u32, Vec<TimedMessage>>, + messages: Vec<TimedMessage>, /// instant queue used for frequency calculation and reception time last_receptions: Arc<Mutex<ReceptionQueue>>, /// Connection to the Mavlink listener @@ -47,7 +46,7 @@ impl MessageBroker { /// Creates a new `MessageBroker` with the given channel size and Egui context. pub fn new(ctx: egui::Context) -> Self { Self { - messages: HashMap::new(), + messages: Vec::new(), // TODO: make this configurable last_receptions: Arc::new(Mutex::new(ReceptionQueue::new(RECEPTION_QUEUE_INTERVAL))), connection: None, @@ -88,8 +87,11 @@ impl MessageBroker { self.last_receptions.lock().log_unwrap().frequency() } - pub fn get(&self, id: u32) -> &[TimedMessage] { - self.messages.get(&id).map_or(&[], |v| v.as_slice()) + pub fn get(&self, ids: &[u32]) -> Vec<&TimedMessage> { + self.messages + .iter() + .filter(|msg| ids.contains(&msg.id())) + .collect() } /// Processes incoming network messages. New messages are added to the @@ -108,10 +110,7 @@ impl MessageBroker { self.last_receptions.lock().log_unwrap().push(message.time); // Store the message in the broker - self.messages - .entry(message.message.message_id()) - .or_default() - .push(message); + self.messages.push(message); } self.ctx.request_repaint(); } diff --git a/src/message_broker/message_bundle.rs b/src/message_broker/message_bundle.rs index 52568b9..eaa06df 100644 --- a/src/message_broker/message_bundle.rs +++ b/src/message_broker/message_bundle.rs @@ -1,4 +1,4 @@ -use crate::mavlink::{Message, TimedMessage}; +use crate::mavlink::TimedMessage; /// A bundle of messages, indexed by their ID. /// Allows for efficient storage and retrieval of messages by ID. @@ -10,36 +10,22 @@ use crate::mavlink::{Message, TimedMessage}; /// method to clear the content of the bundle and prepare it for reuse. #[derive(Default)] pub struct MessageBundle { - storage: Vec<(u32, Vec<TimedMessage>)>, + storage: Vec<TimedMessage>, count: u32, } impl MessageBundle { /// Returns all messages of the given ID contained in the bundle. - pub fn get(&self, id: u32) -> &[TimedMessage] { + pub fn get(&self, ids: &[u32]) -> Vec<&TimedMessage> { self.storage .iter() - .find(|&&(queue_id, _)| queue_id == id) - .map_or(&[], |(_, messages)| messages.as_slice()) + .filter(|msg| ids.contains(&msg.id())) + .collect() } /// Inserts a new message into the bundle. pub fn insert(&mut self, message: TimedMessage) { - let message_id = message.message.message_id(); - - // Retrieve the queue for the ID, if it exists - let maybe_queue = self - .storage - .iter_mut() - .find(|&&mut (queue_id, _)| queue_id == message_id) - .map(|(_, queue)| queue); - - if let Some(queue) = maybe_queue { - queue.push(message); - } else { - self.storage.push((message_id, vec![message])); - } - + self.storage.push(message); self.count += 1; } @@ -49,15 +35,9 @@ impl MessageBundle { } /// Resets the content of the bundle, preparing it to be efficiently reused. - /// Effectively, it clears the content of the bundle, but with lower - /// allocation cost the next time the bundle is reused. + /// Effectively, it clears the content of the bundle. pub fn reset(&mut self) { - // Clear the individual queues instead of the full storage, to avoid - // the allocation cost of the already used per-id queues. - for (_, queue) in &mut self.storage { - queue.clear(); - } - + self.storage.clear(); self.count = 0; } } diff --git a/src/ui/app.rs b/src/ui/app.rs index ad8fa9f..902e586 100644 --- a/src/ui/app.rs +++ b/src/ui/app.rs @@ -318,14 +318,12 @@ impl App { // Skip non-pane tiles let Tile::Pane(pane) = tile else { continue }; // Skip panes that do not have a subscription - let Some(sub_id) = pane.get_message_subscription() else { - continue; - }; + let sub_ids: Vec<u32> = pane.get_message_subscriptions().collect(); if pane.should_send_message_history() { - pane.update(self.message_broker.get(sub_id)); + pane.update(self.message_broker.get(&sub_ids[..]).as_slice()); } else { - pane.update(self.message_bundle.get(sub_id)); + pane.update(self.message_bundle.get(&sub_ids[..]).as_slice()); } } diff --git a/src/ui/panes.rs b/src/ui/panes.rs index 9b27514..5d031f3 100644 --- a/src/ui/panes.rs +++ b/src/ui/panes.rs @@ -34,11 +34,11 @@ pub trait PaneBehavior { /// Updates the pane state. This method is called before `ui` to allow the /// pane to update its state based on the messages received. - fn update(&mut self, _messages: &[TimedMessage]) {} + fn update(&mut self, _messages: &[&TimedMessage]) {} /// Returns the ID of the messages this pane is interested in, if any. - fn get_message_subscription(&self) -> Option<u32> { - None + fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> { + Box::new(None.into_iter()) } /// Checks whether the full message history should be sent to the pane. @@ -61,12 +61,12 @@ impl PaneBehavior for Pane { self.pane.contains_pointer() } - fn update(&mut self, messages: &[TimedMessage]) { + fn update(&mut self, messages: &[&TimedMessage]) { self.pane.update(messages) } - fn get_message_subscription(&self) -> Option<u32> { - self.pane.get_message_subscription() + fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> { + self.pane.get_message_subscriptions() } fn should_send_message_history(&self) -> bool { diff --git a/src/ui/panes/default.rs b/src/ui/panes/default.rs index c3f7979..e4d1bef 100644 --- a/src/ui/panes/default.rs +++ b/src/ui/panes/default.rs @@ -2,9 +2,12 @@ use super::PaneBehavior; use serde::{Deserialize, Serialize}; use tracing::debug; -use crate::ui::{ - app::{PaneAction, PaneResponse}, - utils::{SizingMemo, vertically_centered}, +use crate::{ + mavlink::TimedMessage, + ui::{ + app::{PaneAction, PaneResponse}, + utils::{SizingMemo, vertically_centered}, + }, }; #[derive(Clone, Debug, Default, Serialize, Deserialize)] @@ -60,10 +63,10 @@ impl PaneBehavior for DefaultPane { self.contains_pointer } - fn update(&mut self, _messages: &[crate::mavlink::TimedMessage]) {} + fn update(&mut self, _messages: &[&TimedMessage]) {} - fn get_message_subscription(&self) -> Option<u32> { - None + fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> { + Box::new(None.into_iter()) } fn should_send_message_history(&self) -> bool { diff --git a/src/ui/panes/pid_drawing_tool.rs b/src/ui/panes/pid_drawing_tool.rs index 57c49ca..6f7695e 100644 --- a/src/ui/panes/pid_drawing_tool.rs +++ b/src/ui/panes/pid_drawing_tool.rs @@ -151,7 +151,7 @@ impl PaneBehavior for PidPane { self.contains_pointer } - fn update(&mut self, messages: &[TimedMessage]) { + fn update(&mut self, messages: &[&TimedMessage]) { if let Some(msg) = messages.last() { for element in &mut self.elements { element.update(&msg.message, self.message_subscription_id); @@ -159,8 +159,8 @@ impl PaneBehavior for PidPane { } } - fn get_message_subscription(&self) -> Option<u32> { - Some(self.message_subscription_id) + fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> { + Box::new(Some(self.message_subscription_id).into_iter()) } } diff --git a/src/ui/panes/plot.rs b/src/ui/panes/plot.rs index 684f221..42444ab 100644 --- a/src/ui/panes/plot.rs +++ b/src/ui/panes/plot.rs @@ -186,7 +186,7 @@ impl PaneBehavior for Plot2DPane { } #[profiling::function] - fn update(&mut self, messages: &[TimedMessage]) { + fn update(&mut self, messages: &[&TimedMessage]) { if !self.state_valid { self.line_data.clear(); } @@ -226,8 +226,8 @@ impl PaneBehavior for Plot2DPane { self.state_valid = true; } - fn get_message_subscription(&self) -> Option<u32> { - Some(self.settings.plot_message_id) + fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> { + Box::new(Some(self.settings.plot_message_id).into_iter()) } fn should_send_message_history(&self) -> bool { -- GitLab