From 7e7cab1684f119864032cfa23c4504d144887781 Mon Sep 17 00:00:00 2001
From: Federico Lolli <federico.lolli@skywarder.eu>
Date: Fri, 7 Mar 2025 08:43:08 +0000
Subject: [PATCH] Refactored Mavlink Communication for Improved Reliability

---
 .gitignore                                    |   3 +
 Cargo.lock                                    | 163 +++++++++--
 Cargo.toml                                    |  13 +-
 justfile                                      |   4 +
 src/communication.rs                          | 185 +++++++++++++
 src/communication/error.rs                    |  34 +++
 src/communication/ethernet.rs                 |  68 +++++
 src/communication/serial.rs                   | 100 +++++++
 src/main.rs                                   |   9 +-
 src/mavlink/base.rs                           |  10 -
 src/message_broker.rs                         | 222 ++++++---------
 src/serial.rs                                 |  28 --
 src/ui.rs                                     |   2 +
 src/ui/app.rs                                 | 246 ++++-------------
 src/ui/cache.rs                               |  92 +++++++
 src/ui/panes.rs                               |  23 +-
 src/ui/panes/messages_viewer.rs               |  10 -
 src/ui/panes/plot.rs                          |  31 ++-
 src/ui/persistency.rs                         |   2 -
 src/ui/windows.rs                             |   5 +
 src/ui/windows/connections.rs                 | 221 +++++++++++++++
 .../layouts.rs}                               |   7 +-
 src/utils.rs                                  |  58 +---
 src/utils/ring_buffer.rs                      | 256 ++++++++++++++++++
 24 files changed, 1302 insertions(+), 490 deletions(-)
 create mode 100644 src/communication.rs
 create mode 100644 src/communication/error.rs
 create mode 100644 src/communication/ethernet.rs
 create mode 100644 src/communication/serial.rs
 delete mode 100644 src/serial.rs
 create mode 100644 src/ui/cache.rs
 create mode 100644 src/ui/windows.rs
 create mode 100644 src/ui/windows/connections.rs
 rename src/ui/{persistency/layout_manager_window.rs => windows/layouts.rs} (99%)
 create mode 100644 src/utils/ring_buffer.rs

diff --git a/.gitignore b/.gitignore
index f7cd28a..008d2b3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,6 @@
 
 # Build related
 /target
+
+# Logs
+/logs
diff --git a/Cargo.lock b/Cargo.lock
index 3e2ead8..9ba641e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -140,7 +140,7 @@ dependencies = [
  "once_cell",
  "serde",
  "version_check",
- "zerocopy",
+ "zerocopy 0.7.35",
 ]
 
 [[package]]
@@ -753,6 +753,15 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-queue"
 version = "0.3.12"
@@ -784,6 +793,15 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991"
 
+[[package]]
+name = "deranged"
+version = "0.3.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4"
+dependencies = [
+ "powerfmt",
+]
+
 [[package]]
 name = "derivative"
 version = "2.2.0"
@@ -2190,6 +2208,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "num-conv"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
+
 [[package]]
 name = "num-derive"
 version = "0.4.2"
@@ -2619,13 +2643,19 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "powerfmt"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
 dependencies = [
- "zerocopy",
+ "zerocopy 0.7.35",
 ]
 
 [[package]]
