Skip to content
Snippets Groups Projects
Commit aced424c authored by Federico Lolli's avatar Federico Lolli
Browse files

Added support for multiple message id subscriptions

parent 74a26643
Branches
Tags
1 merge request!16Implemented a first draft of the new Valve-control-pane
......@@ -34,4 +34,8 @@ impl TimedMessage {
time: Instant::now(),
}
}
pub fn id(&self) -> u32 {
self.message.message_id()
}
}
......@@ -11,7 +11,6 @@ pub use message_bundle::MessageBundle;
use reception_queue::ReceptionQueue;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
......@@ -21,7 +20,7 @@ use tracing::error;
use crate::{
communication::{Connection, ConnectionError, TransceiverConfigExt},
error::ErrInstrument,
mavlink::{MavFrame, MavHeader, MavMessage, MavlinkVersion, Message, TimedMessage},
mavlink::{MavFrame, MavHeader, MavMessage, MavlinkVersion, TimedMessage},
};
const RECEPTION_QUEUE_INTERVAL: Duration = Duration::from_secs(1);
......@@ -34,7 +33,7 @@ const SEGS_COMPONENT_ID: u8 = 1;
/// dispatching them to the views that are interested in them.
pub struct MessageBroker {
/// A map of all messages received so far, indexed by message ID
messages: HashMap<u32, Vec<TimedMessage>>,
messages: Vec<TimedMessage>,
/// instant queue used for frequency calculation and reception time
last_receptions: Arc<Mutex<ReceptionQueue>>,
/// Connection to the Mavlink listener
......@@ -47,7 +46,7 @@ impl MessageBroker {
/// Creates a new `MessageBroker` with the given channel size and Egui context.
pub fn new(ctx: egui::Context) -> Self {
Self {
messages: HashMap::new(),
messages: Vec::new(),
// TODO: make this configurable
last_receptions: Arc::new(Mutex::new(ReceptionQueue::new(RECEPTION_QUEUE_INTERVAL))),
connection: None,
......@@ -88,8 +87,11 @@ impl MessageBroker {
self.last_receptions.lock().log_unwrap().frequency()
}
pub fn get(&self, id: u32) -> &[TimedMessage] {
self.messages.get(&id).map_or(&[], |v| v.as_slice())
pub fn get(&self, ids: &[u32]) -> Vec<&TimedMessage> {
self.messages
.iter()
.filter(|msg| ids.contains(&msg.id()))
.collect()
}
/// Processes incoming network messages. New messages are added to the
......@@ -108,10 +110,7 @@ impl MessageBroker {
self.last_receptions.lock().log_unwrap().push(message.time);
// Store the message in the broker
self.messages
.entry(message.message.message_id())
.or_default()
.push(message);
self.messages.push(message);
}
self.ctx.request_repaint();
}
......
use crate::mavlink::{Message, TimedMessage};
use crate::mavlink::TimedMessage;
/// A bundle of messages, indexed by their ID.
/// Allows for efficient storage and retrieval of messages by ID.
......@@ -10,36 +10,22 @@ use crate::mavlink::{Message, TimedMessage};
/// method to clear the content of the bundle and prepare it for reuse.
#[derive(Default)]
pub struct MessageBundle {
storage: Vec<(u32, Vec<TimedMessage>)>,
storage: Vec<TimedMessage>,
count: u32,
}
impl MessageBundle {
/// Returns all messages of the given ID contained in the bundle.
pub fn get(&self, id: u32) -> &[TimedMessage] {
pub fn get(&self, ids: &[u32]) -> Vec<&TimedMessage> {
self.storage
.iter()
.find(|&&(queue_id, _)| queue_id == id)
.map_or(&[], |(_, messages)| messages.as_slice())
.filter(|msg| ids.contains(&msg.id()))
.collect()
}
/// Inserts a new message into the bundle.
pub fn insert(&mut self, message: TimedMessage) {
let message_id = message.message.message_id();
// Retrieve the queue for the ID, if it exists
let maybe_queue = self
.storage
.iter_mut()
.find(|&&mut (queue_id, _)| queue_id == message_id)
.map(|(_, queue)| queue);
if let Some(queue) = maybe_queue {
queue.push(message);
} else {
self.storage.push((message_id, vec![message]));
}
self.storage.push(message);
self.count += 1;
}
......@@ -49,15 +35,9 @@ impl MessageBundle {
}
/// Resets the content of the bundle, preparing it to be efficiently reused.
/// Effectively, it clears the content of the bundle, but with lower
/// allocation cost the next time the bundle is reused.
/// Effectively, it clears the content of the bundle.
pub fn reset(&mut self) {
// Clear the individual queues instead of the full storage, to avoid
// the allocation cost of the already used per-id queues.
for (_, queue) in &mut self.storage {
queue.clear();
}
self.storage.clear();
self.count = 0;
}
}
......@@ -318,14 +318,12 @@ impl App {
// Skip non-pane tiles
let Tile::Pane(pane) = tile else { continue };
// Skip panes that do not have a subscription
let Some(sub_id) = pane.get_message_subscription() else {
continue;
};
let sub_ids: Vec<u32> = pane.get_message_subscriptions().collect();
if pane.should_send_message_history() {
pane.update(self.message_broker.get(sub_id));
pane.update(self.message_broker.get(&sub_ids[..]).as_slice());
} else {
pane.update(self.message_bundle.get(sub_id));
pane.update(self.message_bundle.get(&sub_ids[..]).as_slice());
}
}
......
......@@ -34,11 +34,11 @@ pub trait PaneBehavior {
/// Updates the pane state. This method is called before `ui` to allow the
/// pane to update its state based on the messages received.
fn update(&mut self, _messages: &[TimedMessage]) {}
fn update(&mut self, _messages: &[&TimedMessage]) {}
/// Returns the ID of the messages this pane is interested in, if any.
fn get_message_subscription(&self) -> Option<u32> {
None
fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> {
Box::new(None.into_iter())
}
/// Checks whether the full message history should be sent to the pane.
......@@ -61,12 +61,12 @@ impl PaneBehavior for Pane {
self.pane.contains_pointer()
}
fn update(&mut self, messages: &[TimedMessage]) {
fn update(&mut self, messages: &[&TimedMessage]) {
self.pane.update(messages)
}
fn get_message_subscription(&self) -> Option<u32> {
self.pane.get_message_subscription()
fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> {
self.pane.get_message_subscriptions()
}
fn should_send_message_history(&self) -> bool {
......
......@@ -2,9 +2,12 @@ use super::PaneBehavior;
use serde::{Deserialize, Serialize};
use tracing::debug;
use crate::ui::{
use crate::{
mavlink::TimedMessage,
ui::{
app::{PaneAction, PaneResponse},
utils::{SizingMemo, vertically_centered},
},
};
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
......@@ -60,10 +63,10 @@ impl PaneBehavior for DefaultPane {
self.contains_pointer
}
fn update(&mut self, _messages: &[crate::mavlink::TimedMessage]) {}
fn update(&mut self, _messages: &[&TimedMessage]) {}
fn get_message_subscription(&self) -> Option<u32> {
None
fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> {
Box::new(None.into_iter())
}
fn should_send_message_history(&self) -> bool {
......
......@@ -151,7 +151,7 @@ impl PaneBehavior for PidPane {
self.contains_pointer
}
fn update(&mut self, messages: &[TimedMessage]) {
fn update(&mut self, messages: &[&TimedMessage]) {
if let Some(msg) = messages.last() {
for element in &mut self.elements {
element.update(&msg.message, self.message_subscription_id);
......@@ -159,8 +159,8 @@ impl PaneBehavior for PidPane {
}
}
fn get_message_subscription(&self) -> Option<u32> {
Some(self.message_subscription_id)
fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> {
Box::new(Some(self.message_subscription_id).into_iter())
}
}
......
......@@ -186,7 +186,7 @@ impl PaneBehavior for Plot2DPane {
}
#[profiling::function]
fn update(&mut self, messages: &[TimedMessage]) {
fn update(&mut self, messages: &[&TimedMessage]) {
if !self.state_valid {
self.line_data.clear();
}
......@@ -226,8 +226,8 @@ impl PaneBehavior for Plot2DPane {
self.state_valid = true;
}
fn get_message_subscription(&self) -> Option<u32> {
Some(self.settings.plot_message_id)
fn get_message_subscriptions(&self) -> Box<dyn Iterator<Item = u32>> {
Box::new(Some(self.settings.plot_message_id).into_iter())
}
fn should_send_message_history(&self) -> bool {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment