diff --git a/.gitignore b/.gitignore index f7cd28a5a7442878e937728999cb6803d04f6210..008d2b342c665e2d61cf728c7af8643c42e63420 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ # Build related /target + +# Logs +/logs diff --git a/Cargo.lock b/Cargo.lock index 3e2ead84baeaede34f2bd1217a5cdb3f8972e867..9ba641ebdda989d53134b402ded2883bdfdedcce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,7 +140,7 @@ dependencies = [ "once_cell", "serde", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -753,6 +753,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -784,6 +793,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "derivative" version = "2.2.0" @@ -2190,6 +2208,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-derive" version = "0.4.2" @@ -2619,13 +2643,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -2710,8 +2740,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", + "zerocopy 0.8.21", ] [[package]] @@ -2721,7 +2762,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -2733,6 +2784,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.1", +] + [[package]] name = "raw-window-handle" version = "0.6.2" @@ -2918,6 +2978,7 @@ dependencies = [ "enum_dispatch", "mavlink-bindgen", "profiling", + "rand 0.9.0", "ring-channel", "serde", "serde_json", @@ -2928,9 +2989,9 @@ dependencies = [ "thiserror 2.0.11", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "tracing-tracy", - "uuid", ] [[package]] @@ -3363,6 +3424,37 @@ dependencies = [ "weezl", ] +[[package]] +name = "time" +version = "0.3.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad298b01a40a23aac4580b67e3dbedb7cc8402f3592d7f49469de2ea4aecdd8" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c97a5b985b7c11d7bc27fa927dc4fe6af3a6dfb021d28deb60d3bf51e76ef" + +[[package]] +name = "time-macros" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8093bc3e81c3bc5f7879de09619d06c9a5a5e45ca44dfeeb7225bae38005c5c" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-skia" version = "0.11.4" @@ -3440,6 +3532,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.69", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.28" @@ -3472,6 +3576,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -3482,12 +3596,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -3616,16 +3733,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" -[[package]] -name = "uuid" -version = "1.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587" -dependencies = [ - "getrandom 0.3.1", - "serde", -] - [[package]] name = "valuable" version = "0.1.1" @@ -4502,7 +4609,7 @@ dependencies = [ "hex", "nix 0.29.0", "ordered-stream", - "rand", + "rand 0.8.5", "serde", "serde_repr", "sha1", @@ -4584,7 +4691,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf01143b2dd5d134f11f545cf9f1431b13b749695cb33bcce051e7568f99478" +dependencies = [ + "zerocopy-derive 0.8.21", ] [[package]] @@ -4598,6 +4714,17 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712c8386f4f4299382c9abee219bee7084f78fb939d88b6840fcc1320d5f6da2" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "zerofrom" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index 60bd536c6dfdb8946bbcaa6a6e28bed38e6b10b7..03ca6287fe321ae6cf764427956b606d6cdd3e9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,9 +28,12 @@ serialport = "4.7.0" # ========= Persistency ========= serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -# =========== Logging =========== +# =========== Tracing and profiling =========== tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +tracing-tracy = "0.11.4" +profiling = { version = "1.0", features = ["profile-with-tracy"] } +tracing-appender = "0.2" # =========== Utility =========== # for dynamic dispatch enum_dispatch = "0.3" @@ -39,6 +42,6 @@ strum_macros = "0.26" anyhow = "1.0" ring-channel = "0.12.0" thiserror = "2.0.7" -uuid = { version = "1.12.1", features = ["serde", "v7"] } -profiling = { version = "1.0", features = ["profile-with-tracy"] } -tracing-tracy = "0.11.4" + +[dev-dependencies] +rand = "0.9.0" diff --git a/justfile b/justfile index 613e908a144ea7e61f0d4e3997c57386b9fbb19e..ea6682b72a8b80b385a49c56e482096d98c68fb6 100644 --- a/justfile +++ b/justfile @@ -1,9 +1,13 @@ alias r := run +alias t := test alias d := doc default: just run +test *ARGS: + cargo nextest run {{ARGS}} + run LEVEL="debug": RUST_LOG=segs={{LEVEL}} cargo r diff --git a/src/communication.rs b/src/communication.rs new file mode 100644 index 0000000000000000000000000000000000000000..cf0bbcc8dc8788030308cea887fa7be37ced01ab --- /dev/null +++ b/src/communication.rs @@ -0,0 +1,185 @@ +//! Main communication module. +//! +//! Provides a unified interface for handling message transmission and reception +//! through different physical connection types (e.g., serial, Ethernet). +//! It also manages connections and message buffering. + +mod error; +pub mod ethernet; +pub mod serial; + +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + +use ring_channel::{RingReceiver, TryRecvError}; +use sealed::MessageTransceiver; + +use crate::mavlink::{MavConnection, MavFrame, MavMessage, TimedMessage}; + +// Re-exports +pub use error::{CommunicationError, ConnectionError}; +pub use ethernet::EthernetConfiguration; +pub use serial::SerialConfiguration; + +const MAX_STORED_MSGS: usize = 1000; // e.g., 192 bytes each = 192 KB + +pub(super) type BoxedConnection = Box<dyn MavConnection<MavMessage> + Send + Sync>; + +mod sealed { + use std::{ + num::NonZeroUsize, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + }; + + use enum_dispatch::enum_dispatch; + use ring_channel::ring_channel; + use skyward_mavlink::mavlink::{ + MavFrame, + error::{MessageReadError, MessageWriteError}, + }; + + use crate::{ + error::ErrInstrument, + mavlink::{MavMessage, TimedMessage}, + }; + + use super::{ + CommunicationError, Connection, ConnectionError, MAX_STORED_MSGS, + ethernet::EthernetTransceiver, serial::SerialTransceiver, + }; + + pub trait TransceiverConfigSealed {} + + /// Trait representing an entity that can be connected. + pub trait Connectable { + type Connected: MessageTransceiver; + + /// Establishes a connection based on the configuration. + fn connect(&self) -> Result<Self::Connected, ConnectionError>; + } + + /// Trait representing a message transceiver. + /// This trait abstracts the common operations for message transmission and reception. + /// It also provides a default implementation for opening a listening connection, while + /// being transparent to the actual Transceiver type. + #[enum_dispatch(Transceivers)] + pub trait MessageTransceiver: Send + Sync + Into<Transceivers> { + /// Blocks until a valid message is received. + fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError>; + + /// Transmits a message using the connection. + fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError>; + + /// Opens a listening connection and spawns a thread for message handling. + #[profiling::function] + fn open_listening_connection(self) -> Connection { + let running_flag = Arc::new(AtomicBool::new(true)); + let (tx, rx) = ring_channel(NonZeroUsize::new(MAX_STORED_MSGS).log_unwrap()); + let endpoint_inner = Arc::new(self.into()); + + { + let running_flag = running_flag.clone(); + let endpoint_inner = endpoint_inner.clone(); + // Detached thread for message handling; errors are logged. + let _ = std::thread::spawn(move || { + while running_flag.load(Ordering::Relaxed) { + match endpoint_inner.wait_for_message() { + Ok(msg) => { + tx.send(msg) + .map_err(|_| CommunicationError::ConnectionClosed)?; + } + Err(MessageReadError::Io(e)) => { + tracing::error!("Failed to read message: {e:#?}"); + running_flag.store(false, Ordering::Relaxed); + return Err(CommunicationError::Io(e)); + } + Err(MessageReadError::Parse(e)) => { + tracing::error!("Failed to read message: {e:#?}"); + } + } + } + Ok(()) + }); + } + + Connection { + transceiver: endpoint_inner, + rx_ring_channel: rx, + running_flag, + } + } + } + + impl<T: Connectable> TransceiverConfigSealed for T {} + + /// Enum representing the different types of transceivers. + #[enum_dispatch] + pub(super) enum Transceivers { + Serial(SerialTransceiver), + Ethernet(EthernetTransceiver), + } +} + +/// Trait to abstract common configuration types. +pub trait TransceiverConfig: sealed::TransceiverConfigSealed {} +impl<T: sealed::TransceiverConfigSealed> TransceiverConfig for T {} + +/// Extension trait to open a connection directly from a configuration. +pub trait TransceiverConfigExt: sealed::Connectable { + /// Opens a connection and returns a handle to it. + fn open_connection(&self) -> Result<Connection, ConnectionError> { + Ok(self.connect()?.open_listening_connection()) + } +} +impl<T: sealed::Connectable> TransceiverConfigExt for T {} + +/// Represents an active connection with buffered messages. +pub struct Connection { + transceiver: Arc<sealed::Transceivers>, + rx_ring_channel: RingReceiver<TimedMessage>, + running_flag: Arc<AtomicBool>, +} + +impl Connection { + /// Retrieves and clears stored messages. + #[profiling::function] + pub fn retrieve_messages(&self) -> Result<Vec<TimedMessage>, CommunicationError> { + let mut stored_msgs = Vec::new(); + loop { + match self.rx_ring_channel.try_recv() { + Ok(msg) => stored_msgs.push(msg), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Err(CommunicationError::ConnectionClosed); + } + } + } + Ok(stored_msgs) + } + + /// Sends a message over the connection. + #[profiling::function] + pub fn send_message(&self, msg: MavFrame<MavMessage>) -> Result<(), CommunicationError> { + self.transceiver.transmit_message(msg)?; + Ok(()) + } +} + +impl std::fmt::Debug for Connection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Connection") + .field("running_flag", &self.running_flag) + .finish() + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.running_flag.store(false, Ordering::Relaxed); + } +} diff --git a/src/communication/error.rs b/src/communication/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..42b0f1c36326ec31e6f4f39cd1d78b6885142aae --- /dev/null +++ b/src/communication/error.rs @@ -0,0 +1,34 @@ +//! Error handling for communication modules. +//! +//! Contains definitions for errors that can occur during serial or Ethernet communication. + +use skyward_mavlink::mavlink::error::MessageWriteError; +use thiserror::Error; + +/// Represents communication errors. +#[derive(Debug, Error)] +pub enum CommunicationError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Connection closed")] + ConnectionClosed, +} + +/// Represents errors during connection setup. +#[derive(Debug, Error)] +pub enum ConnectionError { + #[error("Wrong configuration: {0}")] + WrongConfiguration(String), + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Unknown error")] + Unknown(String), +} + +impl From<MessageWriteError> for CommunicationError { + fn from(e: MessageWriteError) -> Self { + match e { + MessageWriteError::Io(e) => Self::Io(e), + } + } +} diff --git a/src/communication/ethernet.rs b/src/communication/ethernet.rs new file mode 100644 index 0000000000000000000000000000000000000000..629887cd034a5f8a5553d70267de702861f23bd2 --- /dev/null +++ b/src/communication/ethernet.rs @@ -0,0 +1,68 @@ +//! Ethernet utilities module. +//! +//! Provides functionality to connect via Ethernet using UDP, allowing message +//! transmission and reception over a network. + +use skyward_mavlink::mavlink::{ + self, + error::{MessageReadError, MessageWriteError}, +}; +use tracing::{debug, trace}; + +use crate::mavlink::{MavFrame, MavMessage, MavlinkVersion, TimedMessage}; + +use super::{ + BoxedConnection, ConnectionError, + sealed::{Connectable, MessageTransceiver}, +}; + +/// Configuration for an Ethernet connection. +#[derive(Debug, Clone)] +pub struct EthernetConfiguration { + pub port: u16, +} + +impl Connectable for EthernetConfiguration { + type Connected = EthernetTransceiver; + + /// Binds to the specified UDP port to create a network connection. + #[profiling::function] + fn connect(&self) -> Result<Self::Connected, ConnectionError> { + let incoming_addr = format!("udpin:0.0.0.0:{}", self.port); + let outgoing_addr = format!("udpbcast:255.255.255.255:{}", self.port); + let mut incoming_conn: BoxedConnection = mavlink::connect(&incoming_addr)?; + let mut outgoing_conn: BoxedConnection = mavlink::connect(&outgoing_addr)?; + incoming_conn.set_protocol_version(MavlinkVersion::V1); + outgoing_conn.set_protocol_version(MavlinkVersion::V1); + debug!("Ethernet connections set up on port {}", self.port); + Ok(EthernetTransceiver { + incoming_conn, + outgoing_conn, + }) + } +} + +/// Manages a connection over Ethernet. +pub struct EthernetTransceiver { + incoming_conn: BoxedConnection, + outgoing_conn: BoxedConnection, +} + +impl MessageTransceiver for EthernetTransceiver { + /// Waits for a message over Ethernet, blocking until a valid message arrives. + #[profiling::function] + fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError> { + let (_, msg) = self.incoming_conn.recv()?; + debug!("Received message: {:?}", &msg); + Ok(TimedMessage::just_received(msg)) + } + + /// Transmits a message using the UDP socket. + #[profiling::function] + fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError> { + let written = self.outgoing_conn.send_frame(&msg)?; + debug!("Sent message: {:?}", msg); + trace!("Sent {} bytes via Ethernet", written); + Ok(written) + } +} diff --git a/src/communication/serial.rs b/src/communication/serial.rs new file mode 100644 index 0000000000000000000000000000000000000000..8f1c6fa73b6c4c924357f0050857d371f02d4762 --- /dev/null +++ b/src/communication/serial.rs @@ -0,0 +1,100 @@ +//! Serial port utilities module. +//! +//! Provides functions for listing USB serial ports, finding a STM32 port, +//! and handling serial connections including message transmission and reception. + +use serialport::{SerialPortInfo, SerialPortType}; +use skyward_mavlink::mavlink::{ + self, + error::{MessageReadError, MessageWriteError}, +}; +use tracing::{debug, trace}; + +use crate::mavlink::{MavFrame, MavMessage, MavlinkVersion, TimedMessage}; + +use super::{ + BoxedConnection, ConnectionError, + sealed::{Connectable, MessageTransceiver}, +}; + +pub const DEFAULT_BAUD_RATE: u32 = 115200; + +/// Returns a list of all USB serial ports available on the system. +/// +/// # Returns +/// * `Ok(Vec<SerialPortInfo>)` if ports are found or an error otherwise. +#[profiling::function] +pub fn list_all_usb_ports() -> Result<Vec<SerialPortInfo>, serialport::Error> { + let ports = serialport::available_ports()?; + Ok(ports + .into_iter() + .filter(|p| matches!(p.port_type, SerialPortType::UsbPort(_))) + .collect()) +} + +/// Finds the first USB serial port whose product name contains "STM32" or "ST-LINK". +/// +/// # Returns +/// * `Ok(Some(SerialPortInfo))` if a matching port is found, `Ok(None)` otherwise. +#[profiling::function] +pub fn find_first_stm32_port() -> Result<Option<SerialPortInfo>, serialport::Error> { + let ports = list_all_usb_ports()?; + for port in ports { + if let serialport::SerialPortType::UsbPort(info) = &port.port_type { + if let Some(p) = &info.product { + if p.contains("STM32") || p.contains("ST-LINK") { + return Ok(Some(port)); + } + } + } + } + Ok(None) +} + +/// Configuration for a serial connection. +#[derive(Debug, Clone)] +pub struct SerialConfiguration { + pub port_name: String, + pub baud_rate: u32, +} + +impl Connectable for SerialConfiguration { + type Connected = SerialTransceiver; + + /// Connects using the serial port configuration. + #[profiling::function] + fn connect(&self) -> Result<Self::Connected, ConnectionError> { + let serial_edpoint = format!("serial:{}:{}", self.port_name, self.baud_rate); + let mut mav_connection: BoxedConnection = mavlink::connect(&serial_edpoint)?; + mav_connection.set_protocol_version(MavlinkVersion::V1); + debug!( + "Connected to serial port {} with baud rate {}", + self.port_name, self.baud_rate + ); + Ok(SerialTransceiver { mav_connection }) + } +} + +/// Manages a connection to a serial port. +pub struct SerialTransceiver { + mav_connection: BoxedConnection, +} + +impl MessageTransceiver for SerialTransceiver { + /// Blocks until a valid message is received from the serial port. + #[profiling::function] + fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError> { + let (_, msg) = self.mav_connection.recv()?; + debug!("Received message: {:?}", &msg); + Ok(TimedMessage::just_received(msg)) + } + + /// Transmits a message via the serial connection. + #[profiling::function] + fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError> { + let written = self.mav_connection.send_frame(&msg)?; + debug!("Sent message: {:?}", msg); + trace!("Sent {} bytes via serial", written); + Ok(written) + } +} diff --git a/src/main.rs b/src/main.rs index 218319d5f1350f27aa49e55746a76ef01c3a9df4..9093f877687fd8cc2c26b394476e95f6fad98fce 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,10 +2,10 @@ #![warn(clippy::unwrap_used)] #![warn(clippy::panic)] +mod communication; mod error; mod mavlink; mod message_broker; -mod serial; mod ui; mod utils; @@ -26,8 +26,15 @@ static APP_NAME: &str = "segs"; fn main() -> Result<(), eframe::Error> { // Set up logging (USE RUST_LOG=debug to see logs) let env_filter = EnvFilter::builder().from_env_lossy(); + let file_appender = tracing_appender::rolling::daily("logs/", "segs.log"); + let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_filter(env_filter)) + .with( + tracing_subscriber::fmt::layer() + .json() + .with_writer(non_blocking), + ) .with(tracing_tracy::TracyLayer::default()) .init(); diff --git a/src/mavlink/base.rs b/src/mavlink/base.rs index c10d9febcb6086cd7d2b9a62a7002c83508efe85..fb64c4cca761dc752c63498d8f5464cc5c475042 100644 --- a/src/mavlink/base.rs +++ b/src/mavlink/base.rs @@ -6,8 +6,6 @@ use std::time::Instant; -use skyward_mavlink::mavlink::peek_reader::PeekReader; - // Re-export from the mavlink crate pub use skyward_mavlink::{ mavlink::*, orion::*, @@ -60,11 +58,3 @@ where }) .collect()) } - -/// Read a stream of bytes and return an iterator of MavLink messages -pub fn byte_parser<'a>( - reader: impl std::io::Read + 'a, -) -> impl Iterator<Item = (MavHeader, MavMessage)> + 'a { - let mut reader = PeekReader::new(reader); - std::iter::from_fn(move || read_v1_msg(&mut reader).ok()) -} diff --git a/src/message_broker.rs b/src/message_broker.rs index 77932cf3501e3ffef906788227b951646eb5e8e3..7cf10e703a48e9144e2d6577723b453930ceddd8 100644 --- a/src/message_broker.rs +++ b/src/message_broker.rs @@ -10,176 +10,82 @@ mod reception_queue; pub use message_bundle::MessageBundle; use reception_queue::ReceptionQueue; -use crate::{ - error::ErrInstrument, - mavlink::{Message, TimedMessage, byte_parser}, - utils::RingBuffer, -}; -use anyhow::{Context, Result}; -use ring_channel::{RingReceiver, RingSender, ring_channel}; use std::{ collections::HashMap, - io::Write, - num::NonZeroUsize, - sync::{ - Arc, Mutex, - atomic::{AtomicBool, Ordering}, - }, - time::{Duration, Instant}, + sync::{Arc, Mutex}, + time::Duration, +}; + +use tracing::error; + +use crate::{ + communication::{Connection, ConnectionError, TransceiverConfigExt}, + error::ErrInstrument, + mavlink::{MavFrame, MavHeader, MavMessage, MavlinkVersion, Message, TimedMessage}, }; -use tokio::{net::UdpSocket, task::JoinHandle}; -use tracing::{debug, trace}; -/// Maximum size of the UDP buffer -const UDP_BUFFER_SIZE: usize = 65527; +const RECEPTION_QUEUE_INTERVAL: Duration = Duration::from_secs(1); +const SEGS_SYSTEM_ID: u8 = 1; +const SEGS_COMPONENT_ID: u8 = 1; /// The MessageBroker struct contains the state of the message broker. /// /// It is responsible for receiving messages from the Mavlink listener and /// dispatching them to the views that are interested in them. -#[derive(Debug)] pub struct MessageBroker { /// A map of all messages received so far, indexed by message ID messages: HashMap<u32, Vec<TimedMessage>>, /// instant queue used for frequency calculation and reception time last_receptions: Arc<Mutex<ReceptionQueue>>, - /// Flag to stop the listener - running_flag: Arc<AtomicBool>, - /// Listener message sender - tx: RingSender<TimedMessage>, - /// Broker message receiver - rx: RingReceiver<TimedMessage>, - /// Task handle for the listener - task: Option<JoinHandle<Result<()>>>, + /// Connection to the Mavlink listener + connection: Option<Connection>, /// Egui context ctx: egui::Context, } impl MessageBroker { /// Creates a new `MessageBroker` with the given channel size and Egui context. - pub fn new(channel_size: NonZeroUsize, ctx: egui::Context) -> Self { - let (tx, rx) = ring_channel(channel_size); + pub fn new(ctx: egui::Context) -> Self { Self { messages: HashMap::new(), // TODO: make this configurable - last_receptions: Arc::new(Mutex::new(ReceptionQueue::new(Duration::from_secs(1)))), - tx, - rx, + last_receptions: Arc::new(Mutex::new(ReceptionQueue::new(RECEPTION_QUEUE_INTERVAL))), + connection: None, ctx, - running_flag: Arc::new(AtomicBool::new(false)), - task: None, - } - } - - /// Stop the listener task from listening to incoming messages, if it is - /// running. - pub fn stop_listening(&mut self) { - self.running_flag.store(false, Ordering::Relaxed); - if let Some(t) = self.task.take() { - t.abort() } } /// Start a listener task that listens to incoming messages from the given - /// Ethernet port, and accumulates them in a ring buffer, read only when - /// views request a refresh. - pub fn listen_from_ethernet_port(&mut self, port: u16) { - // Stop the current listener if it exists - self.stop_listening(); - self.running_flag.store(true, Ordering::Relaxed); - let last_receptions = Arc::clone(&self.last_receptions); - - let tx = self.tx.clone(); - let ctx = self.ctx.clone(); - - let bind_address = format!("0.0.0.0:{}", port); - let mut buf = Box::new([0; UDP_BUFFER_SIZE]); - let running_flag = self.running_flag.clone(); - - debug!("Spawning listener task at {}", bind_address); - let handle = tokio::spawn(async move { - let socket = UdpSocket::bind(bind_address) - .await - .context("Failed to bind socket")?; - debug!("Listening on UDP"); - - while running_flag.load(Ordering::Relaxed) { - let (len, _) = socket - .recv_from(buf.as_mut_slice()) - .await - .context("Failed to receive message")?; - for (_, mav_message) in byte_parser(&buf[..len]) { - trace!("Received message: {:?}", mav_message); - tx.send(TimedMessage::just_received(mav_message)) - .context("Failed to send message")?; - last_receptions.lock().unwrap().push(Instant::now()); - ctx.request_repaint(); - } - } - - Ok::<(), anyhow::Error>(()) - }); - self.task = Some(handle); + /// medium (Serial or Ethernet) and stores them in a ring buffer. + pub fn open_connection( + &mut self, + config: impl TransceiverConfigExt, + ) -> Result<(), ConnectionError> { + self.connection = Some(config.open_connection()?); + Ok(()) } - /// Start a listener task that listens to incoming messages from the given - /// serial port and stores them in a ring buffer. - pub fn listen_from_serial_port(&mut self, port: String, baud_rate: u32) { - // Stop the current listener if it exists - self.stop_listening(); - self.running_flag.store(true, Ordering::Relaxed); - let last_receptions = Arc::clone(&self.last_receptions); - - let tx = self.tx.clone(); - let ctx = self.ctx.clone(); - - let running_flag = self.running_flag.clone(); - - debug!("Spawning listener task at {}", port); - let handle = tokio::task::spawn_blocking(move || { - let mut serial_port = serialport::new(port, baud_rate) - .timeout(std::time::Duration::from_millis(100)) - .open() - .context("Failed to open serial port")?; - debug!("Listening on serial port"); - - let mut ring_buf = RingBuffer::<1024>::new(); - let mut temp_buf = [0; 512]; - // need to do a better error handling for this (need toast errors) - while running_flag.load(Ordering::Relaxed) { - let result = serial_port - .read(&mut temp_buf) - .log_expect("Failed to read from serial port"); - debug!("Read {} bytes from serial port", result); - trace!("data read from serial: {:?}", &temp_buf[..result]); - ring_buf - .write(&temp_buf[..result]) - .log_expect("Failed to write to ring buffer, check buffer size"); - for (_, mav_message) in byte_parser(&mut ring_buf) { - debug!("Received message: {:?}", mav_message); - tx.send(TimedMessage::just_received(mav_message)) - .context("Failed to send message")?; - last_receptions.lock().unwrap().push(Instant::now()); - ctx.request_repaint(); - } - } + /// Stop the listener task from listening to incoming messages, if it is + /// running. + pub fn close_connection(&mut self) { + self.connection.take(); + } - Ok::<(), anyhow::Error>(()) - }); - self.task = Some(handle); + pub fn is_connected(&self) -> bool { + self.connection.is_some() } /// Returns the time since the last message was received. pub fn time_since_last_reception(&self) -> Option<Duration> { self.last_receptions .lock() - .unwrap() + .log_unwrap() .time_since_last_reception() } /// Returns the frequency of messages received in the last second. pub fn reception_frequency(&self) -> f64 { - self.last_receptions.lock().unwrap().frequency() + self.last_receptions.lock().log_unwrap().frequency() } pub fn get(&self, id: u32) -> &[TimedMessage] { @@ -188,15 +94,57 @@ impl MessageBroker { /// Processes incoming network messages. New messages are added to the /// given `MessageBundle`. - pub fn process_messages(&mut self, bundle: &mut MessageBundle) { - while let Ok(message) = self.rx.try_recv() { - bundle.insert(message.clone()); - - // Store the message in the broker - self.messages - .entry(message.message.message_id()) - .or_default() - .push(message); + #[profiling::function] + pub fn process_incoming_messages(&mut self, bundle: &mut MessageBundle) { + // process messages only if the connection is open + if let Some(connection) = &self.connection { + // check for communication errors, and log them + match connection.retrieve_messages() { + Ok(messages) => { + for message in messages { + bundle.insert(message.clone()); + + // Update the last reception time + self.last_receptions.lock().log_unwrap().push(message.time); + + // Store the message in the broker + self.messages + .entry(message.message.message_id()) + .or_default() + .push(message); + } + self.ctx.request_repaint(); + } + Err(e) => { + error!("Error while receiving messages: {:?}", e); + // TODO: user error handling, until them silently close the connection + self.close_connection(); + } + } + } + } + + /// Processes outgoing messages. + /// WARNING: This methods blocks the UI, thus a detailed profiling is needed. + /// FIXME + #[profiling::function] + pub fn process_outgoing_messages(&mut self, messages: Vec<MavMessage>) { + if let Some(connection) = &self.connection { + for msg in messages { + let header = MavHeader { + system_id: SEGS_SYSTEM_ID, + component_id: SEGS_COMPONENT_ID, + ..Default::default() + }; + let frame = MavFrame { + header, + msg, + protocol_version: MavlinkVersion::V1, + }; + if let Err(e) = connection.send_message(frame) { + error!("Error while transmitting message: {:?}", e); + } + } } } diff --git a/src/serial.rs b/src/serial.rs deleted file mode 100644 index 29ffb2d17feb44a2167696ec9c9e5226a151641d..0000000000000000000000000000000000000000 --- a/src/serial.rs +++ /dev/null @@ -1,28 +0,0 @@ -//! Serial port utilities -//! -//! This module provides utilities for working with serial ports, such as listing all available serial ports and finding the first serial port that contains "STM32" or "ST-LINK" in its product name. - -use anyhow::Context; - -use crate::error::ErrInstrument; - -/// Get the first serial port that contains "STM32" or "ST-LINK" in its product name -pub fn get_first_stm32_serial_port() -> Option<String> { - let ports = serialport::available_ports().log_expect("Serial ports cannot be listed!"); - for port in ports { - if let serialport::SerialPortType::UsbPort(info) = port.port_type { - if let Some(p) = info.product { - if p.contains("STM32") || p.contains("ST-LINK") { - return Some(port.port_name); - } - } - } - } - None -} - -/// Get a list of all serial ports available on the system -pub fn list_all_serial_ports() -> anyhow::Result<Vec<String>> { - let ports = serialport::available_ports().context("No serial ports found!")?; - Ok(ports.iter().map(|p| p.port_name.clone()).collect()) -} diff --git a/src/ui.rs b/src/ui.rs index 050d0cc5a07620a8ddb59d247f85b3bd707f6e93..383fabd62708460d45dd65f105faf3feb5f75975 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,9 +1,11 @@ mod app; +mod cache; mod panes; mod persistency; mod shortcuts; mod utils; mod widget_gallery; mod widgets; +pub mod windows; pub use app::App; diff --git a/src/ui/app.rs b/src/ui/app.rs index 05706322e45253c89352f5302b1de8ff527f203a..632d259eed8f0166222081bbee6eac571732552c 100644 --- a/src/ui/app.rs +++ b/src/ui/app.rs @@ -1,31 +1,30 @@ -use super::{ - panes::{Pane, PaneBehavior}, - persistency::{LayoutManager, LayoutManagerWindow}, - shortcuts, - utils::maximized_pane_ui, - widget_gallery::WidgetGallery, - widgets::reception_led::ReceptionLed, -}; -use crate::{ - error::ErrInstrument, - mavlink, - message_broker::{MessageBroker, MessageBundle}, - serial::{get_first_stm32_serial_port, list_all_serial_ports}, - ui::panes::PaneKind, -}; use eframe::CreationContext; -use egui::{Align2, Button, ComboBox, Key, Modifiers, Sides, Vec2}; -use egui_extras::{Size, StripBuilder}; +use egui::{Button, Key, Modifiers, Sides}; use egui_tiles::{Behavior, Container, Linear, LinearDir, Tile, TileId, Tiles, Tree}; use serde::{Deserialize, Serialize}; use std::{ fs, - num::NonZeroUsize, path::{Path, PathBuf}, time::{Duration, Instant}, }; use tracing::{debug, error, trace}; +use crate::{ + error::ErrInstrument, + mavlink::MavMessage, + message_broker::{MessageBroker, MessageBundle}, +}; + +use super::{ + panes::{Pane, PaneBehavior, PaneKind}, + persistency::LayoutManager, + shortcuts, + utils::maximized_pane_ui, + widget_gallery::WidgetGallery, + widgets::reception_led::ReceptionLed, + windows::{ConnectionsWindow, LayoutManagerWindow}, +}; + pub struct App { /// Persistent state of the app state: AppState, @@ -37,7 +36,7 @@ pub struct App { message_bundle: MessageBundle, // == Windows == widget_gallery: WidgetGallery, - sources_window: SourceWindow, + sources_window: ConnectionsWindow, layout_manager_window: LayoutManagerWindow, } @@ -45,7 +44,7 @@ pub struct App { impl eframe::App for App { // The update function is called each time the UI needs repainting! fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { - self.process_messages(); + self.process_incoming_messages(); let panes_tree = &mut self.state.panes_tree; @@ -242,6 +241,9 @@ impl eframe::App for App { self.behavior.action = Some(action); } + // Process outgoing messages + self.process_outgoing_messages(); + // Used for the profiler profiling::finish_frame!(); @@ -276,26 +278,23 @@ impl App { Self { state, layout_manager, - message_broker: MessageBroker::new( - NonZeroUsize::new(50).log_unwrap(), - ctx.egui_ctx.clone(), - ), + message_broker: MessageBroker::new(ctx.egui_ctx.clone()), widget_gallery: WidgetGallery::default(), behavior: AppBehavior::default(), maximized_pane: None, message_bundle: MessageBundle::default(), - sources_window: SourceWindow::default(), + sources_window: ConnectionsWindow::default(), layout_manager_window: LayoutManagerWindow::default(), } } /// Retrieves new messages from the message broker and dispatches them to the panes. #[profiling::function] - fn process_messages(&mut self) { + fn process_incoming_messages(&mut self) { let start = Instant::now(); self.message_broker - .process_messages(&mut self.message_bundle); + .process_incoming_messages(&mut self.message_bundle); // Skip updating the panes if there are no messages let count = self.message_bundle.count(); @@ -330,6 +329,26 @@ impl App { ); self.message_bundle.reset(); } + + /// Sends outgoing messages from the panes to the message broker. + #[profiling::function] + fn process_outgoing_messages(&mut self) { + let outgoing: Vec<MavMessage> = self + .state + .panes_tree + .tiles + .iter_mut() + .filter_map(|(_, tile)| { + if let Tile::Pane(pane) = tile { + Some(pane.drain_outgoing_messages()) + } else { + None + } + }) + .flatten() + .collect(); + self.message_broker.process_outgoing_messages(outgoing); + } } #[derive(Serialize, Deserialize, Clone, PartialEq)] @@ -374,179 +393,6 @@ impl AppState { } } -#[derive(Debug, PartialEq, Eq, Default)] -enum ConnectionKind { - #[default] - Ethernet, - Serial, -} - -#[derive(Debug)] -enum ConnectionDetails { - Ethernet { port: u16 }, - Serial { port: String, baud_rate: u32 }, -} - -impl Default for ConnectionDetails { - fn default() -> Self { - ConnectionDetails::Ethernet { - port: mavlink::DEFAULT_ETHERNET_PORT, - } - } -} - -#[derive(Debug, Default)] -struct SourceWindow { - visible: bool, - connected: bool, - connection_kind: ConnectionKind, - connection_details: ConnectionDetails, -} - -impl SourceWindow { - #[profiling::function] - fn show_window(&mut self, ui: &mut egui::Ui, message_broker: &mut MessageBroker) { - let mut window_is_open = self.visible; - let mut can_be_closed = false; - egui::Window::new("Sources") - .id(ui.id()) - .anchor(Align2::CENTER_CENTER, [0.0, 0.0]) - .max_width(200.0) - .collapsible(false) - .resizable(false) - .open(&mut window_is_open) - .show(ui.ctx(), |ui| { - self.ui(ui, &mut can_be_closed, message_broker); - }); - self.visible = window_is_open && !can_be_closed; - } - - fn ui( - &mut self, - ui: &mut egui::Ui, - can_be_closed: &mut bool, - message_broker: &mut MessageBroker, - ) { - let SourceWindow { - connected, - connection_kind, - connection_details, - .. - } = self; - ui.label("Select Source:"); - ui.horizontal_top(|ui| { - ui.radio_value(connection_kind, ConnectionKind::Ethernet, "Ethernet"); - ui.radio_value(connection_kind, ConnectionKind::Serial, "Serial"); - }); - - ui.separator(); - - match *connection_kind { - ConnectionKind::Ethernet => { - if !matches!(connection_details, ConnectionDetails::Ethernet { .. }) { - *connection_details = ConnectionDetails::Ethernet { - port: mavlink::DEFAULT_ETHERNET_PORT, - }; - } - let ConnectionDetails::Ethernet { port } = connection_details else { - error!("UNREACHABLE: Connection kind is not Ethernet"); - unreachable!("Connection kind is not Ethernet"); - }; - - egui::Grid::new("grid") - .num_columns(2) - .spacing([10.0, 5.0]) - .show(ui, |ui| { - ui.label("Ethernet Port:"); - ui.add(egui::DragValue::new(port).range(0..=65535).speed(10)); - ui.end_row(); - }); - } - ConnectionKind::Serial => { - if !matches!(connection_details, ConnectionDetails::Serial { .. }) { - *connection_details = ConnectionDetails::Serial { - // Default to the first STM32 serial port if available, otherwise - // default to the first serial port available - port: get_first_stm32_serial_port().unwrap_or( - list_all_serial_ports() - .ok() - .and_then(|ports| ports.first().cloned()) - .unwrap_or_default(), - ), - baud_rate: 115200, - }; - } - let ConnectionDetails::Serial { port, baud_rate } = connection_details else { - error!("UNREACHABLE: Connection kind is not Serial"); - unreachable!("Connection kind is not Serial"); - }; - - egui::Grid::new("grid") - .num_columns(2) - .spacing([10.0, 5.0]) - .show(ui, |ui| { - ui.label("Serial Port:"); - ComboBox::from_id_salt("serial_port") - .selected_text(port.clone()) - .show_ui(ui, |ui| { - for available_port in list_all_serial_ports().unwrap_or_default() { - ui.selectable_value( - port, - available_port.clone(), - available_port, - ); - } - }); - ui.end_row(); - ui.label("Baud Rate:"); - ui.add( - egui::DragValue::new(baud_rate) - .range(110..=256000) - .speed(100), - ); - ui.end_row(); - }); - } - }; - - ui.separator(); - - ui.allocate_ui(Vec2::new(ui.available_width(), 20.0), |ui| { - StripBuilder::new(ui) - .sizes(Size::remainder(), 2) // top cell - .horizontal(|mut strip| { - strip.cell(|ui| { - let btn1 = Button::new("Connect"); - ui.add_enabled_ui(!*connected, |ui| { - if ui.add_sized(ui.available_size(), btn1).clicked() { - match connection_details { - ConnectionDetails::Ethernet { port } => { - message_broker.listen_from_ethernet_port(*port); - } - ConnectionDetails::Serial { port, baud_rate } => { - message_broker - .listen_from_serial_port(port.clone(), *baud_rate); - } - } - *can_be_closed = true; - *connected = true; - } - }); - }); - strip.cell(|ui| { - let btn2 = Button::new("Disconnect"); - ui.add_enabled_ui(*connected, |ui| { - if ui.add_sized(ui.available_size(), btn2).clicked() { - message_broker.stop_listening(); - *connected = false; - } - }); - }); - }); - }); - } -} - /// Behavior for the tree of panes in the app #[derive(Default)] pub struct AppBehavior { diff --git a/src/ui/cache.rs b/src/ui/cache.rs new file mode 100644 index 0000000000000000000000000000000000000000..46b29cc2f5199005f2eb42e4ee4a5c827d457801 --- /dev/null +++ b/src/ui/cache.rs @@ -0,0 +1,92 @@ +//! Module for caching expensive UI calls using egui's temporary memory storage. +//! It provides utilities for caching the results of functions to avoid frequent recalculations. + +use std::time::{Duration, Instant}; + +use egui::Context; +use serialport::SerialPortInfo; + +use crate::{communication, error::ErrInstrument}; + +const SERIAL_PORT_REFRESH_INTERVAL: Duration = Duration::from_millis(500); + +/// Internal helper function that caches the result of a given function call for a specified duration. +/// +/// # Arguments +/// * `ctx` - The egui context used for caching. +/// * `id` - The unique identifier for the cached item. +/// * `fun` - The function whose return value is to be cached. +/// * `expiration_duration` - The duration after which the cache should be refreshed. +fn call<T, F>(ctx: &egui::Context, id: egui::Id, fun: F, expiration_duration: Duration) -> T +where + F: Fn() -> T, + T: Clone + Send + Sync + 'static, +{ + ctx.memory_mut(|m| { + match m.data.get_temp::<(T, Instant)>(id) { + None => { + m.data.insert_temp(id, (fun(), Instant::now())); + } + Some((_, i)) if i.elapsed() >= expiration_duration => { + m.data.insert_temp(id, (fun(), Instant::now())); + } + _ => {} + } + m.data.get_temp::<(T, Instant)>(id).log_unwrap().0 + }) +} + +/// A trait to extend egui's Context with a caching function. +pub trait CacheCall { + /// Calls the provided function and caches its result. + /// + /// # Arguments + /// * `id` - A unique identifier for the cached value. + /// * `fun` - The function to be cached. + /// * `expiration_duration` - The cache expiration duration. + fn call_cached<F, T>(&self, id: egui::Id, fun: F, expiration_duration: Duration) -> T + where + F: Fn() -> T, + T: Clone + Send + Sync + 'static; +} + +impl CacheCall for egui::Context { + /// Implements the caching call using the internal `call` function. + fn call_cached<F, T>(&self, id: egui::Id, fun: F, expiration_duration: Duration) -> T + where + F: Fn() -> T, + T: Clone + Send + Sync + 'static, + { + call(self, id, fun, expiration_duration) + } +} + +/// Returns a cached list of all available USB ports. +/// +/// # Arguments +/// * `ctx` - The egui context used for caching. +/// +/// # Returns +/// * A Result containing a vector of `SerialPortInfo` or a `serialport::Error`. +pub fn cached_list_all_usb_ports(ctx: &Context) -> Result<Vec<SerialPortInfo>, serialport::Error> { + ctx.call_cached( + egui::Id::new("list_usb_ports"), + communication::serial::list_all_usb_ports, + SERIAL_PORT_REFRESH_INTERVAL, + ) +} + +/// Returns the first cached STM32 port found, if any. +/// +/// # Arguments +/// * `ctx` - The egui context used for caching. +/// +/// # Returns +/// * A Result containing an Option of `SerialPortInfo` or a `serialport::Error`. +pub fn cached_first_stm32_port(ctx: &Context) -> Result<Option<SerialPortInfo>, serialport::Error> { + ctx.call_cached( + egui::Id::new("list_usb_ports"), + communication::serial::find_first_stm32_port, + SERIAL_PORT_REFRESH_INTERVAL, + ) +} diff --git a/src/ui/panes.rs b/src/ui/panes.rs index 17993c8fb276752f191453ac83478dabc0251211..34c56503162878e5c7cffd58321617b7814f1223 100644 --- a/src/ui/panes.rs +++ b/src/ui/panes.rs @@ -7,7 +7,7 @@ use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; use strum_macros::{self, EnumIter, EnumMessage}; -use crate::mavlink::TimedMessage; +use crate::mavlink::{MavMessage, TimedMessage}; use super::app::PaneResponse; @@ -26,17 +26,28 @@ impl Pane { pub trait PaneBehavior { /// Renders the UI of the pane. fn ui(&mut self, ui: &mut egui::Ui, tile_id: TileId) -> PaneResponse; + /// Whether the pane contains the pointer. fn contains_pointer(&self) -> bool; /// Updates the pane state. This method is called before `ui` to allow the /// pane to update its state based on the messages received. - fn update(&mut self, messages: &[TimedMessage]); + fn update(&mut self, _messages: &[TimedMessage]) {} /// Returns the ID of the messages this pane is interested in, if any. - fn get_message_subscription(&self) -> Option<u32>; + fn get_message_subscription(&self) -> Option<u32> { + None + } + /// Checks whether the full message history should be sent to the pane. - fn should_send_message_history(&self) -> bool; + fn should_send_message_history(&self) -> bool { + false + } + + /// Drains the outgoing messages from the pane. + fn drain_outgoing_messages(&mut self) -> Vec<MavMessage> { + Vec::new() + } } impl PaneBehavior for Pane { @@ -59,6 +70,10 @@ impl PaneBehavior for Pane { fn should_send_message_history(&self) -> bool { self.pane.should_send_message_history() } + + fn drain_outgoing_messages(&mut self) -> Vec<MavMessage> { + self.pane.drain_outgoing_messages() + } } // An enum to represent the diffent kinds of widget available to the user. diff --git a/src/ui/panes/messages_viewer.rs b/src/ui/panes/messages_viewer.rs index 23a1f04516c444dbd4bb7dfe1b1a0c6c43a9c277..2c8a54f5e0c796dba09e932fee105fc99ee07ed1 100644 --- a/src/ui/panes/messages_viewer.rs +++ b/src/ui/panes/messages_viewer.rs @@ -32,14 +32,4 @@ impl PaneBehavior for MessagesViewerPane { fn contains_pointer(&self) -> bool { self.contains_pointer } - - fn update(&mut self, _messages: &[crate::mavlink::TimedMessage]) {} - - fn get_message_subscription(&self) -> Option<u32> { - None - } - - fn should_send_message_history(&self) -> bool { - false - } } diff --git a/src/ui/panes/plot.rs b/src/ui/panes/plot.rs index bfd2ca859f753b7d997b4a571cd3c76e86a4628c..e83bd2faa2a011abd45ccaba4bdbc23e6733be51 100644 --- a/src/ui/panes/plot.rs +++ b/src/ui/panes/plot.rs @@ -2,6 +2,7 @@ mod source_window; use super::PaneBehavior; use crate::{ + error::ErrInstrument, mavlink::{MessageData, ROCKET_FLIGHT_TM_DATA, TimedMessage, extract_from_message}, ui::app::PaneResponse, }; @@ -41,19 +42,6 @@ impl PaneBehavior for Plot2DPane { fn ui(&mut self, ui: &mut egui::Ui, _: TileId) -> PaneResponse { let mut response = PaneResponse::default(); - let mut settings = SourceSettings::new(&mut self.settings, &mut self.line_settings); - egui::Window::new("Plot Settings") - .id(ui.auto_id_with("plot_settings")) // TODO: fix this issue with ids - .auto_sized() - .collapsible(true) - .movable(true) - .open(&mut self.settings_visible) - .show(ui.ctx(), |ui| sources_window(ui, &mut settings)); - - if settings.are_sources_changed() { - self.state_valid = false; - } - let ctrl_pressed = ui.input(|i| i.modifiers.ctrl); egui_plot::Plot::new("plot") @@ -80,6 +68,19 @@ impl PaneBehavior for Plot2DPane { .context_menu(|ui| show_menu(ui, &mut self.settings_visible)); }); + let mut settings = SourceSettings::new(&mut self.settings, &mut self.line_settings); + egui::Window::new("Plot Settings") + .id(ui.auto_id_with("plot_settings")) // TODO: fix this issue with ids + .auto_sized() + .collapsible(true) + .movable(true) + .open(&mut self.settings_visible) + .show(ui.ctx(), |ui| sources_window(ui, &mut settings)); + + if settings.are_sources_changed() { + self.state_valid = false; + } + response } @@ -98,8 +99,8 @@ impl PaneBehavior for Plot2DPane { } = &self.settings; for msg in messages { - let x: f64 = extract_from_message(&msg.message, [x_field]).unwrap()[0]; - let ys: Vec<f64> = extract_from_message(&msg.message, y_fields).unwrap(); + let x: f64 = extract_from_message(&msg.message, [x_field]).log_unwrap()[0]; + let ys: Vec<f64> = extract_from_message(&msg.message, y_fields).log_unwrap(); if self.line_data.len() < ys.len() { self.line_data.resize(ys.len(), Vec::new()); diff --git a/src/ui/persistency.rs b/src/ui/persistency.rs index 7998384cd60b8800f37bfd19f3d862e9257f16e4..44bfc12a9e2e7c06e964a2069e107ef9239239ac 100644 --- a/src/ui/persistency.rs +++ b/src/ui/persistency.rs @@ -1,5 +1,3 @@ mod layout_manager; -mod layout_manager_window; pub use layout_manager::LayoutManager; -pub use layout_manager_window::LayoutManagerWindow; diff --git a/src/ui/windows.rs b/src/ui/windows.rs new file mode 100644 index 0000000000000000000000000000000000000000..557c348b3ec837db8ac7130fba57d5146ae6c8a4 --- /dev/null +++ b/src/ui/windows.rs @@ -0,0 +1,5 @@ +mod connections; +mod layouts; + +pub use connections::ConnectionsWindow; +pub use layouts::LayoutManagerWindow; diff --git a/src/ui/windows/connections.rs b/src/ui/windows/connections.rs new file mode 100644 index 0000000000000000000000000000000000000000..ebc34c32fcfd24d57cba0a17ff7f126d79b48e0b --- /dev/null +++ b/src/ui/windows/connections.rs @@ -0,0 +1,221 @@ +use egui::{Align2, Button, ComboBox, Context, RichText, Vec2}; +use egui_extras::{Size, StripBuilder}; +use tracing::{error, warn}; + +use crate::{ + communication::{ + ConnectionError, EthernetConfiguration, SerialConfiguration, serial::DEFAULT_BAUD_RATE, + }, + error::ErrInstrument, + mavlink::DEFAULT_ETHERNET_PORT, + message_broker::MessageBroker, + ui::cache::{cached_first_stm32_port, cached_list_all_usb_ports}, +}; + +#[derive(Default)] +pub struct ConnectionsWindow { + pub visible: bool, + connection_kind: ConnectionKind, + connection_config: ConnectionConfig, +} + +impl ConnectionsWindow { + #[profiling::function] + pub fn show_window(&mut self, ui: &mut egui::Ui, message_broker: &mut MessageBroker) { + let mut window_is_open = self.visible; + let mut can_be_closed = false; + egui::Window::new("Sources") + .id(ui.id()) + .anchor(Align2::CENTER_CENTER, [0.0, 0.0]) + .max_width(200.0) + .collapsible(false) + .resizable(false) + .open(&mut window_is_open) + .show(ui.ctx(), |ui| { + self.ui(ui, &mut can_be_closed, message_broker); + }); + self.visible = window_is_open && !can_be_closed; + } + + fn ui( + &mut self, + ui: &mut egui::Ui, + can_be_closed: &mut bool, + message_broker: &mut MessageBroker, + ) { + let ConnectionsWindow { + connection_kind, + connection_config, + .. + } = self; + ui.label("Select Source:"); + ui.horizontal_top(|ui| { + ui.radio_value(connection_kind, ConnectionKind::Ethernet, "Ethernet"); + ui.radio_value(connection_kind, ConnectionKind::Serial, "Serial"); + }); + + ui.separator(); + + match (connection_kind, &connection_config) { + (ConnectionKind::Ethernet, ConnectionConfig::Ethernet(_)) => {} + (ConnectionKind::Serial, ConnectionConfig::Serial(_)) => {} + (ConnectionKind::Ethernet, _) => { + *connection_config = ConnectionConfig::Ethernet(default_ethernet()); + } + (ConnectionKind::Serial, _) => { + *connection_config = ConnectionConfig::Serial( + default_serial(ui.ctx()).log_expect("USER ERROR: issues with serail ports"), + ); + } + } + + match connection_config { + ConnectionConfig::Ethernet(EthernetConfiguration { port: recv_port }) => { + egui::Grid::new("grid") + .num_columns(2) + .spacing([10.0, 5.0]) + .show(ui, |ui| { + ui.label("Ethernet Port:"); + ui.add(egui::DragValue::new(recv_port).range(0..=65535).speed(10)); + ui.end_row(); + }); + } + ConnectionConfig::Serial(opt) => { + egui::Grid::new("grid") + .num_columns(2) + .spacing([10.0, 5.0]) + .show(ui, |ui| { + ui.label("Serial Port:"); + match opt.as_mut() { + Some(SerialConfiguration { + port_name, + baud_rate, + }) => { + ComboBox::from_id_salt("serial_port") + .selected_text(port_name.as_str()) + .show_ui(ui, |ui| { + for available_port in + cached_list_all_usb_ports(ui.ctx()).log_unwrap() + { + ui.selectable_value( + port_name, + available_port.port_name.clone(), + available_port.port_name, + ); + } + }); + + ui.label("Baud Rate:"); + ui.add( + egui::DragValue::new(baud_rate) + .range(110..=256000) + .speed(100), + ); + ui.end_row(); + } + None => { + // in case of a serial connection missing + warn!("USER ERROR: No serial port found"); + ui.label(RichText::new("No port found").underline().strong()); + *opt = default_serial(ui.ctx()) + .log_expect("USER ERROR: issues with serial ports"); + } + } + + ui.end_row(); + }); + } + }; + + ui.separator(); + + ui.allocate_ui(Vec2::new(ui.available_width(), 20.0), |ui| { + StripBuilder::new(ui) + .sizes(Size::remainder(), 2) // top cell + .horizontal(|mut strip| { + strip.cell(|ui| { + let btn1 = Button::new("Connect"); + ui.add_enabled_ui( + !message_broker.is_connected() & connection_config.is_valid(), + |ui| { + if ui.add_sized(ui.available_size(), btn1).clicked() { + if let Err(e) = + connection_config.open_connection(message_broker) + { + error!("Failed to open connection: {:?}", e); // TODO: handle user erros + } + *can_be_closed = true; + } + }, + ); + }); + strip.cell(|ui| { + let btn2 = Button::new("Disconnect"); + ui.add_enabled_ui(message_broker.is_connected(), |ui| { + if ui.add_sized(ui.available_size(), btn2).clicked() { + message_broker.close_connection(); + } + }); + }); + }); + }); + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +enum ConnectionKind { + #[default] + Ethernet, + Serial, +} + +#[derive(Debug, Clone)] +pub enum ConnectionConfig { + Ethernet(EthernetConfiguration), + Serial(Option<SerialConfiguration>), +} + +fn default_ethernet() -> EthernetConfiguration { + EthernetConfiguration { + port: DEFAULT_ETHERNET_PORT, + } +} + +fn default_serial(ctx: &Context) -> Result<Option<SerialConfiguration>, serialport::Error> { + let port_name = + cached_first_stm32_port(ctx)? + .map(|port| port.port_name) + .or(cached_list_all_usb_ports(ctx) + .ok() + .and_then(|ports| ports.first().map(|port| port.port_name.clone()))); + Ok(port_name.map(|port_name| SerialConfiguration { + port_name, + baud_rate: DEFAULT_BAUD_RATE, + })) +} + +impl ConnectionConfig { + fn is_valid(&self) -> bool { + match self { + ConnectionConfig::Ethernet(_) => true, + ConnectionConfig::Serial(Some(_)) => true, + ConnectionConfig::Serial(None) => false, + } + } + + fn open_connection(&self, msg_broker: &mut MessageBroker) -> Result<(), ConnectionError> { + match self { + Self::Ethernet(config) => msg_broker.open_connection(config.clone()), + Self::Serial(Some(config)) => msg_broker.open_connection(config.clone()), + Self::Serial(None) => Err(ConnectionError::WrongConfiguration( + "No serial port found".to_string(), + )), + } + } +} + +impl Default for ConnectionConfig { + fn default() -> Self { + ConnectionConfig::Ethernet(default_ethernet()) + } +} diff --git a/src/ui/persistency/layout_manager_window.rs b/src/ui/windows/layouts.rs similarity index 99% rename from src/ui/persistency/layout_manager_window.rs rename to src/ui/windows/layouts.rs index d4be72e36d8dfdabdd2d6c06dbfb189eed2d2c4a..0407cef95636694e11805751d01dbbf13f7e6ad9 100644 --- a/src/ui/persistency/layout_manager_window.rs +++ b/src/ui/windows/layouts.rs @@ -8,9 +8,10 @@ use egui_extras::{Column, Size, StripBuilder, TableBuilder}; use egui_file::FileDialog; use tracing::{debug, error}; -use crate::{error::ErrInstrument, ui::app::AppState}; - -use super::LayoutManager; +use crate::{ + error::ErrInstrument, + ui::{app::AppState, persistency::LayoutManager}, +}; #[derive(Default)] pub struct LayoutManagerWindow { diff --git a/src/utils.rs b/src/utils.rs index d3699bbc4d745f0db992d5721d08abbd1ec550ba..7e22ff58e02d0b7d8c76696d474d7624e4ac3f77 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,58 +1,2 @@ -#[derive(Debug)] -pub struct RingBuffer<const G: usize> { - buffer: Box<[u8; G]>, - write_cursor: usize, - read_cursor: usize, -} +mod ring_buffer; -impl<const G: usize> std::io::Write for RingBuffer<G> { - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { - let mut written = 0; - for byte in buf { - if self.is_full() { - Err(std::io::Error::new( - std::io::ErrorKind::WriteZero, - "Buffer full", - ))?; - } - self.buffer[self.write_cursor] = *byte; - self.write_cursor = (self.write_cursor + 1) % G; - written += 1; - } - Ok(written) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -impl<const G: usize> std::io::Read for RingBuffer<G> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - let mut read = 0; - while read < buf.len() { - if self.read_cursor == self.write_cursor { - break; - } - buf[read] = self.buffer[self.read_cursor]; - self.read_cursor = (self.read_cursor + 1) % G; - read += 1; - } - Ok(read) - } -} - -impl<const G: usize> RingBuffer<G> { - pub fn new() -> Self { - Self { - buffer: Box::new([0; G]), - write_cursor: 0, - read_cursor: 0, - } - } - - #[inline] - pub fn is_full(&self) -> bool { - (self.write_cursor + 1) % G == self.read_cursor - } -} diff --git a/src/utils/ring_buffer.rs b/src/utils/ring_buffer.rs new file mode 100644 index 0000000000000000000000000000000000000000..c25df112697f5d3ff6737fe0b219b25c72d27010 --- /dev/null +++ b/src/utils/ring_buffer.rs @@ -0,0 +1,256 @@ + + +#[derive(Debug)] +pub struct RingBuffer<const G: usize> { + buffer: Box<[u8; G]>, + write_cursor: usize, + read_cursor: usize, +} + +impl<const G: usize> RingBuffer<G> { + pub fn new() -> Self { + Self { + buffer: Box::new([0; G]), + write_cursor: 0, + read_cursor: 0, + } + } + + pub fn len(&self) -> usize { + if self.write_cursor >= self.read_cursor { + self.write_cursor - self.read_cursor + } else { + G - self.read_cursor + self.write_cursor + } + } + + #[inline] + pub fn remaining_capacity(&self) -> usize { + G - self.len() - 1 + } + + #[inline] + pub fn is_full(&self) -> bool { + (self.write_cursor + 1) % G == self.read_cursor + } +} + +impl<const G: usize> std::io::Write for RingBuffer<G> { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + if self.remaining_capacity() < buf.len() { + Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "Buffer full", + ))?; + } + let mut written = 0; + for byte in buf { + self.buffer[self.write_cursor] = *byte; + self.write_cursor = (self.write_cursor + 1) % G; + written += 1; + } + Ok(written) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl<const G: usize> std::io::Read for RingBuffer<G> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + let mut read = 0; + while read < buf.len() { + if self.read_cursor == self.write_cursor { + break; + } + buf[read] = self.buffer[self.read_cursor]; + self.read_cursor = (self.read_cursor + 1) % G; + read += 1; + } + Ok(read) + } +} + +pub struct OverwritingRingBuffer<const G: usize> { + buffer: Box<[u8; G]>, + write_cursor: usize, + read_cursor: usize, + full: bool, // new flag to indicate if buffer is full +} + +impl<const G: usize> OverwritingRingBuffer<G> { + pub fn new() -> Self { + Self { + buffer: Box::new([0; G]), + write_cursor: 0, + read_cursor: 0, + full: false, // initialize full flag to false + } + } + + // Updated len() using full flag + pub fn len(&self) -> usize { + if self.full { + G + } else if self.write_cursor >= self.read_cursor { + self.write_cursor - self.read_cursor + } else { + G - self.read_cursor + self.write_cursor + } + } +} + +impl<const G: usize> std::io::Write for OverwritingRingBuffer<G> { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + let mut written = 0; + for &byte in buf { + self.buffer[self.write_cursor] = byte; + self.write_cursor = (self.write_cursor + 1) % G; + if self.full { + // Buffer full; advance read_cursor to overwrite oldest + self.read_cursor = (self.read_cursor + 1) % G; + } + // Set full flag if write_cursor catches up to read_cursor + self.full = self.write_cursor == self.read_cursor; + written += 1; + } + Ok(written) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl<const G: usize> std::io::Read for OverwritingRingBuffer<G> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + let mut read = 0; + // If full, then buffer is not empty and we will clear full once reading starts. + while read < buf.len() { + // Buffer empty condition: not full and cursors equal + if !self.full && (self.read_cursor == self.write_cursor) { + break; + } + buf[read] = self.buffer[self.read_cursor]; + self.read_cursor = (self.read_cursor + 1) % G; + self.full = false; // once reading, clear full flag + read += 1; + } + Ok(read) + } +} + +#[allow(clippy::unwrap_used)] +#[cfg(test)] +mod tests { + use std::io::{Read, Write}; + + // Tests for RingBuffer + #[test] + fn test_ring_buffer_empty_read() { + let mut rb = super::RingBuffer::<8>::new(); + let mut buf = [0; 4]; + let n = rb.read(&mut buf).unwrap(); + assert_eq!(n, 0); + } + + #[test] + fn test_ring_buffer_write_then_read() { + let mut rb = super::RingBuffer::<8>::new(); + let data = [10, 20, 30, 40]; + let n = rb.write(&data).unwrap(); + assert_eq!(n, data.len()); + let mut buf = [0u8; 4]; + let n = rb.read(&mut buf).unwrap(); + assert_eq!(n, data.len()); + assert_eq!(buf, data); + } + + #[test] + fn test_ring_buffer_wraparound() { + let mut rb = super::RingBuffer::<8>::new(); + // Write initial data. + let data1 = [1, 2, 3, 4, 5, 6]; + let n = rb.write(&data1).unwrap(); + assert_eq!(n, data1.len()); + // Read a few bytes to free space. + let mut buf1 = [0u8; 3]; + let n = rb.read(&mut buf1).unwrap(); + assert_eq!(n, 3); + assert_eq!(buf1, [1, 2, 3]); + // Write additional data to wrap-around. + let data2 = [7, 8, 9]; + let n = rb.write(&data2).unwrap(); + assert_eq!(n, data2.len()); + // Read remaining data. + let mut buf2 = [0u8; 6]; + let n = rb.read(&mut buf2).unwrap(); + // Expected order: remaining from data1 ([4,5,6]) then data2 ([7,8,9]) + assert_eq!(n, 6); + assert_eq!(buf2, [4, 5, 6, 7, 8, 9]); + } + + #[test] + fn test_ring_buffer_full_error() { + let mut rb = super::RingBuffer::<4>::new(); + // Usable capacity is G - 1: 3 bytes. + let _ = rb.write(&[10, 20, 30]).unwrap(); + let res = rb.write(&[40]); + assert!(res.is_err()); + } + + #[test] + fn test_ring_buffer_partial_write_when_full() { + let mut rb = super::RingBuffer::<6>::new(); + // Usable capacity: 5 bytes. + let n = rb.write(&[1, 2, 3, 4]).unwrap(); + assert_eq!(n, 4); + // Only one byte can be written before buffer is full. + // The write should error on the second byte. + let res = rb.write(&[5, 6]); + assert!(res.is_err()); + + // Read the first 5 bytes. + let mut buf = [0u8; 5]; + let n = rb.read(&mut buf).unwrap(); + assert_eq!(n, 4); + assert_eq!(buf, [1, 2, 3, 4, 0]); + } + + // Tests for OverwritingRingBuffer + #[test] + fn test_overwriting_ring_buffer_empty_read() { + let mut orb = super::OverwritingRingBuffer::<8>::new(); + let mut buf = [0; 4]; + let n = orb.read(&mut buf).unwrap(); + assert_eq!(n, 0); + } + + #[test] + fn test_overwriting_ring_buffer_write_then_read() { + let mut orb = super::OverwritingRingBuffer::<8>::new(); + let data = [10, 20, 30, 40]; + let n = orb.write(&data).unwrap(); + assert_eq!(n, data.len()); + let mut buf = [0u8; 4]; + let n = orb.read(&mut buf).unwrap(); + assert_eq!(n, data.len()); + assert_eq!(buf, data); + } + + #[test] + fn test_overwriting_ring_buffer_overwrite() { + let mut orb = super::OverwritingRingBuffer::<6>::new(); + // Write initial 5 elements (usable capacity = 5). + let _ = orb.write(&[1, 2, 3, 4, 5]).unwrap(); + // Write two more elements to force overwrite. + let _ = orb.write(&[6, 7]).unwrap(); + // After overwrite, expected order starts from index advanced by one overwrite + // Expected stored order: [2,3,4,5,6,7] + let mut buf = [0u8; 6]; + let n = orb.read(&mut buf).unwrap(); + assert_eq!(n, 6); + assert_eq!(buf, [2, 3, 4, 5, 6, 7]); + } +}