@@ -2710,8 +2740,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
 dependencies = [
  "libc",
- "rand_chacha",
- "rand_core",
+ "rand_chacha 0.3.1",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "rand"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
+dependencies = [
+ "rand_chacha 0.9.0",
+ "rand_core 0.9.3",
+ "zerocopy 0.8.21",
 ]
 
 [[package]]
@@ -2721,7 +2762,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
 dependencies = [
  "ppv-lite86",
- "rand_core",
+ "rand_core 0.6.4",
+]
+
+[[package]]
+name = "rand_chacha"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
+dependencies = [
+ "ppv-lite86",
+ "rand_core 0.9.3",
 ]
 
 [[package]]
@@ -2733,6 +2784,15 @@ dependencies = [
  "getrandom 0.2.15",
 ]
 
+[[package]]
+name = "rand_core"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
+dependencies = [
+ "getrandom 0.3.1",
+]
+
 [[package]]
 name = "raw-window-handle"
 version = "0.6.2"
@@ -2918,6 +2978,7 @@ dependencies = [
  "enum_dispatch",
  "mavlink-bindgen",
  "profiling",
+ "rand 0.9.0",
  "ring-channel",
  "serde",
  "serde_json",
@@ -2928,9 +2989,9 @@ dependencies = [
  "thiserror 2.0.11",
  "tokio",
  "tracing",
+ "tracing-appender",
  "tracing-subscriber",
  "tracing-tracy",
- "uuid",
 ]
 
 [[package]]
@@ -3363,6 +3424,37 @@ dependencies = [
  "weezl",
 ]
 
+[[package]]
+name = "time"
+version = "0.3.39"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dad298b01a40a23aac4580b67e3dbedb7cc8402f3592d7f49469de2ea4aecdd8"
+dependencies = [
+ "deranged",
+ "itoa",
+ "num-conv",
+ "powerfmt",
+ "serde",
+ "time-core",
+ "time-macros",
+]
+
+[[package]]
+name = "time-core"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "765c97a5b985b7c11d7bc27fa927dc4fe6af3a6dfb021d28deb60d3bf51e76ef"
+
+[[package]]
+name = "time-macros"
+version = "0.2.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8093bc3e81c3bc5f7879de09619d06c9a5a5e45ca44dfeeb7225bae38005c5c"
+dependencies = [
+ "num-conv",
+ "time-core",
+]
+
 [[package]]
 name = "tiny-skia"
 version = "0.11.4"
@@ -3440,6 +3532,18 @@ dependencies = [
  "tracing-core",
 ]
 
+[[package]]
+name = "tracing-appender"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
+dependencies = [
+ "crossbeam-channel",
+ "thiserror 1.0.69",
+ "time",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "tracing-attributes"
 version = "0.1.28"
@@ -3472,6 +3576,16 @@ dependencies = [
  "tracing-core",
 ]
 
+[[package]]
+name = "tracing-serde"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
+dependencies = [
+ "serde",
+ "tracing-core",
+]
+
 [[package]]
 name = "tracing-subscriber"
 version = "0.3.19"
@@ -3482,12 +3596,15 @@ dependencies = [
  "nu-ansi-term",
  "once_cell",
  "regex",
+ "serde",
+ "serde_json",
  "sharded-slab",
  "smallvec",
  "thread_local",
  "tracing",
  "tracing-core",
  "tracing-log",
+ "tracing-serde",
 ]
 
 [[package]]
@@ -3616,16 +3733,6 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
 
-[[package]]
-name = "uuid"
-version = "1.15.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587"
-dependencies = [
- "getrandom 0.3.1",
- "serde",
-]
-
 [[package]]
 name = "valuable"
 version = "0.1.1"
@@ -4502,7 +4609,7 @@ dependencies = [
  "hex",
  "nix 0.29.0",
  "ordered-stream",
- "rand",
+ "rand 0.8.5",
  "serde",
  "serde_repr",
  "sha1",
@@ -4584,7 +4691,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
 dependencies = [
  "byteorder",
- "zerocopy-derive",
+ "zerocopy-derive 0.7.35",
+]
+
+[[package]]
+name = "zerocopy"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcf01143b2dd5d134f11f545cf9f1431b13b749695cb33bcce051e7568f99478"
+dependencies = [
+ "zerocopy-derive 0.8.21",
 ]
 
 [[package]]
@@ -4598,6 +4714,17 @@ dependencies = [
  "syn 2.0.98",
 ]
 
+[[package]]
+name = "zerocopy-derive"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712c8386f4f4299382c9abee219bee7084f78fb939d88b6840fcc1320d5f6da2"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.98",
+]
+
 [[package]]
 name = "zerofrom"
 version = "0.1.6"
diff --git a/Cargo.toml b/Cargo.toml
index 60bd536..03ca628 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,9 +28,12 @@ serialport = "4.7.0"
 # ========= Persistency =========
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
-# =========== Logging ===========
+# =========== Tracing and profiling ===========
 tracing = "0.1"
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
+tracing-tracy = "0.11.4"
+profiling = { version = "1.0", features = ["profile-with-tracy"] }
+tracing-appender = "0.2"
 # =========== Utility ===========
 # for dynamic dispatch
 enum_dispatch = "0.3"
@@ -39,6 +42,6 @@ strum_macros = "0.26"
 anyhow = "1.0"
 ring-channel = "0.12.0"
 thiserror = "2.0.7"
-uuid = { version = "1.12.1", features = ["serde", "v7"] }
-profiling = { version = "1.0", features = ["profile-with-tracy"] }
-tracing-tracy = "0.11.4"
+
+[dev-dependencies]
+rand = "0.9.0"
diff --git a/justfile b/justfile
index 613e908..ea6682b 100644
--- a/justfile
+++ b/justfile
@@ -1,9 +1,13 @@
 alias r := run
+alias t := test
 alias d := doc
 
 default:
     just run
 
+test *ARGS:
+    cargo nextest run {{ARGS}}
+
 run LEVEL="debug":
     RUST_LOG=segs={{LEVEL}} cargo r
 
diff --git a/src/communication.rs b/src/communication.rs
new file mode 100644
index 0000000..cf0bbcc
--- /dev/null
+++ b/src/communication.rs
@@ -0,0 +1,185 @@
+//! Main communication module.
+//!
+//! Provides a unified interface for handling message transmission and reception
+//! through different physical connection types (e.g., serial, Ethernet).
+//! It also manages connections and message buffering.
+
+mod error;
+pub mod ethernet;
+pub mod serial;
+
+use std::sync::{
+    Arc,
+    atomic::{AtomicBool, Ordering},
+};
+
+use ring_channel::{RingReceiver, TryRecvError};
+use sealed::MessageTransceiver;
+
+use crate::mavlink::{MavConnection, MavFrame, MavMessage, TimedMessage};
+
+// Re-exports
+pub use error::{CommunicationError, ConnectionError};
+pub use ethernet::EthernetConfiguration;
+pub use serial::SerialConfiguration;
+
+const MAX_STORED_MSGS: usize = 1000; // e.g., 192 bytes each = 192 KB
+
+pub(super) type BoxedConnection = Box<dyn MavConnection<MavMessage> + Send + Sync>;
+
+mod sealed {
+    use std::{
+        num::NonZeroUsize,
+        sync::{
+            Arc,
+            atomic::{AtomicBool, Ordering},
+        },
+    };
+
+    use enum_dispatch::enum_dispatch;
+    use ring_channel::ring_channel;
+    use skyward_mavlink::mavlink::{
+        MavFrame,
+        error::{MessageReadError, MessageWriteError},
+    };
+
+    use crate::{
+        error::ErrInstrument,
+        mavlink::{MavMessage, TimedMessage},
+    };
+
+    use super::{
+        CommunicationError, Connection, ConnectionError, MAX_STORED_MSGS,
+        ethernet::EthernetTransceiver, serial::SerialTransceiver,
+    };
+
+    pub trait TransceiverConfigSealed {}
+
+    /// Trait representing an entity that can be connected.
+    pub trait Connectable {
+        type Connected: MessageTransceiver;
+
+        /// Establishes a connection based on the configuration.
+        fn connect(&self) -> Result<Self::Connected, ConnectionError>;
+    }
+
+    /// Trait representing a message transceiver.
+    /// This trait abstracts the common operations for message transmission and reception.
+    /// It also provides a default implementation for opening a listening connection, while
+    /// being transparent to the actual Transceiver type.
+    #[enum_dispatch(Transceivers)]
+    pub trait MessageTransceiver: Send + Sync + Into<Transceivers> {
+        /// Blocks until a valid message is received.
+        fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError>;
+
+        /// Transmits a message using the connection.
+        fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError>;
+
+        /// Opens a listening connection and spawns a thread for message handling.
+        #[profiling::function]
+        fn open_listening_connection(self) -> Connection {
+            let running_flag = Arc::new(AtomicBool::new(true));
+            let (tx, rx) = ring_channel(NonZeroUsize::new(MAX_STORED_MSGS).log_unwrap());
+            let endpoint_inner = Arc::new(self.into());
+
+            {
+                let running_flag = running_flag.clone();
+                let endpoint_inner = endpoint_inner.clone();
+                // Detached thread for message handling; errors are logged.
+                let _ = std::thread::spawn(move || {
+                    while running_flag.load(Ordering::Relaxed) {
+                        match endpoint_inner.wait_for_message() {
+                            Ok(msg) => {
+                                tx.send(msg)
+                                    .map_err(|_| CommunicationError::ConnectionClosed)?;
+                            }
+                            Err(MessageReadError::Io(e)) => {
+                                tracing::error!("Failed to read message: {e:#?}");
+                                running_flag.store(false, Ordering::Relaxed);
+                                return Err(CommunicationError::Io(e));
+                            }
+                            Err(MessageReadError::Parse(e)) => {
+                                tracing::error!("Failed to read message: {e:#?}");
+                            }
+                        }
+                    }
+                    Ok(())
+                });
+            }
+
+            Connection {
+                transceiver: endpoint_inner,
+                rx_ring_channel: rx,
+                running_flag,
+            }
+        }
+    }
+
+    impl<T: Connectable> TransceiverConfigSealed for T {}
+
+    /// Enum representing the different types of transceivers.
+    #[enum_dispatch]
+    pub(super) enum Transceivers {
+        Serial(SerialTransceiver),
+        Ethernet(EthernetTransceiver),
+    }
+}
+
+/// Trait to abstract common configuration types.
+pub trait TransceiverConfig: sealed::TransceiverConfigSealed {}
+impl<T: sealed::TransceiverConfigSealed> TransceiverConfig for T {}
+
+/// Extension trait to open a connection directly from a configuration.
+pub trait TransceiverConfigExt: sealed::Connectable {
+    /// Opens a connection and returns a handle to it.
+    fn open_connection(&self) -> Result<Connection, ConnectionError> {
+        Ok(self.connect()?.open_listening_connection())
+    }
+}
+impl<T: sealed::Connectable> TransceiverConfigExt for T {}
+
+/// Represents an active connection with buffered messages.
+pub struct Connection {
+    transceiver: Arc<sealed::Transceivers>,
+    rx_ring_channel: RingReceiver<TimedMessage>,
+    running_flag: Arc<AtomicBool>,
+}
+
+impl Connection {
+    /// Retrieves and clears stored messages.
+    #[profiling::function]
+    pub fn retrieve_messages(&self) -> Result<Vec<TimedMessage>, CommunicationError> {
+        let mut stored_msgs = Vec::new();
+        loop {
+            match self.rx_ring_channel.try_recv() {
+                Ok(msg) => stored_msgs.push(msg),
+                Err(TryRecvError::Empty) => break,
+                Err(TryRecvError::Disconnected) => {
+                    return Err(CommunicationError::ConnectionClosed);
+                }
+            }
+        }
+        Ok(stored_msgs)
+    }
+
+    /// Sends a message over the connection.
+    #[profiling::function]
+    pub fn send_message(&self, msg: MavFrame<MavMessage>) -> Result<(), CommunicationError> {
+        self.transceiver.transmit_message(msg)?;
+        Ok(())
+    }
+}
+
+impl std::fmt::Debug for Connection {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Connection")
+            .field("running_flag", &self.running_flag)
+            .finish()
+    }
+}
+
+impl Drop for Connection {
+    fn drop(&mut self) {
+        self.running_flag.store(false, Ordering::Relaxed);
+    }
+}
diff --git a/src/communication/error.rs b/src/communication/error.rs
new file mode 100644
index 0000000..42b0f1c
--- /dev/null
+++ b/src/communication/error.rs
@@ -0,0 +1,34 @@
+//! Error handling for communication modules.
+//!
+//! Contains definitions for errors that can occur during serial or Ethernet communication.
+
+use skyward_mavlink::mavlink::error::MessageWriteError;
+use thiserror::Error;
+
+/// Represents communication errors.
+#[derive(Debug, Error)]
+pub enum CommunicationError {
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+    #[error("Connection closed")]
+    ConnectionClosed,
+}
+
+/// Represents errors during connection setup.
+#[derive(Debug, Error)]
+pub enum ConnectionError {
+    #[error("Wrong configuration: {0}")]
+    WrongConfiguration(String),
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+    #[error("Unknown error")]
+    Unknown(String),
+}
+
+impl From<MessageWriteError> for CommunicationError {
+    fn from(e: MessageWriteError) -> Self {
+        match e {
+            MessageWriteError::Io(e) => Self::Io(e),
+        }
+    }
+}
diff --git a/src/communication/ethernet.rs b/src/communication/ethernet.rs
new file mode 100644
index 0000000..629887c
--- /dev/null
+++ b/src/communication/ethernet.rs
@@ -0,0 +1,68 @@
+//! Ethernet utilities module.
+//!
+//! Provides functionality to connect via Ethernet using UDP, allowing message
+//! transmission and reception over a network.
+
+use skyward_mavlink::mavlink::{
+    self,
+    error::{MessageReadError, MessageWriteError},
+};
+use tracing::{debug, trace};
+
+use crate::mavlink::{MavFrame, MavMessage, MavlinkVersion, TimedMessage};
+
+use super::{
+    BoxedConnection, ConnectionError,
+    sealed::{Connectable, MessageTransceiver},
+};
+
+/// Configuration for an Ethernet connection.
+#[derive(Debug, Clone)]
+pub struct EthernetConfiguration {
+    pub port: u16,
+}
+
+impl Connectable for EthernetConfiguration {
+    type Connected = EthernetTransceiver;
+
+    /// Binds to the specified UDP port to create a network connection.
+    #[profiling::function]
+    fn connect(&self) -> Result<Self::Connected, ConnectionError> {
+        let incoming_addr = format!("udpin:0.0.0.0:{}", self.port);
+        let outgoing_addr = format!("udpbcast:255.255.255.255:{}", self.port);
+        let mut incoming_conn: BoxedConnection = mavlink::connect(&incoming_addr)?;
+        let mut outgoing_conn: BoxedConnection = mavlink::connect(&outgoing_addr)?;
+        incoming_conn.set_protocol_version(MavlinkVersion::V1);
+        outgoing_conn.set_protocol_version(MavlinkVersion::V1);
+        debug!("Ethernet connections set up on port {}", self.port);
+        Ok(EthernetTransceiver {
+            incoming_conn,
+            outgoing_conn,
+        })
+    }
+}
+
+/// Manages a connection over Ethernet.
+pub struct EthernetTransceiver {
+    incoming_conn: BoxedConnection,
+    outgoing_conn: BoxedConnection,
+}
+
+impl MessageTransceiver for EthernetTransceiver {
+    /// Waits for a message over Ethernet, blocking until a valid message arrives.
+    #[profiling::function]
+    fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError> {
+        let (_, msg) = self.incoming_conn.recv()?;
+        debug!("Received message: {:?}", &msg);
+        Ok(TimedMessage::just_received(msg))
+    }
+
+    /// Transmits a message using the UDP socket.
+    #[profiling::function]
+    fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError> {
+        let written = self.outgoing_conn.send_frame(&msg)?;
+        debug!("Sent message: {:?}", msg);
+        trace!("Sent {} bytes via Ethernet", written);
+        Ok(written)
+    }
+}
diff --git a/src/communication/serial.rs b/src/communication/serial.rs
new file mode 100644
index 0000000..8f1c6fa
--- /dev/null
+++ b/src/communication/serial.rs
@@ -0,0 +1,100 @@
+//! Serial port utilities module.
+//!
+//! Provides functions for listing USB serial ports, finding a STM32 port,
+//! and handling serial connections including message transmission and reception.
+
+use serialport::{SerialPortInfo, SerialPortType};
+use skyward_mavlink::mavlink::{
+    self,
+    error::{MessageReadError, MessageWriteError},
+};
+use tracing::{debug, trace};
+
+use crate::mavlink::{MavFrame, MavMessage, MavlinkVersion, TimedMessage};
+
+use super::{
+    BoxedConnection, ConnectionError,
+    sealed::{Connectable, MessageTransceiver},
+};
+
+pub const DEFAULT_BAUD_RATE: u32 = 115200;
+
+/// Returns a list of all USB serial ports available on the system.
+///
+/// # Returns
+/// * `Ok(Vec<SerialPortInfo>)` if ports are found or an error otherwise.
+#[profiling::function]
+pub fn list_all_usb_ports() -> Result<Vec<SerialPortInfo>, serialport::Error> {
+    let ports = serialport::available_ports()?;
+    Ok(ports
+        .into_iter()
+        .filter(|p| matches!(p.port_type, SerialPortType::UsbPort(_)))
+        .collect())
+}
+
+/// Finds the first USB serial port whose product name contains "STM32" or "ST-LINK".
+///
+/// # Returns
+/// * `Ok(Some(SerialPortInfo))` if a matching port is found, `Ok(None)` otherwise.
+#[profiling::function]
+pub fn find_first_stm32_port() -> Result<Option<SerialPortInfo>, serialport::Error> {
+    let ports = list_all_usb_ports()?;
+    for port in ports {
+        if let serialport::SerialPortType::UsbPort(info) = &port.port_type {
+            if let Some(p) = &info.product {
+                if p.contains("STM32") || p.contains("ST-LINK") {
+                    return Ok(Some(port));
+                }
+            }
+        }
+    }
+    Ok(None)
+}
+
+/// Configuration for a serial connection.
+#[derive(Debug, Clone)]
+pub struct SerialConfiguration {
+    pub port_name: String,
+    pub baud_rate: u32,
+}
+
+impl Connectable for SerialConfiguration {
+    type Connected = SerialTransceiver;
+
+    /// Connects using the serial port configuration.
+    #[profiling::function]
+    fn connect(&self) -> Result<Self::Connected, ConnectionError> {
+        let serial_edpoint = format!("serial:{}:{}", self.port_name, self.baud_rate);
+        let mut mav_connection: BoxedConnection = mavlink::connect(&serial_edpoint)?;
+        mav_connection.set_protocol_version(MavlinkVersion::V1);
+        debug!(
+            "Connected to serial port {} with baud rate {}",
+            self.port_name, self.baud_rate
+        );
+        Ok(SerialTransceiver { mav_connection })
+    }
+}
+
+/// Manages a connection to a serial port.
+pub struct SerialTransceiver {
+    mav_connection: BoxedConnection,
+}
+
+impl MessageTransceiver for SerialTransceiver {
+    /// Blocks until a valid message is received from the serial port.
+    #[profiling::function]
+    fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError> {
+        let (_, msg) = self.mav_connection.recv()?;
+        debug!("Received message: {:?}", &msg);
+        Ok(TimedMessage::just_received(msg))
+    }
+
+    /// Transmits a message via the serial connection.
+    #[profiling::function]
+    fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError> {
+        let written = self.mav_connection.send_frame(&msg)?;
+        debug!("Sent message: {:?}", msg);
+        trace!("Sent {} bytes via serial", written);
+        Ok(written)
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 218319d..9093f87 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,10 +2,10 @@
 #![warn(clippy::unwrap_used)]
 #![warn(clippy::panic)]
 
+mod communication;
 mod error;
 mod mavlink;
 mod message_broker;
-mod serial;
 mod ui;
 mod utils;
 
@@ -26,8 +26,15 @@ static APP_NAME: &str = "segs";
 fn main() -> Result<(), eframe::Error> {
     // Set up logging (USE RUST_LOG=debug to see logs)
     let env_filter = EnvFilter::builder().from_env_lossy();
+    let file_appender = tracing_appender::rolling::daily("logs/", "segs.log");
+    let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
     tracing_subscriber::registry()
         .with(tracing_subscriber::fmt::layer().with_filter(env_filter))
+        .with(
+            tracing_subscriber::fmt::layer()
+                .json()
+                .with_writer(non_blocking),
+        )
         .with(tracing_tracy::TracyLayer::default())
         .init();
 
diff --git a/src/mavlink/base.rs b/src/mavlink/base.rs
index c10d9fe..fb64c4c 100644
--- a/src/mavlink/base.rs
+++ b/src/mavlink/base.rs
@@ -6,8 +6,6 @@
 
 use std::time::Instant;
 
-use skyward_mavlink::mavlink::peek_reader::PeekReader;
-
 // Re-export from the mavlink crate
 pub use skyward_mavlink::{
     mavlink::*, orion::*,
@@ -60,11 +58,3 @@ where
         })
         .collect())
 }
-
-/// Read a stream of bytes and return an iterator of MavLink messages
-pub fn byte_parser<'a>(
-    reader: impl std::io::Read + 'a,
-) -> impl Iterator<Item = (MavHeader, MavMessage)> + 'a {
-    let mut reader = PeekReader::new(reader);
-    std::iter::from_fn(move || read_v1_msg(&mut reader).ok())
-}
diff --git a/src/message_broker.rs b/src/message_broker.rs
index 77932cf..7cf10e7 100644
--- a/src/message_broker.rs
+++ b/src/message_broker.rs
@@ -10,176 +10,82 @@ mod reception_queue;
 pub use message_bundle::MessageBundle;
 use reception_queue::ReceptionQueue;
 
