diff --git a/mavlink-core/src/async_connection/direct_serial.rs b/mavlink-core/src/async_connection/direct_serial.rs index d9b2fe1de248c9ff97c632bdaf271a0242cc93e9..221edb0dd3ff5b961ea20b15d064facd7f6d0915 100644 --- a/mavlink-core/src/async_connection/direct_serial.rs +++ b/mavlink-core/src/async_connection/direct_serial.rs @@ -3,10 +3,15 @@ use core::ops::DerefMut; use std::io; +use async_trait::async_trait; use tokio::sync::Mutex; use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream}; -use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use super::AsyncConnectable; +use crate::{ + async_peek_reader::AsyncPeekReader, connectable::SerialConnectable, MavHeader, MavlinkVersion, + Message, +}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg_async, write_versioned_msg_async}; @@ -17,38 +22,6 @@ use crate::{ use super::AsyncMavConnection; -pub fn open(settings: &str) -> io::Result<AsyncSerialConnection> { - let settings_toks: Vec<&str> = settings.split(':').collect(); - if settings_toks.len() < 2 { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Incomplete port settings", - )); - } - - let Ok(baud) = settings_toks[1].parse::<u32>() else { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Invalid baud rate", - )); - }; - - let port_name = settings_toks[0]; - let mut port = tokio_serial::new(port_name, baud).open_native_async()?; - port.set_data_bits(tokio_serial::DataBits::Eight)?; - port.set_parity(tokio_serial::Parity::None)?; - port.set_stop_bits(tokio_serial::StopBits::One)?; - port.set_flow_control(tokio_serial::FlowControl::None)?; - - Ok(AsyncSerialConnection { - port: Mutex::new(AsyncPeekReader::new(port)), - sequence: Mutex::new(0), - protocol_version: MavlinkVersion::V2, - #[cfg(feature = "signing")] - signing_data: None, - }) -} - pub struct AsyncSerialConnection { port: Mutex<AsyncPeekReader<SerialStream>>, sequence: Mutex<u8>, @@ -118,3 +91,26 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +#[async_trait] +impl AsyncConnectable for SerialConnectable { + async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> + where + M: Message + Sync + Send, + { + let mut port = + tokio_serial::new(&self.port_name, self.baud_rate as u32).open_native_async()?; + port.set_data_bits(tokio_serial::DataBits::Eight)?; + port.set_parity(tokio_serial::Parity::None)?; + port.set_stop_bits(tokio_serial::StopBits::One)?; + port.set_flow_control(tokio_serial::FlowControl::None)?; + + Ok(Box::new(AsyncSerialConnection { + port: Mutex::new(AsyncPeekReader::new(port)), + sequence: Mutex::new(0), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + })) + } +} diff --git a/mavlink-core/src/async_connection/file.rs b/mavlink-core/src/async_connection/file.rs index 5dae56d99e27c7d20af9ca3b7318633372d36d73..60b589d33419a97dcf0da20f8eac1d760f060b11 100644 --- a/mavlink-core/src/async_connection/file.rs +++ b/mavlink-core/src/async_connection/file.rs @@ -2,11 +2,13 @@ use core::ops::DerefMut; -use super::AsyncMavConnection; +use super::{AsyncConnectable, AsyncMavConnection}; +use crate::connectable::FileConnectable; use crate::error::{MessageReadError, MessageWriteError}; use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use async_trait::async_trait; use tokio::fs::File; use tokio::io; use tokio::sync::Mutex; @@ -81,3 +83,13 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncFileConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +#[async_trait] +impl AsyncConnectable for FileConnectable { + async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> + where + M: Message + Sync + Send, + { + Ok(Box::new(open(&self.address).await?)) + } +} diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs index b2eb5c67573d4467ea8520482ec7a42a2bd9b2b2..281e255b9abe4afae0b0c79ac2b92928355be860 100644 --- a/mavlink-core/src/async_connection/mod.rs +++ b/mavlink-core/src/async_connection/mod.rs @@ -1,6 +1,7 @@ +use async_trait::async_trait; use tokio::io; -use crate::{MavFrame, MavHeader, MavlinkVersion, Message}; +use crate::{connectable::ConnectionAddress, MavFrame, MavHeader, MavlinkVersion, Message}; #[cfg(feature = "tcp")] mod tcp; @@ -81,43 +82,9 @@ pub trait AsyncMavConnection<M: Message + Sync + Send> { pub async fn connect_async<M: Message + Sync + Send>( address: &str, ) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> { - let protocol_err = Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )); - - if cfg!(feature = "tcp") && address.starts_with("tcp") { - #[cfg(feature = "tcp")] - { - tcp::select_protocol(address).await - } - #[cfg(not(feature = "tcp"))] - { - protocol_err - } - } else if cfg!(feature = "udp") && address.starts_with("udp") { - #[cfg(feature = "udp")] - { - udp::select_protocol(address).await - } - #[cfg(not(feature = "udp"))] - { - protocol_err - } - } else if cfg!(feature = "direct-serial") && address.starts_with("serial") { - #[cfg(feature = "direct-serial")] - { - Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) - } - #[cfg(not(feature = "direct-serial"))] - { - protocol_err - } - } else if address.starts_with("file") { - Ok(Box::new(file::open(&address["file:".len()..]).await?)) - } else { - protocol_err - } + ConnectionAddress::parse_address(address)? + .connect_async::<M>() + .await } /// Returns the socket address for the given address. @@ -135,3 +102,25 @@ pub(crate) fn get_socket_addr<T: std::net::ToSocketAddrs>( }; Ok(addr) } + +#[async_trait] +pub trait AsyncConnectable { + async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> + where + M: Message + Sync + Send; +} + +#[async_trait] +impl AsyncConnectable for ConnectionAddress { + async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> + where + M: Message + Sync + Send, + { + match self { + Self::Tcp(connectable) => connectable.connect_async::<M>().await, + Self::Udp(connectable) => connectable.connect_async::<M>().await, + Self::Serial(connectable) => connectable.connect_async::<M>().await, + Self::File(connectable) => connectable.connect_async::<M>().await, + } + } +} diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs index 4cd1f9ac9a1754a7d86b392dc3cd98ced08e4ca8..721785e6e0237afe55079a23a151e1a5cbc1fd7e 100644 --- a/mavlink-core/src/async_connection/tcp.rs +++ b/mavlink-core/src/async_connection/tcp.rs @@ -1,9 +1,11 @@ //! Async TCP MAVLink connection -use super::{get_socket_addr, AsyncMavConnection}; +use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection}; use crate::async_peek_reader::AsyncPeekReader; +use crate::connectable::TcpConnectable; use crate::{MavHeader, MavlinkVersion, Message}; +use async_trait::async_trait; use core::ops::DerefMut; use tokio::io; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; @@ -17,23 +19,6 @@ use crate::{ read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData, }; -pub async fn select_protocol<M: Message + Sync + Send>( - address: &str, -) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("tcpout:") { - tcpout(address).await - } else if let Some(address) = address.strip_prefix("tcpin:") { - tcpin(address).await - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> { let addr = get_socket_addr(address)?; @@ -154,3 +139,18 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +#[async_trait] +impl AsyncConnectable for TcpConnectable { + async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> + where + M: Message + Sync + Send, + { + let conn = if self.is_out { + tcpout(&self.address).await + } else { + tcpin(&self.address).await + }; + Ok(Box::new(conn?)) + } +} diff --git a/mavlink-core/src/async_connection/udp.rs b/mavlink-core/src/async_connection/udp.rs index 3f06b83e1d58e65e6bff246adbba05b388b41c4e..fa249a1ad2a025d167aed988786911bf152c6f6d 100644 --- a/mavlink-core/src/async_connection/udp.rs +++ b/mavlink-core/src/async_connection/udp.rs @@ -3,15 +3,20 @@ use core::{ops::DerefMut, task::Poll}; use std::{collections::VecDeque, io::Read, sync::Arc}; +use async_trait::async_trait; use tokio::{ io::{self, AsyncRead, ReadBuf}, net::UdpSocket, sync::Mutex, }; -use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use crate::{ + async_peek_reader::AsyncPeekReader, + connectable::{UdpConnectable, UdpMode}, + MavHeader, MavlinkVersion, Message, +}; -use super::{get_socket_addr, AsyncMavConnection}; +use super::{get_socket_addr, AsyncConnectable, AsyncMavConnection}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg_async, write_versioned_msg_async}; @@ -20,50 +25,6 @@ use crate::{ read_versioned_msg_async_signed, write_versioned_msg_signed, SigningConfig, SigningData, }; -pub async fn select_protocol<M: Message + Sync + Send>( - address: &str, -) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("udpin:") { - udpin(address).await - } else if let Some(address) = address.strip_prefix("udpout:") { - udpout(address).await - } else if let Some(address) = address.strip_prefix("udpbcast:") { - udpbcast(address).await - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - -pub async fn udpbcast<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncUdpConnection> { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0").await?; - socket - .set_broadcast(true) - .expect("Couldn't bind to broadcast address."); - AsyncUdpConnection::new(socket, false, Some(addr)) -} - -pub async fn udpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncUdpConnection> { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0").await?; - AsyncUdpConnection::new(socket, false, Some(addr)) -} - -pub async fn udpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncUdpConnection> { - let addr = address - .to_socket_addrs() - .unwrap() - .next() - .expect("Invalid address"); - let socket = UdpSocket::bind(addr).await?; - AsyncUdpConnection::new(socket, true, None) -} - struct UdpRead { socket: Arc<UdpSocket>, buffer: VecDeque<u8>, @@ -235,6 +196,24 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncUdpConnection { } } +#[async_trait] +impl AsyncConnectable for UdpConnectable { + async fn connect_async<M>(&self) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> + where + M: Message + Sync + Send, + { + let (addr, server, dest): (&str, _, _) = match self.mode { + UdpMode::Udpin => (&self.address, true, None), + _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)), + }; + let socket = UdpSocket::bind(addr).await?; + if matches!(self.mode, UdpMode::Udpcast) { + socket.set_broadcast(true)?; + } + Ok(Box::new(AsyncUdpConnection::new(socket, server, dest)?)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/mavlink-core/src/connectable.rs b/mavlink-core/src/connectable.rs new file mode 100644 index 0000000000000000000000000000000000000000..620139b6f7fbd8c82885c20155e1e4b1f026f938 --- /dev/null +++ b/mavlink-core/src/connectable.rs @@ -0,0 +1,152 @@ +use core::fmt::Display; +use std::io; + +#[derive(Debug, Clone, Copy)] +pub enum UdpMode { + Udpin, + Udpout, + Udpcast, +} + +#[derive(Debug, Clone)] +pub struct UdpConnectable { + pub(crate) address: String, + pub(crate) mode: UdpMode, +} + +impl UdpConnectable { + pub fn new(address: String, mode: UdpMode) -> Self { + Self { address, mode } + } +} +impl Display for UdpConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mode = match self.mode { + UdpMode::Udpin => "udpin", + UdpMode::Udpout => "udpout", + UdpMode::Udpcast => "udpcast", + }; + write!(f, "{mode}:{}", self.address) + } +} + +#[derive(Debug, Clone)] +pub struct SerialConnectable { + pub(crate) port_name: String, + pub(crate) baud_rate: usize, +} + +impl SerialConnectable { + pub fn new(port_name: String, baud_rate: usize) -> Self { + Self { + port_name, + baud_rate, + } + } +} +impl Display for SerialConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "serial:{}:{}", self.port_name, self.baud_rate) + } +} + +#[derive(Debug, Clone)] +pub struct TcpConnectable { + pub(crate) address: String, + pub(crate) is_out: bool, +} + +impl TcpConnectable { + pub fn new(address: String, is_out: bool) -> Self { + Self { address, is_out } + } +} +impl Display for TcpConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + if self.is_out { + write!(f, "tcpout:{}", self.address) + } else { + write!(f, "tcpin:{}", self.address) + } + } +} + +#[derive(Debug, Clone)] +pub struct FileConnectable { + pub(crate) address: String, +} + +impl FileConnectable { + pub fn new(address: String) -> Self { + Self { address } + } +} +impl Display for FileConnectable { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "file:{}", self.address) + } +} +pub enum ConnectionAddress { + Tcp(TcpConnectable), + Udp(UdpConnectable), + Serial(SerialConnectable), + File(FileConnectable), +} + +impl Display for ConnectionAddress { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Tcp(connectable) => write!(f, "{connectable}"), + Self::Udp(connectable) => write!(f, "{connectable}"), + Self::Serial(connectable) => write!(f, "{connectable}"), + Self::File(connectable) => write!(f, "{connectable}"), + } + } +} + +impl ConnectionAddress { + pub fn parse_address(address: &str) -> Result<Self, io::Error> { + let (protocol, address) = address.split_once(':').ok_or(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Protocol unsupported", + ))?; + let conn = match protocol { + #[cfg(feature = "direct-serial")] + "serial" => { + let (port_name, baud) = address.split_once(':').ok_or(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Incomplete port settings", + ))?; + Self::Serial(SerialConnectable::new( + port_name.to_string(), + baud.parse().map_err(|_| { + io::Error::new(io::ErrorKind::AddrNotAvailable, "Invalid baud rate") + })?, + )) + } + #[cfg(feature = "tcp")] + "tcpin" | "tcpout" => Self::Tcp(TcpConnectable::new( + address.to_string(), + protocol == "tcpout", + )), + #[cfg(feature = "udp")] + "udpin" | "udpout" | "udpcast" => Self::Udp(UdpConnectable::new( + address.to_string(), + match protocol { + "udpin" => UdpMode::Udpin, + "udpout" => UdpMode::Udpout, + "udpcast" => UdpMode::Udpcast, + _ => unreachable!(), + }, + )), + "file" => Self::File(FileConnectable::new(address.to_string())), + _ => { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Protocol unsupported", + )) + } + }; + Ok(conn) + } +} diff --git a/mavlink-core/src/connection/direct_serial.rs b/mavlink-core/src/connection/direct_serial.rs index 2954b9810783403d8f711bacc981d1b97305d80f..85c771b22d868602f3fb5a78f3619dec4c580a7e 100644 --- a/mavlink-core/src/connection/direct_serial.rs +++ b/mavlink-core/src/connection/direct_serial.rs @@ -1,5 +1,6 @@ //! Serial MAVLINK connection +use crate::connectable::SerialConnectable; use crate::connection::MavConnection; use crate::peek_reader::PeekReader; use crate::{MavHeader, MavlinkVersion, Message}; @@ -15,45 +16,7 @@ use crate::{read_versioned_msg, write_versioned_msg}; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData}; -pub fn open(settings: &str) -> io::Result<SerialConnection> { - let settings_toks: Vec<&str> = settings.split(':').collect(); - if settings_toks.len() < 2 { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Incomplete port settings", - )); - } - - let Ok(baud) = settings_toks[1] - .parse::<usize>() - .map(serial::core::BaudRate::from_speed) - else { - return Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Invalid baud rate", - )); - }; - - let settings = serial::core::PortSettings { - baud_rate: baud, - char_size: serial::Bits8, - parity: serial::ParityNone, - stop_bits: serial::Stop1, - flow_control: serial::FlowNone, - }; - - let port_name = settings_toks[0]; - let mut port = serial::open(port_name)?; - port.configure(&settings)?; - - Ok(SerialConnection { - port: Mutex::new(PeekReader::new(port)), - sequence: Mutex::new(0), - protocol_version: MavlinkVersion::V2, - #[cfg(feature = "signing")] - signing_data: None, - }) -} +use super::Connectable; pub struct SerialConnection { port: Mutex<PeekReader<SystemPort>>, @@ -127,3 +90,27 @@ impl<M: Message> MavConnection<M> for SerialConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +impl Connectable for SerialConnectable { + fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { + let baud_rate = serial::core::BaudRate::from_speed(self.baud_rate); + let settings = serial::core::PortSettings { + baud_rate, + char_size: serial::Bits8, + parity: serial::ParityNone, + stop_bits: serial::Stop1, + flow_control: serial::FlowNone, + }; + + let mut port = serial::open(&self.port_name)?; + port.configure(&settings)?; + + Ok(Box::new(SerialConnection { + port: Mutex::new(PeekReader::new(port)), + sequence: Mutex::new(0), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + })) + } +} diff --git a/mavlink-core/src/connection/file.rs b/mavlink-core/src/connection/file.rs index 72582593475119cd8aa11ffd390076ba64419610..176a4a8be952f39c8db999de8be4060c9812cfa4 100644 --- a/mavlink-core/src/connection/file.rs +++ b/mavlink-core/src/connection/file.rs @@ -1,5 +1,6 @@ //! File MAVLINK connection +use crate::connectable::FileConnectable; use crate::connection::MavConnection; use crate::error::{MessageReadError, MessageWriteError}; use crate::peek_reader::PeekReader; @@ -14,6 +15,8 @@ use crate::read_versioned_msg; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, SigningConfig, SigningData}; +use super::Connectable; + pub fn open(file_path: &str) -> io::Result<FileConnection> { let file = File::open(file_path)?; @@ -78,3 +81,9 @@ impl<M: Message> MavConnection<M> for FileConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +impl Connectable for FileConnectable { + fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { + Ok(Box::new(open(&self.address)?)) + } +} diff --git a/mavlink-core/src/connection/mod.rs b/mavlink-core/src/connection/mod.rs index e60386a6aa0f6e1dea0e4424adaf189190e41cf4..5fff34bc7094b7aef3b2fe0494a561c323ec0a7f 100644 --- a/mavlink-core/src/connection/mod.rs +++ b/mavlink-core/src/connection/mod.rs @@ -1,5 +1,6 @@ -use crate::{MavFrame, MavHeader, MavlinkVersion, Message}; +use crate::{connectable::ConnectionAddress, MavFrame, MavHeader, MavlinkVersion, Message}; +use core::fmt::Display; use std::io::{self}; #[cfg(feature = "tcp")] @@ -70,52 +71,36 @@ pub trait MavConnection<M: Message> { /// /// The type of the connection is determined at runtime based on the address type, so the /// connection is returned as a trait object. -pub fn connect<M: Message>(address: &str) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { - let protocol_err = Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )); - - if cfg!(feature = "tcp") && address.starts_with("tcp") { - #[cfg(feature = "tcp")] - { - tcp::select_protocol(address) - } - #[cfg(not(feature = "tcp"))] - { - protocol_err - } - } else if cfg!(feature = "udp") && address.starts_with("udp") { - #[cfg(feature = "udp")] - { - udp::select_protocol(address) - } - #[cfg(not(feature = "udp"))] - { - protocol_err - } - } else if cfg!(feature = "direct-serial") && address.starts_with("serial:") { - #[cfg(feature = "direct-serial")] - { - Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) - } - #[cfg(not(feature = "direct-serial"))] - { - protocol_err - } - } else if address.starts_with("file") { - Ok(Box::new(file::open(&address["file:".len()..])?)) - } else { - protocol_err - } +pub fn connect<M: Message + Sync + Send>( + address: &str, +) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { + ConnectionAddress::parse_address(address)?.connect::<M>() } /// Returns the socket address for the given address. pub(crate) fn get_socket_addr<T: std::net::ToSocketAddrs>( - address: T, + address: &T, ) -> Result<std::net::SocketAddr, io::Error> { address.to_socket_addrs()?.next().ok_or(io::Error::new( io::ErrorKind::Other, "Host address lookup failed", )) } + +pub trait Connectable: Display { + fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>>; +} + +impl Connectable for ConnectionAddress { + fn connect<M>(&self) -> std::io::Result<Box<dyn crate::MavConnection<M> + Sync + Send>> + where + M: Message, + { + match self { + Self::Tcp(connectable) => connectable.connect::<M>(), + Self::Udp(connectable) => connectable.connect::<M>(), + Self::Serial(connectable) => connectable.connect::<M>(), + Self::File(connectable) => connectable.connect::<M>(), + } + } +} diff --git a/mavlink-core/src/connection/tcp.rs b/mavlink-core/src/connection/tcp.rs index d362a5664a9a1f4de682c558ade575f1be08ada2..3f83c2dc628f8c6e7d1e340f73da7f0612267876 100644 --- a/mavlink-core/src/connection/tcp.rs +++ b/mavlink-core/src/connection/tcp.rs @@ -1,5 +1,6 @@ //! TCP MAVLink connection +use crate::connectable::TcpConnectable; use crate::connection::MavConnection; use crate::peek_reader::PeekReader; use crate::{MavHeader, MavlinkVersion, Message}; @@ -10,7 +11,7 @@ use std::net::{TcpListener, TcpStream}; use std::sync::Mutex; use std::time::Duration; -use super::get_socket_addr; +use super::{get_socket_addr, Connectable}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg, write_versioned_msg}; @@ -18,25 +19,8 @@ use crate::{read_versioned_msg, write_versioned_msg}; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData}; -pub fn select_protocol<M: Message>( - address: &str, -) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("tcpout:") { - tcpout(address) - } else if let Some(address) = address.strip_prefix("tcpin:") { - tcpin(address) - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> { - let addr = get_socket_addr(address)?; + let addr = get_socket_addr(&address)?; let socket = TcpStream::connect(addr)?; socket.set_read_timeout(Some(Duration::from_millis(100)))?; @@ -54,7 +38,7 @@ pub fn tcpout<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> { } pub fn tcpin<T: ToSocketAddrs>(address: T) -> io::Result<TcpConnection> { - let addr = get_socket_addr(address)?; + let addr = get_socket_addr(&address)?; let listener = TcpListener::bind(addr)?; //For now we only accept one incoming stream: this blocks until we get one @@ -147,3 +131,14 @@ impl<M: Message> MavConnection<M> for TcpConnection { self.signing_data = signing_data.map(SigningData::from_config) } } + +impl Connectable for TcpConnectable { + fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { + let conn = if self.is_out { + tcpout(&self.address) + } else { + tcpin(&self.address) + }; + Ok(Box::new(conn?)) + } +} diff --git a/mavlink-core/src/connection/udp.rs b/mavlink-core/src/connection/udp.rs index 75e5be3c48aa4966d36669102dbda5df5c88e213..25f5662277039ef0091f6712516c69000021afe9 100644 --- a/mavlink-core/src/connection/udp.rs +++ b/mavlink-core/src/connection/udp.rs @@ -2,16 +2,16 @@ use std::collections::VecDeque; +use crate::connectable::{UdpConnectable, UdpMode}; use crate::connection::MavConnection; use crate::peek_reader::PeekReader; use crate::{MavHeader, MavlinkVersion, Message}; use core::ops::DerefMut; use std::io::{self, Read}; -use std::net::ToSocketAddrs; use std::net::{SocketAddr, UdpSocket}; use std::sync::Mutex; -use super::get_socket_addr; +use super::{get_socket_addr, Connectable}; #[cfg(not(feature = "signing"))] use crate::{read_versioned_msg, write_versioned_msg}; @@ -19,50 +19,6 @@ use crate::{read_versioned_msg, write_versioned_msg}; #[cfg(feature = "signing")] use crate::{read_versioned_msg_signed, write_versioned_msg_signed, SigningConfig, SigningData}; -pub fn select_protocol<M: Message>( - address: &str, -) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { - let connection = if let Some(address) = address.strip_prefix("udpin:") { - udpin(address) - } else if let Some(address) = address.strip_prefix("udpout:") { - udpout(address) - } else if let Some(address) = address.strip_prefix("udpbcast:") { - udpbcast(address) - } else { - Err(io::Error::new( - io::ErrorKind::AddrNotAvailable, - "Protocol unsupported", - )) - }; - - Ok(Box::new(connection?)) -} - -pub fn udpbcast<T: ToSocketAddrs>(address: T) -> io::Result<UdpConnection> { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket - .set_broadcast(true) - .expect("Couldn't bind to broadcast address."); - UdpConnection::new(socket, false, Some(addr)) -} - -pub fn udpout<T: ToSocketAddrs>(address: T) -> io::Result<UdpConnection> { - let addr = get_socket_addr(address)?; - let socket = UdpSocket::bind("0.0.0.0:0")?; - UdpConnection::new(socket, false, Some(addr)) -} - -pub fn udpin<T: ToSocketAddrs>(address: T) -> io::Result<UdpConnection> { - let addr = address - .to_socket_addrs() - .unwrap() - .next() - .expect("Invalid address"); - let socket = UdpSocket::bind(addr)?; - UdpConnection::new(socket, true, None) -} - struct UdpRead { socket: UdpSocket, buffer: VecDeque<u8>, @@ -192,6 +148,20 @@ impl<M: Message> MavConnection<M> for UdpConnection { } } +impl Connectable for UdpConnectable { + fn connect<M: Message>(&self) -> io::Result<Box<dyn MavConnection<M> + Sync + Send>> { + let (addr, server, dest): (&str, _, _) = match self.mode { + UdpMode::Udpin => (&self.address, true, None), + _ => ("0.0.0.0:0", false, Some(get_socket_addr(&self.address)?)), + }; + let socket = UdpSocket::bind(addr)?; + if matches!(self.mode, UdpMode::Udpcast) { + socket.set_broadcast(true)?; + } + Ok(Box::new(UdpConnection::new(socket, server, dest)?)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index 4d2eebddd56a4ea074242125872d08b766c6e5ed..0cc5e877853f34aba8089d2548080cce55ac1e6d 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -48,12 +48,12 @@ pub mod bytes_mut; mod connection; pub mod error; #[cfg(feature = "std")] -pub use self::connection::{connect, MavConnection}; +pub use self::connection::{connect, Connectable, MavConnection}; #[cfg(feature = "tokio-1")] mod async_connection; #[cfg(feature = "tokio-1")] -pub use self::async_connection::{connect_async, AsyncMavConnection}; +pub use self::async_connection::{connect_async, AsyncConnectable, AsyncMavConnection}; #[cfg(feature = "tokio-1")] pub mod async_peek_reader; @@ -74,6 +74,13 @@ pub use self::signing::{SigningConfig, SigningData}; #[cfg(feature = "signing")] use sha2::{Digest, Sha256}; +#[cfg(any(feature = "std", feature = "tokio-1"))] +mod connectable; +#[cfg(any(feature = "std", feature = "tokio-1"))] +pub use connectable::{ + ConnectionAddress, FileConnectable, SerialConnectable, TcpConnectable, UdpConnectable, +}; + pub const MAX_FRAME_SIZE: usize = 280; pub trait Message diff --git a/mavlink/tests/parse_test.rs b/mavlink/tests/parse_test.rs new file mode 100644 index 0000000000000000000000000000000000000000..cfa8a3054c123d252dbc37cc783c33b3917eebaa --- /dev/null +++ b/mavlink/tests/parse_test.rs @@ -0,0 +1,30 @@ +mod parse_tests { + use mavlink::ConnectionAddress; + + fn assert_parse(addr: &str) { + assert_eq!( + format!("{}", ConnectionAddress::parse_address(addr).unwrap()), + addr + ); + } + + #[test] + fn test_parse() { + assert_parse("tcpin:example.com:99"); + assert_parse("tcpout:127.0.0.1:14549"); + assert_parse("file:/mnt/12_44-mav.bin"); + assert_parse("file:C:\\mav_logs\\test.bin"); + assert_parse("udpcast:[::1]:4567"); + assert_parse("udpin:[2001:db8:85a3:8d3:1319:8a2e:370:7348]:443"); + assert_parse("udpout:1.1.1.1:1"); + assert_parse("serial:/dev/ttyUSB0:9600"); + assert_parse("serial:COM0:115200"); + + assert!(ConnectionAddress::parse_address("serial:/dev/ttyUSB0").is_err()); + assert!(ConnectionAddress::parse_address("updout:1.1.1.1:1").is_err()); + assert!(ConnectionAddress::parse_address("tcp:127.0.0.1:14540").is_err()); + assert!(ConnectionAddress::parse_address("tcpin127.0.0.1:14540").is_err()); + assert!(ConnectionAddress::parse_address(" udpout:1.1.1.1:1 ").is_err()); + assert!(ConnectionAddress::parse_address(":udpcast:[::1]:4567").is_err()); + } +}