diff --git a/src/communication.rs b/src/communication.rs index f4d74abdede2adb6539e869cd47dff96d051752d..cf0bbcc8dc8788030308cea887fa7be37ced01ab 100644 --- a/src/communication.rs +++ b/src/communication.rs @@ -15,9 +15,8 @@ use std::sync::{ use ring_channel::{RingReceiver, TryRecvError}; use sealed::MessageTransceiver; -use skyward_mavlink::mavlink::MavFrame; -use crate::mavlink::{MavMessage, TimedMessage}; +use crate::mavlink::{MavConnection, MavFrame, MavMessage, TimedMessage}; // Re-exports pub use error::{CommunicationError, ConnectionError}; @@ -26,6 +25,8 @@ pub use serial::SerialConfiguration; const MAX_STORED_MSGS: usize = 1000; // e.g., 192 bytes each = 192 KB +pub(super) type BoxedConnection = Box<dyn MavConnection<MavMessage> + Send + Sync>; + mod sealed { use std::{ num::NonZeroUsize, diff --git a/src/communication/ethernet.rs b/src/communication/ethernet.rs index 2506fa956300839834ec0eb68dd0743e24e61942..629887cd034a5f8a5553d70267de702861f23bd2 100644 --- a/src/communication/ethernet.rs +++ b/src/communication/ethernet.rs @@ -3,19 +3,16 @@ //! Provides functionality to connect via Ethernet using UDP, allowing message //! transmission and reception over a network. -use std::net::UdpSocket; - use skyward_mavlink::mavlink::{ - MavFrame, + self, error::{MessageReadError, MessageWriteError}, - read_v1_msg, write_v1_msg, }; use tracing::{debug, trace}; -use crate::mavlink::{MAX_MSG_SIZE, MavMessage, TimedMessage, peek_reader::PeekReader}; +use crate::mavlink::{MavFrame, MavMessage, MavlinkVersion, TimedMessage}; use super::{ - ConnectionError, + BoxedConnection, ConnectionError, sealed::{Connectable, MessageTransceiver}, }; @@ -31,48 +28,39 @@ impl Connectable for EthernetConfiguration { /// Binds to the specified UDP port to create a network connection. #[profiling::function] fn connect(&self) -> Result<Self::Connected, ConnectionError> { - let recv_addr = format!("0.0.0.0:{}", self.port); - let server_socket = UdpSocket::bind(recv_addr)?; - debug!("Bound to Ethernet port on port {}", self.port); - let send_addr = "0.0.0.0:0"; - let cast_addr = format!("255.255.255.255:{}", self.port); - let client_socket = UdpSocket::bind(send_addr)?; - client_socket.set_broadcast(true)?; - client_socket.connect(&cast_addr)?; - debug!("Created Ethernet connection to {}", cast_addr); + let incoming_addr = format!("udpin:0.0.0.0:{}", self.port); + let outgoing_addr = format!("udpbcast:255.255.255.255:{}", self.port); + let mut incoming_conn: BoxedConnection = mavlink::connect(&incoming_addr)?; + let mut outgoing_conn: BoxedConnection = mavlink::connect(&outgoing_addr)?; + incoming_conn.set_protocol_version(MavlinkVersion::V1); + outgoing_conn.set_protocol_version(MavlinkVersion::V1); + debug!("Ethernet connections set up on port {}", self.port); Ok(EthernetTransceiver { - server_socket, - client_socket, + incoming_conn, + outgoing_conn, }) } } /// Manages a connection over Ethernet. pub struct EthernetTransceiver { - server_socket: UdpSocket, - client_socket: UdpSocket, + incoming_conn: BoxedConnection, + outgoing_conn: BoxedConnection, } impl MessageTransceiver for EthernetTransceiver { /// Waits for a message over Ethernet, blocking until a valid message arrives. #[profiling::function] fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError> { - let mut buf = [0; MAX_MSG_SIZE]; - let read = self.server_socket.recv(&mut buf)?; - trace!("Received {} bytes", read); - let mut reader = PeekReader::new(&buf[..read]); - let (_, res) = read_v1_msg(&mut reader)?; - debug!("Received message: {:?}", res); - Ok(TimedMessage::just_received(res)) + let (_, msg) = self.incoming_conn.recv()?; + debug!("Received message: {:?}", &msg); + Ok(TimedMessage::just_received(msg)) } /// Transmits a message using the UDP socket. #[profiling::function] fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError> { - let MavFrame { header, msg, .. } = msg; - let mut write_buf = Vec::new(); - write_v1_msg(&mut write_buf, header, &msg)?; - let written = self.client_socket.send(&write_buf)?; + let written = self.outgoing_conn.send_frame(&msg)?; debug!("Sent message: {:?}", msg); trace!("Sent {} bytes via Ethernet", written); Ok(written) diff --git a/src/communication/serial.rs b/src/communication/serial.rs index 3a9d798ddcbc8038cf02c469f2be45ba4e67990c..8f1c6fa73b6c4c924357f0050857d371f02d4762 100644 --- a/src/communication/serial.rs +++ b/src/communication/serial.rs @@ -3,27 +3,20 @@ //! Provides functions for listing USB serial ports, finding a STM32 port, //! and handling serial connections including message transmission and reception. -use std::sync::Mutex; - -use serialport::{SerialPort, SerialPortInfo, SerialPortType}; +use serialport::{SerialPortInfo, SerialPortType}; use skyward_mavlink::mavlink::{ - MavFrame, + self, error::{MessageReadError, MessageWriteError}, - read_v1_msg, write_v1_msg, }; use tracing::{debug, trace}; -use crate::{ - error::ErrInstrument, - mavlink::{MavMessage, TimedMessage, peek_reader::PeekReader}, -}; +use crate::mavlink::{MavFrame, MavMessage, MavlinkVersion, TimedMessage}; use super::{ - ConnectionError, + BoxedConnection, ConnectionError, sealed::{Connectable, MessageTransceiver}, }; -const SERIAL_PORT_TIMEOUT_MS: u64 = 100; pub const DEFAULT_BAUD_RATE: u32 = 115200; /// Returns a list of all USB serial ports available on the system. @@ -71,150 +64,37 @@ impl Connectable for SerialConfiguration { /// Connects using the serial port configuration. #[profiling::function] fn connect(&self) -> Result<Self::Connected, ConnectionError> { - let port = serialport::new(&self.port_name, self.baud_rate) - .timeout(std::time::Duration::from_millis(SERIAL_PORT_TIMEOUT_MS)) - .open()?; + let serial_edpoint = format!("serial:{}:{}", self.port_name, self.baud_rate); + let mut mav_connection: BoxedConnection = mavlink::connect(&serial_edpoint)?; + mav_connection.set_protocol_version(MavlinkVersion::V1); debug!( "Connected to serial port {} with baud rate {}", self.port_name, self.baud_rate ); - Ok(SerialTransceiver { - serial_reader: Mutex::new(Box::new(PeekReader::new(port.try_clone()?))), - serial_writer: Mutex::new(port), - }) - } -} - -impl From<serialport::Error> for ConnectionError { - fn from(e: serialport::Error) -> Self { - let serialport::Error { kind, description } = e.clone(); - match kind { - serialport::ErrorKind::NoDevice => ConnectionError::WrongConfiguration(description), - serialport::ErrorKind::InvalidInput => ConnectionError::WrongConfiguration(description), - serialport::ErrorKind::Unknown => ConnectionError::Unknown(description), - serialport::ErrorKind::Io(e) => ConnectionError::Io(e.into()), - } + Ok(SerialTransceiver { mav_connection }) } } /// Manages a connection to a serial port. pub struct SerialTransceiver { - serial_reader: Mutex<Box<PeekReader<Box<dyn SerialPort>>>>, - #[allow(dead_code)] - serial_writer: Mutex<Box<dyn SerialPort>>, + mav_connection: BoxedConnection, } impl MessageTransceiver for SerialTransceiver { /// Blocks until a valid message is received from the serial port. #[profiling::function] fn wait_for_message(&self) -> Result<TimedMessage, MessageReadError> { - loop { - let res: Result<(_, MavMessage), MessageReadError> = - read_v1_msg(&mut self.serial_reader.lock().log_unwrap()); - match res { - Ok((_, msg)) => { - return Ok(TimedMessage::just_received(msg)); - } - Err(MessageReadError::Io(e)) if e.kind() == std::io::ErrorKind::TimedOut => { - continue; - } - Err(e) => { - return Err(e); - } - } - } + let (_, msg) = self.mav_connection.recv()?; + debug!("Received message: {:?}", &msg); + Ok(TimedMessage::just_received(msg)) } /// Transmits a message via the serial connection. #[profiling::function] fn transmit_message(&self, msg: MavFrame<MavMessage>) -> Result<usize, MessageWriteError> { - let MavFrame { header, msg, .. } = msg; - let written = write_v1_msg(&mut *self.serial_writer.lock().log_unwrap(), header, &msg)?; + let written = self.mav_connection.send_frame(&msg)?; debug!("Sent message: {:?}", msg); trace!("Sent {} bytes via serial", written); Ok(written) } } - -#[allow(clippy::unwrap_used)] -#[cfg(test)] -mod tests { - use std::{collections::VecDeque, io::Read}; - - use rand::prelude::*; - use skyward_mavlink::{mavlink::*, orion::*}; - - use super::*; - - struct ChunkedMessageStreamGenerator { - rng: SmallRng, - buffer: VecDeque<u8>, - } - - impl ChunkedMessageStreamGenerator { - const KINDS: [u32; 2] = [ACK_TM_DATA::ID, NACK_TM_DATA::ID]; - - fn new() -> Self { - Self { - rng: SmallRng::seed_from_u64(42), - buffer: VecDeque::new(), - } - } - - fn msg_push(&mut self, msg: &MavMessage, header: MavHeader) -> std::io::Result<()> { - write_v1_msg(&mut self.buffer, header, msg).unwrap(); - Ok(()) - } - - fn fill_buffer(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - while buf.len() > self.buffer.len() { - self.add_next_rand(); - } - let n = buf.len(); - buf.iter_mut() - .zip(self.buffer.drain(..n)) - .for_each(|(a, b)| *a = b); - Ok(n) - } - - fn add_next_rand(&mut self) { - let i = self.rng.random_range(0..Self::KINDS.len()); - let id = Self::KINDS[i]; - let msg = MavMessage::default_message_from_id(id).unwrap(); - let header = MavHeader { - system_id: 1, - component_id: 1, - sequence: 0, - }; - self.msg_push(&msg, header).unwrap(); - } - } - - impl Read for ChunkedMessageStreamGenerator { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - // fill buffer with sequence of byte of random length - if buf.len() == 1 { - self.fill_buffer(&mut buf[..1]) - } else if !buf.is_empty() { - let size = self.rng.random_range(1..buf.len()); - self.fill_buffer(&mut buf[..size]) - } else { - Ok(0) - } - } - } - - #[test] - fn test_peek_reader_with_chunked_transmission() { - let mut gms = ChunkedMessageStreamGenerator::new(); - let mut reader = PeekReader::new(&mut gms); - let mut msgs = Vec::new(); - for _ in 0..100 { - let (_, msg): (MavHeader, MavMessage) = read_v1_msg(&mut reader).unwrap(); - msgs.push(msg); - } - for msg in msgs { - assert!(msg.message_id() == ACK_TM_DATA::ID || msg.message_id() == NACK_TM_DATA::ID); - } - } -} diff --git a/src/mavlink.rs b/src/mavlink.rs index 9589a2fda51e99ef7fb1debdfebc184111e333b9..91193466aa0254f7dbe1e0415be3421d18e4ab09 100644 --- a/src/mavlink.rs +++ b/src/mavlink.rs @@ -13,5 +13,3 @@ pub use reflection::ReflectionContext; /// Default port for the Ethernet connection pub const DEFAULT_ETHERNET_PORT: u16 = 42069; -/// Maximum size of a Mavlink message -pub const MAX_MSG_SIZE: usize = 280;