-use crate::{
-    error::ErrInstrument,
-    mavlink::{Message, TimedMessage, byte_parser},
-    utils::RingBuffer,
-};
-use anyhow::{Context, Result};
-use ring_channel::{RingReceiver, RingSender, ring_channel};
 use std::{
     collections::HashMap,
-    io::Write,
-    num::NonZeroUsize,
-    sync::{
-        Arc, Mutex,
-        atomic::{AtomicBool, Ordering},
-    },
-    time::{Duration, Instant},
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+
+use tracing::error;
+
+use crate::{
+    communication::{Connection, ConnectionError, TransceiverConfigExt},
+    error::ErrInstrument,
+    mavlink::{MavFrame, MavHeader, MavMessage, MavlinkVersion, Message, TimedMessage},
 };
-use tokio::{net::UdpSocket, task::JoinHandle};
-use tracing::{debug, trace};
 
-/// Maximum size of the UDP buffer
-const UDP_BUFFER_SIZE: usize = 65527;
+const RECEPTION_QUEUE_INTERVAL: Duration = Duration::from_secs(1);
+const SEGS_SYSTEM_ID: u8 = 1;
+const SEGS_COMPONENT_ID: u8 = 1;
 
 /// The MessageBroker struct contains the state of the message broker.
 ///
 /// It is responsible for receiving messages from the Mavlink listener and
 /// dispatching them to the views that are interested in them.
-#[derive(Debug)]
 pub struct MessageBroker {
     /// A map of all messages received so far, indexed by message ID
     messages: HashMap<u32, Vec<TimedMessage>>,
     /// instant queue used for frequency calculation and reception time
     last_receptions: Arc<Mutex<ReceptionQueue>>,
-    /// Flag to stop the listener
-    running_flag: Arc<AtomicBool>,
-    /// Listener message sender
-    tx: RingSender<TimedMessage>,
-    /// Broker message receiver
-    rx: RingReceiver<TimedMessage>,
-    /// Task handle for the listener
-    task: Option<JoinHandle<Result<()>>>,
+    /// Connection to the Mavlink listener
+    connection: Option<Connection>,
     /// Egui context
     ctx: egui::Context,
 }
 
 impl MessageBroker {
     /// Creates a new `MessageBroker` with the given channel size and Egui context.
-    pub fn new(channel_size: NonZeroUsize, ctx: egui::Context) -> Self {
-        let (tx, rx) = ring_channel(channel_size);
+    pub fn new(ctx: egui::Context) -> Self {
         Self {
             messages: HashMap::new(),
             // TODO: make this configurable
-            last_receptions: Arc::new(Mutex::new(ReceptionQueue::new(Duration::from_secs(1)))),
-            tx,
-            rx,
+            last_receptions: Arc::new(Mutex::new(ReceptionQueue::new(RECEPTION_QUEUE_INTERVAL))),
+            connection: None,
             ctx,
-            running_flag: Arc::new(AtomicBool::new(false)),
-            task: None,
-        }
-    }
-
-    /// Stop the listener task from listening to incoming messages, if it is
-    /// running.
-    pub fn stop_listening(&mut self) {
-        self.running_flag.store(false, Ordering::Relaxed);
-        if let Some(t) = self.task.take() {
-            t.abort()
         }
     }
 
     /// Start a listener task that listens to incoming messages from the given
-    /// Ethernet port, and accumulates them in a ring buffer, read only when
-    /// views request a refresh.
-    pub fn listen_from_ethernet_port(&mut self, port: u16) {
-        // Stop the current listener if it exists
-        self.stop_listening();
-        self.running_flag.store(true, Ordering::Relaxed);
-        let last_receptions = Arc::clone(&self.last_receptions);
-
-        let tx = self.tx.clone();
-        let ctx = self.ctx.clone();
-
-        let bind_address = format!("0.0.0.0:{}", port);
-        let mut buf = Box::new([0; UDP_BUFFER_SIZE]);
-        let running_flag = self.running_flag.clone();
-
-        debug!("Spawning listener task at {}", bind_address);
-        let handle = tokio::spawn(async move {
-            let socket = UdpSocket::bind(bind_address)
-                .await
-                .context("Failed to bind socket")?;
-            debug!("Listening on UDP");
-
-            while running_flag.load(Ordering::Relaxed) {
-                let (len, _) = socket
-                    .recv_from(buf.as_mut_slice())
-                    .await
-                    .context("Failed to receive message")?;
-                for (_, mav_message) in byte_parser(&buf[..len]) {
-                    trace!("Received message: {:?}", mav_message);
-                    tx.send(TimedMessage::just_received(mav_message))
-                        .context("Failed to send message")?;
-                    last_receptions.lock().unwrap().push(Instant::now());
-                    ctx.request_repaint();
-                }
-            }
-
-            Ok::<(), anyhow::Error>(())
-        });
-        self.task = Some(handle);
+    /// medium (Serial or Ethernet) and stores them in a ring buffer.
+    pub fn open_connection(
+        &mut self,
+        config: impl TransceiverConfigExt,
+    ) -> Result<(), ConnectionError> {
+        self.connection = Some(config.open_connection()?);
+        Ok(())
     }
 
-    /// Start a listener task that listens to incoming messages from the given
-    /// serial port and stores them in a ring buffer.
-    pub fn listen_from_serial_port(&mut self, port: String, baud_rate: u32) {
-        // Stop the current listener if it exists
-        self.stop_listening();
-        self.running_flag.store(true, Ordering::Relaxed);
-        let last_receptions = Arc::clone(&self.last_receptions);
-
-        let tx = self.tx.clone();
-        let ctx = self.ctx.clone();
-
-        let running_flag = self.running_flag.clone();
-
-        debug!("Spawning listener task at {}", port);
-        let handle = tokio::task::spawn_blocking(move || {
-            let mut serial_port = serialport::new(port, baud_rate)
-                .timeout(std::time::Duration::from_millis(100))
-                .open()
-                .context("Failed to open serial port")?;
-            debug!("Listening on serial port");
-
-            let mut ring_buf = RingBuffer::<1024>::new();
-            let mut temp_buf = [0; 512];
-            // need to do a better error handling for this (need toast errors)
-            while running_flag.load(Ordering::Relaxed) {
-                let result = serial_port
-                    .read(&mut temp_buf)
-                    .log_expect("Failed to read from serial port");
-                debug!("Read {} bytes from serial port", result);
-                trace!("data read from serial: {:?}", &temp_buf[..result]);
-                ring_buf
-                    .write(&temp_buf[..result])
-                    .log_expect("Failed to write to ring buffer, check buffer size");
-                for (_, mav_message) in byte_parser(&mut ring_buf) {
-                    debug!("Received message: {:?}", mav_message);
-                    tx.send(TimedMessage::just_received(mav_message))
-                        .context("Failed to send message")?;
-                    last_receptions.lock().unwrap().push(Instant::now());
-                    ctx.request_repaint();
-                }
-            }
+    /// Stop the listener task from listening to incoming messages, if it is
+    /// running.
+    pub fn close_connection(&mut self) {
+        self.connection.take();
+    }
 
-            Ok::<(), anyhow::Error>(())
-        });
-        self.task = Some(handle);
+    pub fn is_connected(&self) -> bool {
+        self.connection.is_some()
     }
 
     /// Returns the time since the last message was received.
     pub fn time_since_last_reception(&self) -> Option<Duration> {
         self.last_receptions
             .lock()
-            .unwrap()
+            .log_unwrap()
             .time_since_last_reception()
     }
 
     /// Returns the frequency of messages received in the last second.
     pub fn reception_frequency(&self) -> f64 {
-        self.last_receptions.lock().unwrap().frequency()
+        self.last_receptions.lock().log_unwrap().frequency()
     }
 
     pub fn get(&self, id: u32) -> &[TimedMessage] {
@@ -188,15 +94,57 @@ impl MessageBroker {
 
     /// Processes incoming network messages. New messages are added to the
     /// given `MessageBundle`.
-    pub fn process_messages(&mut self, bundle: &mut MessageBundle) {
-        while let Ok(message) = self.rx.try_recv() {
-            bundle.insert(message.clone());
-
-            // Store the message in the broker
-            self.messages
-                .entry(message.message.message_id())
-                .or_default()
-                .push(message);
+    #[profiling::function]
+    pub fn process_incoming_messages(&mut self, bundle: &mut MessageBundle) {
+        // process messages only if the connection is open
+        if let Some(connection) = &self.connection {
+            // check for communication errors, and log them
+            match connection.retrieve_messages() {
+                Ok(messages) => {
+                    for message in messages {
+                        bundle.insert(message.clone());
+
+                        // Update the last reception time
+                        self.last_receptions.lock().log_unwrap().push(message.time);
+
+                        // Store the message in the broker
+                        self.messages
+                            .entry(message.message.message_id())
+                            .or_default()
+                            .push(message);
+                    }
+                    self.ctx.request_repaint();
+                }
+                Err(e) => {
+                    error!("Error while receiving messages: {:?}", e);
+                    // TODO: user error handling, until them silently close the connection
+                    self.close_connection();
+                }
+            }
+        }
+    }
+
+    /// Processes outgoing messages.
+    /// WARNING: This methods blocks the UI, thus a detailed profiling is needed.
+    /// FIXME
+    #[profiling::function]
+    pub fn process_outgoing_messages(&mut self, messages: Vec<MavMessage>) {
+        if let Some(connection) = &self.connection {
+            for msg in messages {
+                let header = MavHeader {
+                    system_id: SEGS_SYSTEM_ID,
+                    component_id: SEGS_COMPONENT_ID,
+                    ..Default::default()
+                };
+                let frame = MavFrame {
+                    header,
+                    msg,
+                    protocol_version: MavlinkVersion::V1,
+                };
+                if let Err(e) = connection.send_message(frame) {
+                    error!("Error while transmitting message: {:?}", e);
+                }
+            }
         }
     }
 
diff --git a/src/serial.rs b/src/serial.rs
deleted file mode 100644
index 29ffb2d..0000000
--- a/src/serial.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-//! Serial port utilities
-//!
-//! This module provides utilities for working with serial ports, such as listing all available serial ports and finding the first serial port that contains "STM32" or "ST-LINK" in its product name.
-
-use anyhow::Context;
-
-use crate::error::ErrInstrument;
-
-/// Get the first serial port that contains "STM32" or "ST-LINK" in its product name
-pub fn get_first_stm32_serial_port() -> Option<String> {
-    let ports = serialport::available_ports().log_expect("Serial ports cannot be listed!");
-    for port in ports {
-        if let serialport::SerialPortType::UsbPort(info) = port.port_type {
-            if let Some(p) = info.product {
-                if p.contains("STM32") || p.contains("ST-LINK") {
-                    return Some(port.port_name);
-                }
-            }
-        }
-    }
-    None
-}
-
-/// Get a list of all serial ports available on the system
-pub fn list_all_serial_ports() -> anyhow::Result<Vec<String>> {
-    let ports = serialport::available_ports().context("No serial ports found!")?;
-    Ok(ports.iter().map(|p| p.port_name.clone()).collect())
-}
diff --git a/src/ui.rs b/src/ui.rs
index 050d0cc..383fabd 100644
--- a/src/ui.rs
+++ b/src/ui.rs
@@ -1,9 +1,11 @@
 mod app;
+mod cache;
 mod panes;
 mod persistency;
 mod shortcuts;
 mod utils;
 mod widget_gallery;
 mod widgets;
+pub mod windows;
 
 pub use app::App;
diff --git a/src/ui/app.rs b/src/ui/app.rs
index 0570632..632d259 100644
--- a/src/ui/app.rs
+++ b/src/ui/app.rs
@@ -1,31 +1,30 @@
-use super::{
-    panes::{Pane, PaneBehavior},
-    persistency::{LayoutManager, LayoutManagerWindow},
-    shortcuts,
-    utils::maximized_pane_ui,
-    widget_gallery::WidgetGallery,
-    widgets::reception_led::ReceptionLed,
-};
-use crate::{
-    error::ErrInstrument,
-    mavlink,
-    message_broker::{MessageBroker, MessageBundle},
-    serial::{get_first_stm32_serial_port, list_all_serial_ports},
-    ui::panes::PaneKind,
-};
 use eframe::CreationContext;
-use egui::{Align2, Button, ComboBox, Key, Modifiers, Sides, Vec2};
-use egui_extras::{Size, StripBuilder};
+use egui::{Button, Key, Modifiers, Sides};
 use egui_tiles::{Behavior, Container, Linear, LinearDir, Tile, TileId, Tiles, Tree};
 use serde::{Deserialize, Serialize};
 use std::{
     fs,
-    num::NonZeroUsize,
     path::{Path, PathBuf},
     time::{Duration, Instant},
 };
 use tracing::{debug, error, trace};
 
+use crate::{
+    error::ErrInstrument,
+    mavlink::MavMessage,
+    message_broker::{MessageBroker, MessageBundle},
+};
+
+use super::{
+    panes::{Pane, PaneBehavior, PaneKind},
+    persistency::LayoutManager,
+    shortcuts,
+    utils::maximized_pane_ui,
+    widget_gallery::WidgetGallery,
+    widgets::reception_led::ReceptionLed,
+    windows::{ConnectionsWindow, LayoutManagerWindow},
+};
+
 pub struct App {
     /// Persistent state of the app
     state: AppState,
@@ -37,7 +36,7 @@ pub struct App {
     message_bundle: MessageBundle,
     // == Windows ==
     widget_gallery: WidgetGallery,
-    sources_window: SourceWindow,
+    sources_window: ConnectionsWindow,
     layout_manager_window: LayoutManagerWindow,
 }
 
@@ -45,7 +44,7 @@ pub struct App {
 impl eframe::App for App {
     // The update function is called each time the UI needs repainting!
     fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
-        self.process_messages();
+        self.process_incoming_messages();
 
         let panes_tree = &mut self.state.panes_tree;
 
@@ -242,6 +241,9 @@ impl eframe::App for App {
             self.behavior.action = Some(action);
         }
 
+        // Process outgoing messages
+        self.process_outgoing_messages();
+
         // Used for the profiler
         profiling::finish_frame!();
 
@@ -276,26 +278,23 @@ impl App {
         Self {
             state,
             layout_manager,
-            message_broker: MessageBroker::new(
-                NonZeroUsize::new(50).log_unwrap(),
-                ctx.egui_ctx.clone(),
-            ),
+            message_broker: MessageBroker::new(ctx.egui_ctx.clone()),
             widget_gallery: WidgetGallery::default(),
             behavior: AppBehavior::default(),
             maximized_pane: None,
             message_bundle: MessageBundle::default(),
-            sources_window: SourceWindow::default(),
+            sources_window: ConnectionsWindow::default(),
             layout_manager_window: LayoutManagerWindow::default(),
         }
     }
 
     /// Retrieves new messages from the message broker and dispatches them to the panes.
     #[profiling::function]
-    fn process_messages(&mut self) {
+    fn process_incoming_messages(&mut self) {
         let start = Instant::now();
 
         self.message_broker
-            .process_messages(&mut self.message_bundle);
+            .process_incoming_messages(&mut self.message_bundle);
 
         // Skip updating the panes if there are no messages
         let count = self.message_bundle.count();
@@ -330,6 +329,26 @@ impl App {
         );
         self.message_bundle.reset();
     }
+
+    /// Sends outgoing messages from the panes to the message broker.
+    #[profiling::function]
+    fn process_outgoing_messages(&mut self) {
+        let outgoing: Vec<MavMessage> = self
+            .state
+            .panes_tree
+            .tiles
+            .iter_mut()
+            .filter_map(|(_, tile)| {
+                if let Tile::Pane(pane) = tile {
+                    Some(pane.drain_outgoing_messages())
+                } else {
+                    None
+                }
+            })
+            .flatten()
+            .collect();
+        self.message_broker.process_outgoing_messages(outgoing);
+    }
 }
 
 #[derive(Serialize, Deserialize, Clone, PartialEq)]
@@ -374,179 +393,6 @@ impl AppState {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Default)]
-enum ConnectionKind {
-    #[default]
-    Ethernet,
-    Serial,
-}
-
-#[derive(Debug)]
-enum ConnectionDetails {
-    Ethernet { port: u16 },
-    Serial { port: String, baud_rate: u32 },
-}
-
-impl Default for ConnectionDetails {
-    fn default() -> Self {
-        ConnectionDetails::Ethernet {
-            port: mavlink::DEFAULT_ETHERNET_PORT,
-        }
-    }
-}
-
-#[derive(Debug, Default)]
-struct SourceWindow {
-    visible: bool,
-    connected: bool,
-    connection_kind: ConnectionKind,
-    connection_details: ConnectionDetails,
-}
-
-impl SourceWindow {
-    #[profiling::function]
-    fn show_window(&mut self, ui: &mut egui::Ui, message_broker: &mut MessageBroker) {
-        let mut window_is_open = self.visible;
-        let mut can_be_closed = false;
-        egui::Window::new("Sources")
-            .id(ui.id())
-            .anchor(Align2::CENTER_CENTER, [0.0, 0.0])
-            .max_width(200.0)
-            .collapsible(false)
-            .resizable(false)
-            .open(&mut window_is_open)
-            .show(ui.ctx(), |ui| {
-                self.ui(ui, &mut can_be_closed, message_broker);
-            });
-        self.visible = window_is_open && !can_be_closed;
-    }
-
-    fn ui(
-        &mut self,
-        ui: &mut egui::Ui,
-        can_be_closed: &mut bool,
-        message_broker: &mut MessageBroker,
-    ) {
-        let SourceWindow {
-            connected,
-            connection_kind,
-            connection_details,
-            ..
-        } = self;
-        ui.label("Select Source:");
-        ui.horizontal_top(|ui| {
-            ui.radio_value(connection_kind, ConnectionKind::Ethernet, "Ethernet");
-            ui.radio_value(connection_kind, ConnectionKind::Serial, "Serial");
-        });
-
-        ui.separator();
-
-        match *connection_kind {
-            ConnectionKind::Ethernet => {
-                if !matches!(connection_details, ConnectionDetails::Ethernet { .. }) {
-                    *connection_details = ConnectionDetails::Ethernet {
-                        port: mavlink::DEFAULT_ETHERNET_PORT,
-                    };
-                }
-                let ConnectionDetails::Ethernet { port } = connection_details else {
-                    error!("UNREACHABLE: Connection kind is not Ethernet");
-                    unreachable!("Connection kind is not Ethernet");
-                };
-
-                egui::Grid::new("grid")
-                    .num_columns(2)
-                    .spacing([10.0, 5.0])
-                    .show(ui, |ui| {
-                        ui.label("Ethernet Port:");
-                        ui.add(egui::DragValue::new(port).range(0..=65535).speed(10));
-                        ui.end_row();
-                    });
-            }
-            ConnectionKind::Serial => {
-                if !matches!(connection_details, ConnectionDetails::Serial { .. }) {
-                    *connection_details = ConnectionDetails::Serial {
-                        // Default to the first STM32 serial port if available, otherwise
-                        // default to the first serial port available
-                        port: get_first_stm32_serial_port().unwrap_or(
-                            list_all_serial_ports()
-                                .ok()
-                                .and_then(|ports| ports.first().cloned())
-                                .unwrap_or_default(),
-                        ),
-                        baud_rate: 115200,
-                    };
-                }
-                let ConnectionDetails::Serial { port, baud_rate } = connection_details else {
-                    error!("UNREACHABLE: Connection kind is not Serial");
-                    unreachable!("Connection kind is not Serial");
-                };
-
-                egui::Grid::new("grid")
-                    .num_columns(2)
-                    .spacing([10.0, 5.0])
-                    .show(ui, |ui| {
-                        ui.label("Serial Port:");
-                        ComboBox::from_id_salt("serial_port")
-                            .selected_text(port.clone())
-                            .show_ui(ui, |ui| {
-                                for available_port in list_all_serial_ports().unwrap_or_default() {
-                                    ui.selectable_value(
-                                        port,
-                                        available_port.clone(),
-                                        available_port,
-                                    );
-                                }
-                            });
-                        ui.end_row();
-                        ui.label("Baud Rate:");
-                        ui.add(
-                            egui::DragValue::new(baud_rate)
-                                .range(110..=256000)
-                                .speed(100),
-                        );
-                        ui.end_row();
-                    });
-            }
-        };
-
-        ui.separator();
-
-        ui.allocate_ui(Vec2::new(ui.available_width(), 20.0), |ui| {
-            StripBuilder::new(ui)
-                .sizes(Size::remainder(), 2) // top cell
-                .horizontal(|mut strip| {
-                    strip.cell(|ui| {
-                        let btn1 = Button::new("Connect");
-                        ui.add_enabled_ui(!*connected, |ui| {
-                            if ui.add_sized(ui.available_size(), btn1).clicked() {
-                                match connection_details {
-                                    ConnectionDetails::Ethernet { port } => {
-                                        message_broker.listen_from_ethernet_port(*port);
-                                    }
-                                    ConnectionDetails::Serial { port, baud_rate } => {
-                                        message_broker
-                                            .listen_from_serial_port(port.clone(), *baud_rate);
-                                    }
-                                }
-                                *can_be_closed = true;
-                                *connected = true;
-                            }
-                        });
-                    });
-                    strip.cell(|ui| {
-                        let btn2 = Button::new("Disconnect");
-                        ui.add_enabled_ui(*connected, |ui| {
-                            if ui.add_sized(ui.available_size(), btn2).clicked() {
-                                message_broker.stop_listening();
-                                *connected = false;
-                            }
-                        });
-                    });
-                });
-        });
-    }
-}
-
 /// Behavior for the tree of panes in the app
 #[derive(Default)]
 pub struct AppBehavior {
diff --git a/src/ui/cache.rs b/src/ui/cache.rs
new file mode 100644
index 0000000..46b29cc
--- /dev/null
+++ b/src/ui/cache.rs
@@ -0,0 +1,92 @@
+//! Module for caching expensive UI calls using egui's temporary memory storage.
+//! It provides utilities for caching the results of functions to avoid frequent recalculations.
+
+use std::time::{Duration, Instant};
+
+use egui::Context;
+use serialport::SerialPortInfo;
+
+use crate::{communication, error::ErrInstrument};
+
+const SERIAL_PORT_REFRESH_INTERVAL: Duration = Duration::from_millis(500);
+
+/// Internal helper function that caches the result of a given function call for a specified duration.
+///
+/// # Arguments
+/// * `ctx` - The egui context used for caching.
+/// * `id` - The unique identifier for the cached item.
+/// * `fun` - The function whose return value is to be cached.
+/// * `expiration_duration` - The duration after which the cache should be refreshed.
+fn call<T, F>(ctx: &egui::Context, id: egui::Id, fun: F, expiration_duration: Duration) -> T
+where
+    F: Fn() -> T,
+    T: Clone + Send + Sync + 'static,
+{
+    ctx.memory_mut(|m| {
+        match m.data.get_temp::<(T, Instant)>(id) {
+            None => {
+                m.data.insert_temp(id, (fun(), Instant::now()));
+            }
+            Some((_, i)) if i.elapsed() >= expiration_duration => {
+                m.data.insert_temp(id, (fun(), Instant::now()));
+            }
+            _ => {}
+        }
+        m.data.get_temp::<(T, Instant)>(id).log_unwrap().0
+    })
+}
+
+/// A trait to extend egui's Context with a caching function.
+pub trait CacheCall {
+    /// Calls the provided function and caches its result.
+    ///
+    /// # Arguments
+    /// * `id` - A unique identifier for the cached value.
+    /// * `fun` - The function to be cached.
+    /// * `expiration_duration` - The cache expiration duration.
+    fn call_cached<F, T>(&self, id: egui::Id, fun: F, expiration_duration: Duration) -> T
+    where
+        F: Fn() -> T,
+        T: Clone + Send + Sync + 'static;
+}
+
+impl CacheCall for egui::Context {
+    /// Implements the caching call using the internal `call` function.
+    fn call_cached<F, T>(&self, id: egui::Id, fun: F, expiration_duration: Duration) -> T
+    where
+        F: Fn() -> T,
+        T: Clone + Send + Sync + 'static,
+    {
+        call(self, id, fun, expiration_duration)
+    }
+}
+
+/// Returns a cached list of all available USB ports.
+///
+/// # Arguments
+/// * `ctx` - The egui context used for caching.
+///
+/// # Returns
+/// * A Result containing a vector of `SerialPortInfo` or a `serialport::Error`.
+pub fn cached_list_all_usb_ports(ctx: &Context) -> Result<Vec<SerialPortInfo>, serialport::Error> {
+    ctx.call_cached(
+        egui::Id::new("list_usb_ports"),
+        communication::serial::list_all_usb_ports,
+        SERIAL_PORT_REFRESH_INTERVAL,
+    )
+}
+
+/// Returns the first cached STM32 port found, if any.
+///
+/// # Arguments
+/// * `ctx` - The egui context used for caching.
+///
+/// # Returns
+/// * A Result containing an Option of `SerialPortInfo` or a `serialport::Error`.
+pub fn cached_first_stm32_port(ctx: &Context) -> Result<Option<SerialPortInfo>, serialport::Error> {
+    ctx.call_cached(
+        egui::Id::new("list_usb_ports"),
+        communication::serial::find_first_stm32_port,
+        SERIAL_PORT_REFRESH_INTERVAL,
+    )
+}
diff --git a/src/ui/panes.rs b/src/ui/panes.rs
index 17993c8..34c5650 100644
--- a/src/ui/panes.rs
+++ b/src/ui/panes.rs
@@ -7,7 +7,7 @@ use enum_dispatch::enum_dispatch;
 use serde::{Deserialize, Serialize};
 use strum_macros::{self, EnumIter, EnumMessage};
 
-use crate::mavlink::TimedMessage;
+use crate::mavlink::{MavMessage, TimedMessage};
 
 use super::app::PaneResponse;
 
@@ -26,17 +26,28 @@ impl Pane {
 pub trait PaneBehavior {
     /// Renders the UI of the pane.
     fn ui(&mut self, ui: &mut egui::Ui, tile_id: TileId) -> PaneResponse;
+
     /// Whether the pane contains the pointer.
     fn contains_pointer(&self) -> bool;
 
     /// Updates the pane state. This method is called before `ui` to allow the
     /// pane to update its state based on the messages received.
-    fn update(&mut self, messages: &[TimedMessage]);
+    fn update(&mut self, _messages: &[TimedMessage]) {}
 
     /// Returns the ID of the messages this pane is interested in, if any.
-    fn get_message_subscription(&self) -> Option<u32>;
+    fn get_message_subscription(&self) -> Option<u32> {
+        None
+    }
+
     /// Checks whether the full message history should be sent to the pane.
-    fn should_send_message_history(&self) -> bool;
+    fn should_send_message_history(&self) -> bool {
+        false
+    }
+
+    /// Drains the outgoing messages from the pane.
+    fn drain_outgoing_messages(&mut self) -> Vec<MavMessage> {
+        Vec::new()
+    }
 }
 
 impl PaneBehavior for Pane {
@@ -59,6 +70,10 @@ impl PaneBehavior for Pane {
     fn should_send_message_history(&self) -> bool {
         self.pane.should_send_message_history()
     }
+
+    fn drain_outgoing_messages(&mut self) -> Vec<MavMessage> {
+        self.pane.drain_outgoing_messages()
+    }
 }
 
 // An enum to represent the diffent kinds of widget available to the user.
diff --git a/src/ui/panes/messages_viewer.rs b/src/ui/panes/messages_viewer.rs
index 23a1f04..2c8a54f 100644
--- a/src/ui/panes/messages_viewer.rs
+++ b/src/ui/panes/messages_viewer.rs
@@ -32,14 +32,4 @@ impl PaneBehavior for MessagesViewerPane {
     fn contains_pointer(&self) -> bool {
         self.contains_pointer
     }
-
-    fn update(&mut self, _messages: &[crate::mavlink::TimedMessage]) {}
-
-    fn get_message_subscription(&self) -> Option<u32> {
-        None
-    }
-
-    fn should_send_message_history(&self) -> bool {
-        false
-    }
 }
diff --git a/src/ui/panes/plot.rs b/src/ui/panes/plot.rs
index bfd2ca8..e83bd2f 100644
--- a/src/ui/panes/plot.rs
+++ b/src/ui/panes/plot.rs
@@ -2,6 +2,7 @@ mod source_window;
 
 use super::PaneBehavior;
 use crate::{
+    error::ErrInstrument,
     mavlink::{MessageData, ROCKET_FLIGHT_TM_DATA, TimedMessage, extract_from_message},
     ui::app::PaneResponse,
 };
@@ -41,19 +42,6 @@ impl PaneBehavior for Plot2DPane {
     fn ui(&mut self, ui: &mut egui::Ui, _: TileId) -> PaneResponse {
         let mut response = PaneResponse::default();
 
-        let mut settings = SourceSettings::new(&mut self.settings, &mut self.line_settings);
-        egui::Window::new("Plot Settings")
-            .id(ui.auto_id_with("plot_settings")) // TODO: fix this issue with ids
-            .auto_sized()
-            .collapsible(true)
-            .movable(true)
-            .open(&mut self.settings_visible)
-            .show(ui.ctx(), |ui| sources_window(ui, &mut settings));
-
-        if settings.are_sources_changed() {
-            self.state_valid = false;
-        }
-
         let ctrl_pressed = ui.input(|i| i.modifiers.ctrl);
 
         egui_plot::Plot::new("plot")
@@ -80,6 +68,19 @@ impl PaneBehavior for Plot2DPane {
                     .context_menu(|ui| show_menu(ui, &mut self.settings_visible));
             });
 
+        let mut settings = SourceSettings::new(&mut self.settings, &mut self.line_settings);
+        egui::Window::new("Plot Settings")
+            .id(ui.auto_id_with("plot_settings")) // TODO: fix this issue with ids
+            .auto_sized()
+            .collapsible(true)
+            .movable(true)
+            .open(&mut self.settings_visible)
+            .show(ui.ctx(), |ui| sources_window(ui, &mut settings));
+
+        if settings.are_sources_changed() {
+            self.state_valid = false;
+        }
+
         response
     }
 
@@ -98,8 +99,8 @@ impl PaneBehavior for Plot2DPane {
         } = &self.settings;
 
         for msg in messages {
-            let x: f64 = extract_from_message(&msg.message, [x_field]).unwrap()[0];
-            let ys: Vec<f64> = extract_from_message(&msg.message, y_fields).unwrap();
+            let x: f64 = extract_from_message(&msg.message, [x_field]).log_unwrap()[0];
+            let ys: Vec<f64> = extract_from_message(&msg.message, y_fields).log_unwrap();
 
             if self.line_data.len() < ys.len() {
                 self.line_data.resize(ys.len(), Vec::new());
diff --git a/src/ui/persistency.rs b/src/ui/persistency.rs
index 7998384..44bfc12 100644
--- a/src/ui/persistency.rs
+++ b/src/ui/persistency.rs
@@ -1,5 +1,3 @@
 mod layout_manager;
-mod layout_manager_window;
 
 pub use layout_manager::LayoutManager;
-pub use layout_manager_window::LayoutManagerWindow;
diff --git a/src/ui/windows.rs b/src/ui/windows.rs
new file mode 100644
index 0000000..557c348
--- /dev/null
+++ b/src/ui/windows.rs
@@ -0,0 +1,5 @@
+mod connections;
+mod layouts;
+
+pub use connections::ConnectionsWindow;
+pub use layouts::LayoutManagerWindow;
diff --git a/src/ui/windows/connections.rs b/src/ui/windows/connections.rs
new file mode 100644
index 0000000..ebc34c3
--- /dev/null
+++ b/src/ui/windows/connections.rs
@@ -0,0 +1,221 @@
+use egui::{Align2, Button, ComboBox, Context, RichText, Vec2};
+use egui_extras::{Size, StripBuilder};
+use tracing::{error, warn};
+
+use crate::{
+    communication::{
+        ConnectionError, EthernetConfiguration, SerialConfiguration, serial::DEFAULT_BAUD_RATE,
+    },
+    error::ErrInstrument,
+    mavlink::DEFAULT_ETHERNET_PORT,
+    message_broker::MessageBroker,
+    ui::cache::{cached_first_stm32_port, cached_list_all_usb_ports},
+};
+
+#[derive(Default)]
+pub struct ConnectionsWindow {
+    pub visible: bool,
+    connection_kind: ConnectionKind,
+    connection_config: ConnectionConfig,
+}
+
+impl ConnectionsWindow {
+    #[profiling::function]
+    pub fn show_window(&mut self, ui: &mut egui::Ui, message_broker: &mut MessageBroker) {
+        let mut window_is_open = self.visible;
+        let mut can_be_closed = false;
+        egui::Window::new("Sources")
+            .id(ui.id())
+            .anchor(Align2::CENTER_CENTER, [0.0, 0.0])
+            .max_width(200.0)
+            .collapsible(false)
+            .resizable(false)
+            .open(&mut window_is_open)
+            .show(ui.ctx(), |ui| {
+                self.ui(ui, &mut can_be_closed, message_broker);
+            });
+        self.visible = window_is_open && !can_be_closed;
+    }
+
+    fn ui(
+        &mut self,
+        ui: &mut egui::Ui,
+        can_be_closed: &mut bool,
+        message_broker: &mut MessageBroker,
+    ) {
+        let ConnectionsWindow {
+            connection_kind,
+            connection_config,
+            ..
+        } = self;
+        ui.label("Select Source:");
+        ui.horizontal_top(|ui| {
+            ui.radio_value(connection_kind, ConnectionKind::Ethernet, "Ethernet");
+            ui.radio_value(connection_kind, ConnectionKind::Serial, "Serial");
+        });
+
+        ui.separator();
+
+        match (connection_kind, &connection_config) {
+            (ConnectionKind::Ethernet, ConnectionConfig::Ethernet(_)) => {}
+            (ConnectionKind::Serial, ConnectionConfig::Serial(_)) => {}
+            (ConnectionKind::Ethernet, _) => {
+                *connection_config = ConnectionConfig::Ethernet(default_ethernet());
+            }
+            (ConnectionKind::Serial, _) => {
+                *connection_config = ConnectionConfig::Serial(
+                    default_serial(ui.ctx()).log_expect("USER ERROR: issues with serail ports"),
+                );
+            }
+        }
+
+        match connection_config {
+            ConnectionConfig::Ethernet(EthernetConfiguration { port: recv_port }) => {
+                egui::Grid::new("grid")
+                    .num_columns(2)
+                    .spacing([10.0, 5.0])
+                    .show(ui, |ui| {
+                        ui.label("Ethernet Port:");
+                        ui.add(egui::DragValue::new(recv_port).range(0..=65535).speed(10));
+                        ui.end_row();
+                    });
+            }
+            ConnectionConfig::Serial(opt) => {
+                egui::Grid::new("grid")
+                    .num_columns(2)
+                    .spacing([10.0, 5.0])
+                    .show(ui, |ui| {
+                        ui.label("Serial Port:");
+                        match opt.as_mut() {
+                            Some(SerialConfiguration {
+                                port_name,
+                                baud_rate,
+                            }) => {
+                                ComboBox::from_id_salt("serial_port")
+                                    .selected_text(port_name.as_str())
+                                    .show_ui(ui, |ui| {
+                                        for available_port in
+                                            cached_list_all_usb_ports(ui.ctx()).log_unwrap()
+                                        {
+                                            ui.selectable_value(
+                                                port_name,
+                                                available_port.port_name.clone(),
+                                                available_port.port_name,
+                                            );
+                                        }
+                                    });
+
+                                ui.label("Baud Rate:");
+                                ui.add(
+                                    egui::DragValue::new(baud_rate)
+                                        .range(110..=256000)
+                                        .speed(100),
+                                );
+                                ui.end_row();
+                            }
+                            None => {
+                                // in case of a serial connection missing
+                                warn!("USER ERROR: No serial port found");
+                                ui.label(RichText::new("No port found").underline().strong());
+                                *opt = default_serial(ui.ctx())
+                                    .log_expect("USER ERROR: issues with serial ports");
+                            }
+                        }
+
+                        ui.end_row();
+                    });
+            }
+        };
+
+        ui.separator();
+
+        ui.allocate_ui(Vec2::new(ui.available_width(), 20.0), |ui| {
+            StripBuilder::new(ui)
+                .sizes(Size::remainder(), 2) // top cell
+                .horizontal(|mut strip| {
+                    strip.cell(|ui| {
+                        let btn1 = Button::new("Connect");
+                        ui.add_enabled_ui(
+                            !message_broker.is_connected() & connection_config.is_valid(),
+                            |ui| {
+                                if ui.add_sized(ui.available_size(), btn1).clicked() {
+                                    if let Err(e) =
+                                        connection_config.open_connection(message_broker)
+                                    {
+                                        error!("Failed to open connection: {:?}", e); // TODO: handle user erros
+                                    }
+                                    *can_be_closed = true;
+                                }
+                            },
+                        );
+                    });
+                    strip.cell(|ui| {
+                        let btn2 = Button::new("Disconnect");
+                        ui.add_enabled_ui(message_broker.is_connected(), |ui| {
+                            if ui.add_sized(ui.available_size(), btn2).clicked() {
+                                message_broker.close_connection();
+                            }
+                        });
+                    });
+                });
+        });
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+enum ConnectionKind {
+    #[default]
+    Ethernet,
+    Serial,
+}
+
+#[derive(Debug, Clone)]
+pub enum ConnectionConfig {
+    Ethernet(EthernetConfiguration),
+    Serial(Option<SerialConfiguration>),
+}
+
+fn default_ethernet() -> EthernetConfiguration {
+    EthernetConfiguration {
+        port: DEFAULT_ETHERNET_PORT,
+    }
+}
+
+fn default_serial(ctx: &Context) -> Result<Option<SerialConfiguration>, serialport::Error> {
+    let port_name =
+        cached_first_stm32_port(ctx)?
+            .map(|port| port.port_name)
+            .or(cached_list_all_usb_ports(ctx)
+                .ok()
+                .and_then(|ports| ports.first().map(|port| port.port_name.clone())));
+    Ok(port_name.map(|port_name| SerialConfiguration {
+        port_name,
+        baud_rate: DEFAULT_BAUD_RATE,
+    }))
+}
+
+impl ConnectionConfig {
+    fn is_valid(&self) -> bool {
+        match self {
+            ConnectionConfig::Ethernet(_) => true,
+            ConnectionConfig::Serial(Some(_)) => true,
+            ConnectionConfig::Serial(None) => false,
+        }
+    }
+
+    fn open_connection(&self, msg_broker: &mut MessageBroker) -> Result<(), ConnectionError> {
+        match self {
+            Self::Ethernet(config) => msg_broker.open_connection(config.clone()),
+            Self::Serial(Some(config)) => msg_broker.open_connection(config.clone()),
+            Self::Serial(None) => Err(ConnectionError::WrongConfiguration(
+                "No serial port found".to_string(),
+            )),
+        }
+    }
+}
+
+impl Default for ConnectionConfig {
+    fn default() -> Self {
+        ConnectionConfig::Ethernet(default_ethernet())
+    }
+}
diff --git a/src/ui/persistency/layout_manager_window.rs b/src/ui/windows/layouts.rs
similarity index 99%
rename from src/ui/persistency/layout_manager_window.rs
rename to src/ui/windows/layouts.rs
index d4be72e..0407cef 100644
--- a/src/ui/persistency/layout_manager_window.rs
+++ b/src/ui/windows/layouts.rs
@@ -8,9 +8,10 @@ use egui_extras::{Column, Size, StripBuilder, TableBuilder};
 use egui_file::FileDialog;
 use tracing::{debug, error};
 
-use crate::{error::ErrInstrument, ui::app::AppState};
-
-use super::LayoutManager;
+use crate::{
+    error::ErrInstrument,
+    ui::{app::AppState, persistency::LayoutManager},
+};
 
 #[derive(Default)]
 pub struct LayoutManagerWindow {
diff --git a/src/utils.rs b/src/utils.rs
index d3699bb..7e22ff5 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,58 +1,2 @@
-#[derive(Debug)]
-pub struct RingBuffer<const G: usize> {
-    buffer: Box<[u8; G]>,
-    write_cursor: usize,
-    read_cursor: usize,
-}
+mod ring_buffer;
 
-impl<const G: usize> std::io::Write for RingBuffer<G> {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let mut written = 0;
-        for byte in buf {
-            if self.is_full() {
-                Err(std::io::Error::new(
-                    std::io::ErrorKind::WriteZero,
-                    "Buffer full",
-                ))?;
-            }
-            self.buffer[self.write_cursor] = *byte;
-            self.write_cursor = (self.write_cursor + 1) % G;
-            written += 1;
-        }
-        Ok(written)
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        Ok(())
-    }
-}
-
-impl<const G: usize> std::io::Read for RingBuffer<G> {
-    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-        let mut read = 0;
-        while read < buf.len() {
-            if self.read_cursor == self.write_cursor {
-                break;
-            }
-            buf[read] = self.buffer[self.read_cursor];
-            self.read_cursor = (self.read_cursor + 1) % G;
-            read += 1;
-        }
-        Ok(read)
-    }
-}
-
-impl<const G: usize> RingBuffer<G> {
-    pub fn new() -> Self {
-        Self {
-            buffer: Box::new([0; G]),
-            write_cursor: 0,
-            read_cursor: 0,
-        }
-    }
-
-    #[inline]
-    pub fn is_full(&self) -> bool {
-        (self.write_cursor + 1) % G == self.read_cursor
-    }
-}
diff --git a/src/utils/ring_buffer.rs b/src/utils/ring_buffer.rs
new file mode 100644
index 0000000..c25df11
--- /dev/null
+++ b/src/utils/ring_buffer.rs
@@ -0,0 +1,256 @@
+
+
+#[derive(Debug)]
+pub struct RingBuffer<const G: usize> {
+    buffer: Box<[u8; G]>,
+    write_cursor: usize,
+    read_cursor: usize,
+}
+
+impl<const G: usize> RingBuffer<G> {
+    pub fn new() -> Self {
+        Self {
+            buffer: Box::new([0; G]),
+            write_cursor: 0,
+            read_cursor: 0,
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        if self.write_cursor >= self.read_cursor {
+            self.write_cursor - self.read_cursor
+        } else {
+            G - self.read_cursor + self.write_cursor
+        }
+    }
+
+    #[inline]
+    pub fn remaining_capacity(&self) -> usize {
+        G - self.len() - 1
+    }
+
+    #[inline]
+    pub fn is_full(&self) -> bool {
+        (self.write_cursor + 1) % G == self.read_cursor
+    }
+}
+
+impl<const G: usize> std::io::Write for RingBuffer<G> {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        if self.remaining_capacity() < buf.len() {
+            Err(std::io::Error::new(
+                std::io::ErrorKind::WriteZero,
+                "Buffer full",
+            ))?;
+        }
+        let mut written = 0;
+        for byte in buf {
+            self.buffer[self.write_cursor] = *byte;
+            self.write_cursor = (self.write_cursor + 1) % G;
+            written += 1;
+        }
+        Ok(written)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
+impl<const G: usize> std::io::Read for RingBuffer<G> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let mut read = 0;
+        while read < buf.len() {
+            if self.read_cursor == self.write_cursor {
+                break;
+            }
+            buf[read] = self.buffer[self.read_cursor];
+            self.read_cursor = (self.read_cursor + 1) % G;
+            read += 1;
+        }
+        Ok(read)
+    }
+}
+
+pub struct OverwritingRingBuffer<const G: usize> {
+    buffer: Box<[u8; G]>,
+    write_cursor: usize,
+    read_cursor: usize,
+    full: bool, // new flag to indicate if buffer is full
+}
+
+impl<const G: usize> OverwritingRingBuffer<G> {
+    pub fn new() -> Self {
+        Self {
+            buffer: Box::new([0; G]),
+            write_cursor: 0,
+            read_cursor: 0,
+            full: false, // initialize full flag to false
+        }
+    }
+
+    // Updated len() using full flag
+    pub fn len(&self) -> usize {
+        if self.full {
+            G
+        } else if self.write_cursor >= self.read_cursor {
+            self.write_cursor - self.read_cursor
+        } else {
+            G - self.read_cursor + self.write_cursor
+        }
+    }
+}
+
+impl<const G: usize> std::io::Write for OverwritingRingBuffer<G> {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let mut written = 0;
+        for &byte in buf {
+            self.buffer[self.write_cursor] = byte;
+            self.write_cursor = (self.write_cursor + 1) % G;
+            if self.full {
+                // Buffer full; advance read_cursor to overwrite oldest
+                self.read_cursor = (self.read_cursor + 1) % G;
+            }
+            // Set full flag if write_cursor catches up to read_cursor
+            self.full = self.write_cursor == self.read_cursor;
+            written += 1;
+        }
+        Ok(written)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
+impl<const G: usize> std::io::Read for OverwritingRingBuffer<G> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let mut read = 0;
+        // If full, then buffer is not empty and we will clear full once reading starts.
+        while read < buf.len() {
+            // Buffer empty condition: not full and cursors equal
+            if !self.full && (self.read_cursor == self.write_cursor) {
+                break;
+            }
+            buf[read] = self.buffer[self.read_cursor];
+            self.read_cursor = (self.read_cursor + 1) % G;
+            self.full = false; // once reading, clear full flag
+            read += 1;
+        }
+        Ok(read)
+    }
+}
+
+#[allow(clippy::unwrap_used)]
+#[cfg(test)]
+mod tests {
+    use std::io::{Read, Write};
+
+    // Tests for RingBuffer
+    #[test]
+    fn test_ring_buffer_empty_read() {
+        let mut rb = super::RingBuffer::<8>::new();
+        let mut buf = [0; 4];
+        let n = rb.read(&mut buf).unwrap();
+        assert_eq!(n, 0);
+    }
+
+    #[test]
+    fn test_ring_buffer_write_then_read() {
+        let mut rb = super::RingBuffer::<8>::new();
+        let data = [10, 20, 30, 40];
+        let n = rb.write(&data).unwrap();
+        assert_eq!(n, data.len());
+        let mut buf = [0u8; 4];
+        let n = rb.read(&mut buf).unwrap();
+        assert_eq!(n, data.len());
+        assert_eq!(buf, data);
+    }
+
+    #[test]
+    fn test_ring_buffer_wraparound() {
+        let mut rb = super::RingBuffer::<8>::new();
+        // Write initial data.
+        let data1 = [1, 2, 3, 4, 5, 6];
+        let n = rb.write(&data1).unwrap();
+        assert_eq!(n, data1.len());
+        // Read a few bytes to free space.
+        let mut buf1 = [0u8; 3];
+        let n = rb.read(&mut buf1).unwrap();
+        assert_eq!(n, 3);
+        assert_eq!(buf1, [1, 2, 3]);
+        // Write additional data to wrap-around.
+        let data2 = [7, 8, 9];
+        let n = rb.write(&data2).unwrap();
+        assert_eq!(n, data2.len());
+        // Read remaining data.
+        let mut buf2 = [0u8; 6];
+        let n = rb.read(&mut buf2).unwrap();
+        // Expected order: remaining from data1 ([4,5,6]) then data2 ([7,8,9])
+        assert_eq!(n, 6);
+        assert_eq!(buf2, [4, 5, 6, 7, 8, 9]);
+    }
+
+    #[test]
+    fn test_ring_buffer_full_error() {
+        let mut rb = super::RingBuffer::<4>::new();
+        // Usable capacity is G - 1: 3 bytes.
+        let _ = rb.write(&[10, 20, 30]).unwrap();
+        let res = rb.write(&[40]);
+        assert!(res.is_err());
+    }
+
+    #[test]
+    fn test_ring_buffer_partial_write_when_full() {
+        let mut rb = super::RingBuffer::<6>::new();
+        // Usable capacity: 5 bytes.
+        let n = rb.write(&[1, 2, 3, 4]).unwrap();
+        assert_eq!(n, 4);
+        // Only one byte can be written before buffer is full.
+        // The write should error on the second byte.
+        let res = rb.write(&[5, 6]);
+        assert!(res.is_err());
+
+        // Read the first 5 bytes.
+        let mut buf = [0u8; 5];
+        let n = rb.read(&mut buf).unwrap();
+        assert_eq!(n, 4);
+        assert_eq!(buf, [1, 2, 3, 4, 0]);
+    }
+
+    // Tests for OverwritingRingBuffer
+    #[test]
+    fn test_overwriting_ring_buffer_empty_read() {
+        let mut orb = super::OverwritingRingBuffer::<8>::new();
+        let mut buf = [0; 4];
+        let n = orb.read(&mut buf).unwrap();
+        assert_eq!(n, 0);
+    }
+
+    #[test]
+    fn test_overwriting_ring_buffer_write_then_read() {
+        let mut orb = super::OverwritingRingBuffer::<8>::new();
+        let data = [10, 20, 30, 40];
+        let n = orb.write(&data).unwrap();
+        assert_eq!(n, data.len());
+        let mut buf = [0u8; 4];
+        let n = orb.read(&mut buf).unwrap();
+        assert_eq!(n, data.len());
+        assert_eq!(buf, data);
+    }
+
+    #[test]
+    fn test_overwriting_ring_buffer_overwrite() {
+        let mut orb = super::OverwritingRingBuffer::<6>::new();
+        // Write initial 5 elements (usable capacity = 5).
+        let _ = orb.write(&[1, 2, 3, 4, 5]).unwrap();
+        // Write two more elements to force overwrite.
+        let _ = orb.write(&[6, 7]).unwrap();
+        // After overwrite, expected order starts from index advanced by one overwrite
+        // Expected stored order: [2,3,4,5,6,7]
+        let mut buf = [0u8; 6];
+        let n = orb.read(&mut buf).unwrap();
+        assert_eq!(n, 6);
+        assert_eq!(buf, [2, 3, 4, 5, 6, 7]);
+    }
+}
-- 
GitLab