diff --git a/Cargo.lock b/Cargo.lock index fbc96f04cf891edfccba6e0babd32cce2d6a18fd..ac6f8291799f3db9b03e2a89a0520d7be505c432 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -785,15 +785,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -2911,7 +2902,6 @@ name = "segs" version = "0.1.0" dependencies = [ "anyhow", - "crossbeam-channel", "eframe", "egui", "egui_extras", @@ -2920,7 +2910,6 @@ dependencies = [ "egui_tiles", "enum_dispatch", "mavlink-bindgen", - "parking_lot", "ring-channel", "serde", "serde_json", @@ -3395,7 +3384,6 @@ dependencies = [ "backtrace", "libc", "mio", - "parking_lot", "pin-project-lite", "socket2", "windows-sys 0.52.0", diff --git a/Cargo.toml b/Cargo.toml index be31c45f165358550e065d57e2ae05e09efa6070..b69294f3bd282b4e3b9ca0448897f310ce21ce57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ egui_file = "0.19" tokio = { version = "1.41", features = [ "rt-multi-thread", "net", - "parking_lot", "sync", ] } # =========== Mavlink =========== @@ -35,11 +34,6 @@ serde_json = "1.0" # =========== Logging =========== tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -# =========== Performance =========== -# for fast mutexes -parking_lot = "0.12" -# for fast channels -crossbeam-channel = "0.5" # =========== Utility =========== # for dynamic dispatch enum_dispatch = "0.3" diff --git a/src/main.rs b/src/main.rs index baa1fb9f8d6916358b8d9e19bfce4f7a1fe94799..e0c0555099548c1e08966bc6f91be01500c94071 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,39 +5,24 @@ mod error; mod mavlink; mod serial; +mod message_broker; mod ui; mod utils; -use std::{ - num::NonZeroUsize, - sync::{LazyLock, OnceLock}, -}; +use std::sync::LazyLock; -use parking_lot::Mutex; use tokio::runtime::Runtime; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; use error::ErrInstrument; -use mavlink::{MessageBroker, ReflectionContext}; +use mavlink::ReflectionContext; use ui::ComposableView; -/// MessageBroker singleton, used to fetch & filter Mavlink messages collected -static MSG_MANAGER: OnceLock<Mutex<MessageBroker>> = OnceLock::new(); /// ReflectionContext singleton, used to get access to the Mavlink message definitions static MAVLINK_PROFILE: LazyLock<ReflectionContext> = LazyLock::new(ReflectionContext::new); static APP_NAME: &str = "segs"; -#[macro_export] -macro_rules! msg_broker { - () => { - $crate::MSG_MANAGER - .get() - .log_expect("Unable to get MessageBroker") - .lock() - }; -} - fn main() -> Result<(), eframe::Error> { // Set up logging (USE RUST_LOG=debug to see logs) let env_filter = EnvFilter::builder().from_env_lossy(); @@ -62,20 +47,6 @@ fn main() -> Result<(), eframe::Error> { eframe::run_native( APP_NAME, // This is the app id, used for example by Wayland native_options, - Box::new(|ctx| { - // First we initialize the MSGManager, as a global singleton available to all the panes - MSG_MANAGER - .set(Mutex::new(MessageBroker::new( - // FIXME: Choose where to put the channel size of the MessageBroker - NonZeroUsize::new(50).log_unwrap(), - ctx.egui_ctx.clone(), - ))) - .log_expect("Unable to set MessageManager"); - let app = ctx - .storage - .map(|storage| ComposableView::new(APP_NAME, storage)) - .unwrap_or_default(); - Ok(Box::new(app)) - }), + Box::new(|ctx| Ok(Box::new(ComposableView::new(APP_NAME, ctx)))), ) } diff --git a/src/mavlink.rs b/src/mavlink.rs index 7fe41e95c6537002e60e15f68618153c08dca5e0..91193466aa0254f7dbe1e0415be3421d18e4ab09 100644 --- a/src/mavlink.rs +++ b/src/mavlink.rs @@ -5,13 +5,10 @@ mod base; mod error; -mod message_broker; mod reflection; // Export all the types from the base module as if they were defined in this module pub use base::*; -pub use error::{MavlinkError, Result as MavlinkResult}; -pub use message_broker::{MessageBroker, MessageView, ViewId}; pub use reflection::ReflectionContext; /// Default port for the Ethernet connection diff --git a/src/mavlink/message_broker.rs b/src/message_broker.rs similarity index 51% rename from src/mavlink/message_broker.rs rename to src/message_broker.rs index a4b070a7c0b3297398ee7ed40507ca12d26b47d0..dd3c7dc854f8f84d71f75a90f0a648afcc92e8d0 100644 --- a/src/mavlink/message_broker.rs +++ b/src/message_broker.rs @@ -5,66 +5,42 @@ //! is responsible for listening to incoming messages from the Mavlink listener, //! storing them in a map, and updating the views that are interested in them. +mod message_bundle; +mod reception_queue; +pub use message_bundle::MessageBundle; +use reception_queue::ReceptionQueue; + +use crate::{ + error::ErrInstrument, + mavlink::{byte_parser, Message, TimedMessage}, + utils::RingBuffer, +}; +use anyhow::{Context, Result}; +use ring_channel::{ring_channel, RingReceiver, RingSender}; use std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, io::Write, num::NonZeroUsize, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, time::{Duration, Instant}, }; - -use anyhow::{Context, Result}; -use parking_lot::Mutex; -use ring_channel::{ring_channel, RingReceiver, RingSender}; -use serde::{Deserialize, Serialize}; use tokio::{net::UdpSocket, task::JoinHandle}; use tracing::{debug, trace}; -use uuid::Uuid; - -use crate::{error::ErrInstrument, mavlink::byte_parser, utils::RingBuffer}; - -use super::{MavlinkResult, Message, TimedMessage}; /// Maximum size of the UDP buffer const UDP_BUFFER_SIZE: usize = 65527; -/// Trait for a view that fetch Mavlink messages. -/// -/// This trait should be implemented by any view that wants to interact with the -/// `MessageBroker` and get updates on the messages it is interested in. -pub trait MessageView { - /// Returns an hashable value as widget identifier - fn view_id(&self) -> ViewId; - /// Returns the message ID of interest for the view - fn id_of_interest(&self) -> u32; - /// Returns whether the view is cache valid or not, i.e. if it can be - /// updated or needs to be re-populated from scratch - fn is_valid(&self) -> bool; - /// Populates the view with the initial messages. This method is called when - /// the cache is invalid and the view needs to be populated from the stored - /// map of messages - fn populate_view(&mut self, msg_slice: &[TimedMessage]) -> MavlinkResult<()>; - /// Updates the view with new messages. This method is called when the cache - /// is valid, hence the view only needs to be updated with the new messages - fn update_view(&mut self, msg_slice: &[TimedMessage]) -> MavlinkResult<()>; -} - -/// Responsible for storing & dispatching the Mavlink message received. +/// The MessageBroker struct contains the state of the message broker. /// -/// It listens to incoming messages, stores them in a map, and updates the views -/// that are interested in them. It should be used as a singleton in the -/// application. +/// It is responsible for receiving messages from the Mavlink listener and +/// dispatching them to the views that are interested in them. #[derive(Debug)] pub struct MessageBroker { - // == Messages == - /// map(message ID -> vector of messages received so far) + /// A map of all messages received so far, indexed by message ID messages: HashMap<u32, Vec<TimedMessage>>, - /// map(widget ID -> queue of messages left for update) - update_queues: HashMap<ViewId, (u32, VecDeque<TimedMessage>)>, - // == Internal == /// instant queue used for frequency calculation and reception time last_receptions: Arc<Mutex<ReceptionQueue>>, /// Flag to stop the listener @@ -85,7 +61,6 @@ impl MessageBroker { let (tx, rx) = ring_channel(channel_size); Self { messages: HashMap::new(), - update_queues: HashMap::new(), // TODO: make this configurable last_receptions: Arc::new(Mutex::new(ReceptionQueue::new(Duration::from_secs(1)))), tx, @@ -96,18 +71,6 @@ impl MessageBroker { } } - /// Refreshes the view given as argument. It handles automatically the cache - /// validity based on `is_valid` method of the view. - pub fn refresh_view<V: MessageView>(&mut self, view: &mut V) -> MavlinkResult<()> { - self.process_incoming_msgs(); - if !view.is_valid() || !self.is_view_subscribed(&view.view_id()) { - self.init_view(view)?; - } else { - self.update_view(view)?; - } - Ok(()) - } - /// Stop the listener task from listening to incoming messages, if it is /// running. pub fn stop_listening(&mut self) { @@ -118,7 +81,8 @@ impl MessageBroker { } /// Start a listener task that listens to incoming messages from the given - /// Ethernet port and stores them in a ring buffer. + /// Ethernet port, and accumulates them in a ring buffer, read only when + /// views request a refresh. pub fn listen_from_ethernet_port(&mut self, port: u16) { // Stop the current listener if it exists self.stop_listening(); @@ -145,10 +109,10 @@ impl MessageBroker { .await .context("Failed to receive message")?; for (_, mav_message) in byte_parser(&buf[..len]) { - debug!("Received message: {:?}", mav_message); + trace!("Received message: {:?}", mav_message); tx.send(TimedMessage::just_received(mav_message)) .context("Failed to send message")?; - last_receptions.lock().push(Instant::now()); + last_receptions.lock().unwrap().push(Instant::now()); ctx.request_repaint(); } } @@ -195,7 +159,7 @@ impl MessageBroker { debug!("Received message: {:?}", mav_message); tx.send(TimedMessage::just_received(mav_message)) .context("Failed to send message")?; - last_receptions.lock().push(Instant::now()); + last_receptions.lock().unwrap().push(Instant::now()); ctx.request_repaint(); } } @@ -205,67 +169,30 @@ impl MessageBroker { self.task = Some(handle); } - pub fn unsubscribe_all_views(&mut self) { - self.update_queues.clear(); - } - - /// Clears all the messages stored in the broker. Useful in message replay - /// scenarios. - pub fn clear(&mut self) { - self.messages.clear(); - } - /// Returns the time since the last message was received. pub fn time_since_last_reception(&self) -> Option<Duration> { - self.last_receptions.lock().time_since_last_reception() + self.last_receptions + .lock() + .unwrap() + .time_since_last_reception() } /// Returns the frequency of messages received in the last second. pub fn reception_frequency(&self) -> f64 { - self.last_receptions.lock().frequency() + self.last_receptions.lock().unwrap().frequency() } - fn is_view_subscribed(&self, widget_id: &ViewId) -> bool { - self.update_queues.contains_key(widget_id) - } - - /// Init a view in case of cache invalidation or first time initialization. - fn init_view<V: MessageView>(&mut self, view: &mut V) -> MavlinkResult<()> { - trace!("initializing view: {:?}", view.view_id()); - if let Some(messages) = self.messages.get(&view.id_of_interest()) { - view.populate_view(messages)?; - } - self.update_queues - .insert(view.view_id(), (view.id_of_interest(), VecDeque::new())); - Ok(()) - } - - /// Update a view with new messages, used when the cache is valid. - fn update_view<V: MessageView>(&mut self, view: &mut V) -> MavlinkResult<()> { - trace!("updating view: {:?}", view.view_id()); - if let Some((_, queue)) = self.update_queues.get_mut(&view.view_id()) { - while let Some(msg) = queue.pop_front() { - view.update_view(&[msg])?; - } - } - Ok(()) + pub fn get(&self, id: u32) -> &[TimedMessage] { + self.messages.get(&id).map_or(&[], |v| v.as_slice()) } - /// Process the incoming messages from the Mavlink listener, storing them in - /// the messages map and updating the update queues. - fn process_incoming_msgs(&mut self) { + /// Processes incoming network messages. New messages are added to the + /// given `MessageBundle`. + pub fn process_messages(&mut self, bundle: &mut MessageBundle) { while let Ok(message) = self.rx.try_recv() { - debug!( - "processing received message: {:?}", - message.message.message_name() - ); - // first update the update queues - for (_, (id, queue)) in self.update_queues.iter_mut() { - if *id == message.message.message_id() { - queue.push_back(message.clone()); - } - } - // then store the message in the messages map + bundle.insert(message.clone()); + + // Store the message in the broker self.messages .entry(message.message.message_id()) .or_default() @@ -276,55 +203,3 @@ impl MessageBroker { // TODO: Implement a scheduler removal of old messages (configurable, must not hurt performance) // TODO: Add a Dashmap if performance is a problem (Personally don't think it will be) } - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct ViewId(Uuid); - -impl ViewId { - pub fn new() -> Self { - Self(Uuid::now_v7()) - } -} - -impl Default for ViewId { - fn default() -> Self { - Self(Uuid::now_v7()) - } -} - -#[derive(Debug)] -struct ReceptionQueue { - queue: VecDeque<Instant>, - threshold: Duration, -} - -impl ReceptionQueue { - fn new(threshold: Duration) -> Self { - Self { - queue: VecDeque::new(), - threshold, - } - } - - fn push(&mut self, instant: Instant) { - self.queue.push_front(instant); - // clear the queue of all elements older than the threshold - while let Some(front) = self.queue.back() { - if instant.duration_since(*front) > self.threshold { - self.queue.pop_back(); - } else { - break; - } - } - } - - fn frequency(&self) -> f64 { - let till = Instant::now(); - let since = till - self.threshold; - self.queue.iter().take_while(|t| **t > since).count() as f64 / self.threshold.as_secs_f64() - } - - fn time_since_last_reception(&self) -> Option<Duration> { - self.queue.front().map(|t| t.elapsed()) - } -} diff --git a/src/message_broker/message_bundle.rs b/src/message_broker/message_bundle.rs new file mode 100644 index 0000000000000000000000000000000000000000..52568b935f267e0af070423a746299ddba2151c7 --- /dev/null +++ b/src/message_broker/message_bundle.rs @@ -0,0 +1,63 @@ +use crate::mavlink::{Message, TimedMessage}; + +/// A bundle of messages, indexed by their ID. +/// Allows for efficient storage and retrieval of messages by ID. +/// +/// # Note +/// +/// This structure performed best when reusing the same instance for multiple +/// iterations, instead of creating a new instance every time. Use the [`reset`] +/// method to clear the content of the bundle and prepare it for reuse. +#[derive(Default)] +pub struct MessageBundle { + storage: Vec<(u32, Vec<TimedMessage>)>, + count: u32, +} + +impl MessageBundle { + /// Returns all messages of the given ID contained in the bundle. + pub fn get(&self, id: u32) -> &[TimedMessage] { + self.storage + .iter() + .find(|&&(queue_id, _)| queue_id == id) + .map_or(&[], |(_, messages)| messages.as_slice()) + } + + /// Inserts a new message into the bundle. + pub fn insert(&mut self, message: TimedMessage) { + let message_id = message.message.message_id(); + + // Retrieve the queue for the ID, if it exists + let maybe_queue = self + .storage + .iter_mut() + .find(|&&mut (queue_id, _)| queue_id == message_id) + .map(|(_, queue)| queue); + + if let Some(queue) = maybe_queue { + queue.push(message); + } else { + self.storage.push((message_id, vec![message])); + } + + self.count += 1; + } + + /// Returns the number of messages in the bundle. + pub fn count(&self) -> u32 { + self.count + } + + /// Resets the content of the bundle, preparing it to be efficiently reused. + /// Effectively, it clears the content of the bundle, but with lower + /// allocation cost the next time the bundle is reused. + pub fn reset(&mut self) { + // Clear the individual queues instead of the full storage, to avoid + // the allocation cost of the already used per-id queues. + for (_, queue) in &mut self.storage { + queue.clear(); + } + + self.count = 0; + } +} diff --git a/src/message_broker/reception_queue.rs b/src/message_broker/reception_queue.rs new file mode 100644 index 0000000000000000000000000000000000000000..95ba70e5fc4017ed1b51eec35fac2eeadc743eed --- /dev/null +++ b/src/message_broker/reception_queue.rs @@ -0,0 +1,41 @@ +use std::{ + collections::VecDeque, + time::{Duration, Instant}, +}; + +#[derive(Debug)] +pub(super) struct ReceptionQueue { + queue: VecDeque<Instant>, + threshold: Duration, +} + +impl ReceptionQueue { + pub(super) fn new(threshold: Duration) -> Self { + Self { + queue: VecDeque::new(), + threshold, + } + } + + pub(super) fn push(&mut self, instant: Instant) { + self.queue.push_front(instant); + // clear the queue of all elements older than the threshold + while let Some(front) = self.queue.back() { + if instant.duration_since(*front) > self.threshold { + self.queue.pop_back(); + } else { + break; + } + } + } + + pub(super) fn frequency(&self) -> f64 { + let till = Instant::now(); + let since = till - self.threshold; + self.queue.iter().take_while(|t| **t > since).count() as f64 / self.threshold.as_secs_f64() + } + + pub(super) fn time_since_last_reception(&self) -> Option<Duration> { + self.queue.front().map(|t| t.elapsed()) + } +} diff --git a/src/ui/composable_view.rs b/src/ui/composable_view.rs index 87052dbb62c57b29959a391e258412de5f45f8ec..4abda196b3a203a5e37c705f7877e69b245dcd09 100644 --- a/src/ui/composable_view.rs +++ b/src/ui/composable_view.rs @@ -1,10 +1,3 @@ -use crate::{ - error::ErrInstrument, - mavlink, msg_broker, - serial::{get_first_stm32_serial_port, list_all_serial_ports}, - ui::panes::PaneKind, -}; - use super::{ panes::{Pane, PaneBehavior}, persistency::{LayoutManager, LayoutManagerWindow}, @@ -13,28 +6,37 @@ use super::{ widget_gallery::WidgetGallery, widgets::reception_led::ReceptionLed, }; -use std::{ - fs, - path::{Path, PathBuf}, - time::Duration, +use crate::{ + error::ErrInstrument, + mavlink, + message_broker::{MessageBroker, MessageBundle}, + serial::{get_first_stm32_serial_port, list_all_serial_ports}, + ui::panes::PaneKind, }; - +use eframe::CreationContext; use egui::{Align2, Button, ComboBox, Key, Modifiers, Sides, Vec2}; use egui_extras::{Size, StripBuilder}; use egui_tiles::{Behavior, Container, Linear, LinearDir, Tile, TileId, Tiles, Tree}; use serde::{Deserialize, Serialize}; +use std::{ + fs, + num::NonZeroUsize, + path::{Path, PathBuf}, + time::{Duration, Instant}, +}; use tracing::{debug, error, trace}; -#[derive(Default)] pub struct ComposableView { /// Persistent state of the app state: ComposableViewState, layout_manager: LayoutManager, - widget_gallery: WidgetGallery, behavior: ComposableBehavior, maximized_pane: Option<TileId>, - + // == Message handling == + message_broker: MessageBroker, + message_bundle: MessageBundle, // == Windows == + widget_gallery: WidgetGallery, sources_window: SourceWindow, layout_manager_window: LayoutManagerWindow, } @@ -43,6 +45,8 @@ pub struct ComposableView { impl eframe::App for ComposableView { // The update function is called each time the UI needs repainting! fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { + self.process_messages(); + let panes_tree = &mut self.state.panes_tree; // Get the id of the hovered pane, in order to apply actions to it @@ -169,24 +173,27 @@ impl eframe::App for ComposableView { } } + // TODO: maybe introduce a stats struct to store these values? + let reception_led_active = self + .message_broker + .time_since_last_reception() + .unwrap_or(Duration::MAX) + < Duration::from_millis(100); + let reception_frequency = self.message_broker.reception_frequency(); + // Show a panel at the bottom of the screen with few global controls egui::TopBottomPanel::bottom("bottom_control").show(ctx, |ui| { // Horizontal belt of controls Sides::new().show( ui, - |ui| { - let active = msg_broker!() - .time_since_last_reception() - .unwrap_or(Duration::MAX) - < Duration::from_millis(100); - ui.add(ReceptionLed::new(active)) - }, + |ui| ui.add(ReceptionLed::new(reception_led_active, reception_frequency)), |ui| { ui.horizontal(|ui| { egui::global_theme_preference_switch(ui); // Window for the sources - self.sources_window.show_window(ui); + self.sources_window + .show_window(ui, &mut self.message_broker); if ui .add(Button::new("🔌").frame(false)) @@ -243,21 +250,79 @@ impl eframe::App for ComposableView { } impl ComposableView { - pub fn new(app_name: &str, storage: &dyn eframe::Storage) -> Self { - let layout_manager = LayoutManager::new(app_name, storage); - let mut s = Self { - layout_manager, - ..Self::default() + pub fn new(app_name: &str, ctx: &CreationContext) -> Self { + let mut layout_manager = if let Some(storage) = ctx.storage { + LayoutManager::new(app_name, storage) + } else { + LayoutManager::default() }; + + let mut state = ComposableViewState::default(); + // Load the selected layout if valid and existing - if let Some(layout) = s.layout_manager.current_layout().cloned() { - s.layout_manager - .load_layout(layout, &mut s.state) + if let Some(layout) = layout_manager.current_layout().cloned() { + layout_manager + .load_layout(layout, &mut state) .unwrap_or_else(|e| { error!("Error loading layout: {}", e); }); } - s + + Self { + state, + layout_manager, + message_broker: MessageBroker::new( + NonZeroUsize::new(50).log_unwrap(), + ctx.egui_ctx.clone(), + ), + widget_gallery: WidgetGallery::default(), + behavior: ComposableBehavior::default(), + maximized_pane: None, + message_bundle: MessageBundle::default(), + sources_window: SourceWindow::default(), + layout_manager_window: LayoutManagerWindow::default(), + } + } + + /// Retrieves new messages from the message broker and dispatches them to the panes. + fn process_messages(&mut self) { + let start = Instant::now(); + + self.message_broker + .process_messages(&mut self.message_bundle); + + // Skip updating the panes if there are no messages + let count = self.message_bundle.count(); + if count == 0 { + return; + } + + debug!( + "Receiving {count} messages from message broker took {:?}", + start.elapsed() + ); + + let start = Instant::now(); + for (_, tile) in self.state.panes_tree.tiles.iter_mut() { + // Skip non-pane tiles + let Tile::Pane(pane) = tile else { continue }; + // Skip panes that do not have a subscription + let Some(sub_id) = pane.get_message_subscription() else { + continue; + }; + + if pane.should_send_message_history() { + pane.update(self.message_broker.get(sub_id)); + } else { + pane.update(self.message_bundle.get(sub_id)); + } + } + + debug!( + "Panes message processing messages took {:?}", + start.elapsed() + ); + self.message_bundle.reset(); } } @@ -333,7 +398,7 @@ struct SourceWindow { } impl SourceWindow { - fn show_window(&mut self, ui: &mut egui::Ui) { + fn show_window(&mut self, ui: &mut egui::Ui, message_broker: &mut MessageBroker) { let mut window_is_open = self.visible; let mut can_be_closed = false; egui::Window::new("Sources") @@ -344,12 +409,17 @@ impl SourceWindow { .resizable(false) .open(&mut window_is_open) .show(ui.ctx(), |ui| { - self.ui(ui, &mut can_be_closed); + self.ui(ui, &mut can_be_closed, message_broker); }); self.visible = window_is_open && !can_be_closed; } - fn ui(&mut self, ui: &mut egui::Ui, can_be_closed: &mut bool) { + fn ui( + &mut self, + ui: &mut egui::Ui, + can_be_closed: &mut bool, + message_broker: &mut MessageBroker, + ) { let SourceWindow { connected, connection_kind, @@ -444,10 +514,10 @@ impl SourceWindow { if ui.add_sized(ui.available_size(), btn1).clicked() { match connection_details { ConnectionDetails::Ethernet { port } => { - msg_broker!().listen_from_ethernet_port(*port); + message_broker.listen_from_ethernet_port(*port); } ConnectionDetails::Serial { port, baud_rate } => { - msg_broker!() + message_broker .listen_from_serial_port(port.clone(), *baud_rate); } } @@ -460,7 +530,7 @@ impl SourceWindow { let btn2 = Button::new("Disconnect"); ui.add_enabled_ui(*connected, |ui| { if ui.add_sized(ui.available_size(), btn2).clicked() { - msg_broker!().stop_listening(); + message_broker.stop_listening(); *connected = false; } }); diff --git a/src/ui/panes.rs b/src/ui/panes.rs index 2dbcdd6eca3ca8546be24365104b8d33caaed26f..febd8a5c7941504f0740ece2d8d43a66691d0359 100644 --- a/src/ui/panes.rs +++ b/src/ui/panes.rs @@ -7,6 +7,8 @@ use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; use strum_macros::{self, EnumIter, EnumMessage}; +use crate::mavlink::TimedMessage; + use super::composable_view::PaneResponse; #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] @@ -22,8 +24,19 @@ impl Pane { #[enum_dispatch(PaneKind)] pub trait PaneBehavior { + /// Renders the UI of the pane. fn ui(&mut self, ui: &mut egui::Ui, tile_id: TileId) -> PaneResponse; + /// Whether the pane contains the pointer. fn contains_pointer(&self) -> bool; + + /// Updates the pane state. This method is called before `ui` to allow the + /// pane to update its state based on the messages received. + fn update(&mut self, messages: &[TimedMessage]); + + /// Returns the ID of the messages this pane is interested in, if any. + fn get_message_subscription(&self) -> Option<u32>; + /// Checks whether the full message history should be sent to the pane. + fn should_send_message_history(&self) -> bool; } impl PaneBehavior for Pane { @@ -34,6 +47,18 @@ impl PaneBehavior for Pane { fn contains_pointer(&self) -> bool { self.pane.contains_pointer() } + + fn update(&mut self, messages: &[TimedMessage]) { + self.pane.update(messages) + } + + fn get_message_subscription(&self) -> Option<u32> { + self.pane.get_message_subscription() + } + + fn should_send_message_history(&self) -> bool { + self.pane.should_send_message_history() + } } // An enum to represent the diffent kinds of widget available to the user. diff --git a/src/ui/panes/default.rs b/src/ui/panes/default.rs index 7d58c23e195dcda97a925a694d9de33f605ef6d6..7fc0636bb5df66dbacb55b4d30665830be3f5ce5 100644 --- a/src/ui/panes/default.rs +++ b/src/ui/panes/default.rs @@ -58,4 +58,14 @@ impl PaneBehavior for DefaultPane { fn contains_pointer(&self) -> bool { self.contains_pointer } + + fn update(&mut self, _messages: &[crate::mavlink::TimedMessage]) {} + + fn get_message_subscription(&self) -> Option<u32> { + None + } + + fn should_send_message_history(&self) -> bool { + false + } } diff --git a/src/ui/panes/messages_viewer.rs b/src/ui/panes/messages_viewer.rs index 02bcc9947b1f7bed483ca05d6e98e071ac4018dc..0925854ebcaadc455b723af4d1f53fbdec6bd2f1 100644 --- a/src/ui/panes/messages_viewer.rs +++ b/src/ui/panes/messages_viewer.rs @@ -31,4 +31,14 @@ impl PaneBehavior for MessagesViewerPane { fn contains_pointer(&self) -> bool { self.contains_pointer } + + fn update(&mut self, _messages: &[crate::mavlink::TimedMessage]) {} + + fn get_message_subscription(&self) -> Option<u32> { + None + } + + fn should_send_message_history(&self) -> bool { + false + } } diff --git a/src/ui/panes/plot.rs b/src/ui/panes/plot.rs index e5279043de9b330ff2b36346ccaf581ffa183b12..a7c4b5a0e3d9917cb2707e76c6bba7fb43f7e352 100644 --- a/src/ui/panes/plot.rs +++ b/src/ui/panes/plot.rs @@ -1,22 +1,16 @@ mod source_window; +use super::PaneBehavior; +use crate::{ + mavlink::{extract_from_message, MessageData, TimedMessage, ROCKET_FLIGHT_TM_DATA}, + ui::composable_view::PaneResponse, +}; use egui::{Color32, Vec2b}; use egui_plot::{Legend, Line, PlotPoints}; use egui_tiles::TileId; use serde::{Deserialize, Serialize}; use source_window::{sources_window, SourceSettings}; - -use crate::{ - error::ErrInstrument, - mavlink::{ - extract_from_message, MavlinkResult, MessageData, MessageView, TimedMessage, ViewId, - ROCKET_FLIGHT_TM_DATA, - }, - msg_broker, - ui::composable_view::PaneResponse, -}; - -use super::PaneBehavior; +use std::iter::zip; #[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct Plot2DPane { @@ -25,16 +19,20 @@ pub struct Plot2DPane { pub contains_pointer: bool, #[serde(skip)] settings_visible: bool, + line_settings: Vec<LineSettings>, - plot_active: bool, - view: PlotMessageView, + #[serde(skip)] + line_data: Vec<Vec<[f64; 2]>>, + + settings: MsgSources, + + #[serde(skip)] + state_valid: bool, } impl PartialEq for Plot2DPane { fn eq(&self, other: &Self) -> bool { - self.view.settings == other.view.settings - && self.line_settings == other.line_settings - && self.plot_active == other.plot_active + self.settings == other.settings && self.line_settings == other.line_settings } } @@ -42,74 +40,44 @@ impl PaneBehavior for Plot2DPane { fn ui(&mut self, ui: &mut egui::Ui, _: TileId) -> PaneResponse { let mut response = PaneResponse::default(); - let Self { - line_settings: plot_lines, - settings_visible, - plot_active, - view, - .. - } = self; - - let mut settings = SourceSettings::new(&mut view.settings, plot_lines); + let mut settings = SourceSettings::new(&mut self.settings, &mut self.line_settings); egui::Window::new("Plot Settings") .id(ui.auto_id_with("plot_settings")) // TODO: fix this issue with ids .auto_sized() .collapsible(true) .movable(true) - .open(settings_visible) + .open(&mut self.settings_visible) .show(ui.ctx(), |ui| sources_window(ui, &mut settings)); - // if settings are changed, invalidate the cache - view.cache_valid = !settings.are_sources_changed(); - // if there are no fields, do not plot - *plot_active = !settings.fields_empty(); - let ctrl_pressed = ui.input(|i| i.modifiers.ctrl); - - let mut plot_lines = Vec::new(); - if self.plot_active { - msg_broker!() - .refresh_view(view) - .log_expect("MessageView may be invalid"); - let acc_points = &view.points; - - let field_x = &view.settings.x_field; - if !acc_points.is_empty() { - for (i, plot_line) in self.line_settings.iter().enumerate() { - let points: Vec<[f64; 2]> = { - let iter = acc_points.iter(); - if field_x == "timestamp" { - iter.map(|(x, ys)| [x / 1e6, ys[i]]).collect() - } else { - iter.map(|(x, ys)| [*x, ys[i]]).collect() - } - }; - plot_lines.push((plot_line.clone(), points)); - } - } + if settings.are_sources_changed() { + self.state_valid = false; } - let plot = egui_plot::Plot::new("plot") + let ctrl_pressed = ui.input(|i| i.modifiers.ctrl); + + egui_plot::Plot::new("plot") .auto_bounds(Vec2b::TRUE) .legend(Legend::default()) - .label_formatter(|name, value| format!("{} - x:{:.2} y:{:.2}", name, value.x, value.y)); - plot.show(ui, |plot_ui| { - self.contains_pointer = plot_ui.response().contains_pointer(); - if plot_ui.response().dragged() && ctrl_pressed { - println!("ctrl + drag"); - response.set_drag_started(); - } - for (plot_settings, data_points) in plot_lines { - plot_ui.line( - Line::new(PlotPoints::from(data_points)) - .color(plot_settings.color) - .width(plot_settings.width) - .name(&plot_settings.field), - ); - } - plot_ui - .response() - .context_menu(|ui| show_menu(ui, settings_visible)); - }); + .label_formatter(|name, value| format!("{} - x:{:.2} y:{:.2}", name, value.x, value.y)) + .show(ui, |plot_ui| { + self.contains_pointer = plot_ui.response().contains_pointer(); + if plot_ui.response().dragged() && ctrl_pressed { + response.set_drag_started(); + } + + for (settings, points) in zip(&self.line_settings, &mut self.line_data) { + plot_ui.line( + // TODO: remove clone when PlotPoints supports borrowing + Line::new(PlotPoints::from(points.clone())) + .color(settings.color) + .width(settings.width) + .name(&settings.field), + ); + } + plot_ui + .response() + .context_menu(|ui| show_menu(ui, &mut self.settings_visible)); + }); response } @@ -117,63 +85,44 @@ impl PaneBehavior for Plot2DPane { fn contains_pointer(&self) -> bool { self.contains_pointer } -} -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -struct PlotMessageView { - // == Settings from the UI == - settings: MsgSources, - // == Data == - #[serde(skip)] - points: Vec<(f64, Vec<f64>)>, - // == Internal == - id: ViewId, - #[serde(skip)] - cache_valid: bool, -} + fn update(&mut self, messages: &[TimedMessage]) { + if !self.state_valid { + self.line_data.clear(); + } -impl PlotMessageView { - fn new() -> Self { - Self::default() - } -} + let MsgSources { + x_field, y_fields, .. + } = &self.settings; -impl MessageView for PlotMessageView { - fn view_id(&self) -> ViewId { - self.id - } + for msg in messages { + let x: f64 = extract_from_message(&msg.message, [x_field]).unwrap()[0]; + let ys: Vec<f64> = extract_from_message(&msg.message, y_fields).unwrap(); - fn id_of_interest(&self) -> u32 { - self.settings.msg_id - } + if self.line_data.len() < ys.len() { + self.line_data.resize(ys.len(), Vec::new()); + } - fn is_valid(&self) -> bool { - self.cache_valid - } + for (line, y) in zip(&mut self.line_data, ys) { + let point = if x_field == "timestamp" { + [x / 1e6, y] + } else { + [x, y] + }; - fn populate_view(&mut self, msg_slice: &[TimedMessage]) -> MavlinkResult<()> { - self.points.clear(); - let MsgSources { - x_field, y_fields, .. - } = &self.settings; - for msg in msg_slice { - let x: f64 = extract_from_message(&msg.message, [x_field])?[0]; - let ys: Vec<f64> = extract_from_message(&msg.message, y_fields)?; - self.points.push((x, ys)); + line.push(point); + } } - Ok(()) + + self.state_valid = true; } - fn update_view(&mut self, msg_slice: &[TimedMessage]) -> MavlinkResult<()> { - let MsgSources { - x_field, y_fields, .. - } = &self.settings; - for msg in msg_slice { - let x: f64 = extract_from_message(&msg.message, [x_field])?[0]; - let ys: Vec<f64> = extract_from_message(&msg.message, y_fields)?; - self.points.push((x, ys)); - } - Ok(()) + fn get_message_subscription(&self) -> Option<u32> { + Some(self.settings.msg_id) + } + + fn should_send_message_history(&self) -> bool { + !self.state_valid } } diff --git a/src/ui/persistency/layout_manager.rs b/src/ui/persistency/layout_manager.rs index a7b857ca3af0ff779012f5ab98187621a3a67227..173fea11360bf4f0049957d210992ec38417c5b2 100644 --- a/src/ui/persistency/layout_manager.rs +++ b/src/ui/persistency/layout_manager.rs @@ -7,7 +7,7 @@ use std::{ use tracing::{info, trace, warn}; -use crate::{error::ErrInstrument, msg_broker}; +use crate::error::ErrInstrument; use super::super::composable_view::ComposableViewState; @@ -103,7 +103,6 @@ impl LayoutManager { .ok_or(anyhow::anyhow!("Layout not found"))?; *state = layout.clone(); self.current_layout = Some(path.as_ref().into()); - msg_broker!().unsubscribe_all_views(); Ok(()) } diff --git a/src/ui/widgets/reception_led.rs b/src/ui/widgets/reception_led.rs index e92f5f19b0e00786eeea85c599c5b4c9d7caa5dc..c6c1cb57b9e62c56b183800e41cea3380c19a478 100644 --- a/src/ui/widgets/reception_led.rs +++ b/src/ui/widgets/reception_led.rs @@ -1,15 +1,16 @@ use egui::{Color32, Response, Sense, Stroke, Ui, Vec2, Widget}; -use crate::{error::ErrInstrument, msg_broker}; +use crate::error::ErrInstrument; pub struct ReceptionLed { pub active: bool, + pub frequency: f64, } impl ReceptionLed { /// Create a new `ReceptionLed` widget based on the given state. - pub fn new(active: bool) -> Self { - Self { active } + pub fn new(active: bool, frequency: f64) -> Self { + Self { active, frequency } } } @@ -37,8 +38,7 @@ impl ReceptionLed { fn show_label(&self, ui: &mut Ui) -> Response { if self.active { - let freq = msg_broker!().reception_frequency(); - let text = format!("{} Hz", freq); + let text = format!("{} Hz", self.frequency); ui.label(text) } else { ui.label("N/A")