diff --git a/src/error.rs b/src/error.rs index f75e3142c214317f321e55a9aa2f77fe2b779e13..c6f8e9b5e6fd247d708b39c3a34ba681e5b03b13 100644 --- a/src/error.rs +++ b/src/error.rs @@ -58,7 +58,9 @@ pub enum Error { MissingSerialMode, #[error("Missing port name (second argument)")] MissingPortName, - #[error("Missing baudrate (third argument)")] + #[error("Missing port path (third argument)")] + MissingPortPath, + #[error("Missing baudrate (fourth argument)")] MissingBaudrate, #[error("Missing read amount (second argument)")] MissingReadAmount, @@ -72,7 +74,9 @@ pub enum Error { InvalidMode, #[error("Invalid port name (2nd argument): {0}")] InvalidPortName(Box<Self>), - #[error("Invalid baud rate (3rd argument): {0}")] + #[error("Invalid port path (3rd argument): {0}")] + InvalidPortPath(Box<Self>), + #[error("Invalid baud rate (4th argument): {0}")] InvalidBaudrate(Box<Self>), #[error("Invalid read amount (2nd argument): {0}")] InvalidReadAmount(Box<Self>), @@ -82,6 +86,8 @@ pub enum Error { SerialPort(#[from] serialport::Error), #[error("I/O error: {0}")] IO(#[from] std::io::Error), + #[error("Channel error: {0}")] + Channel(#[from] std::sync::mpsc::SendError<Vec<u8>>), #[error("Parse error")] Parse, #[error("Matlab error: {0}")] @@ -101,6 +107,7 @@ impl Error { Error::TooManyParameters => "serialbridge:invalid_input", Error::MissingSerialMode => "serialbridge:missing_input", Error::MissingPortName => "serialbridge:missing_input", + Error::MissingPortPath => "serialbridge:missing_input", Error::MissingBaudrate => "serialbridge:missing_input", Error::MissingReadAmount => "serialbridge:missing_input", Error::MissingWriteData => "serialbridge:missing_input", @@ -108,11 +115,13 @@ impl Error { Error::InvalidMatlabType(_) => "serialbridge:invalid_input", Error::InvalidMode => "serialbridge:invalid_input", Error::InvalidPortName(_) => "serialbridge:invalid_input", + Error::InvalidPortPath(_) => "serialbridge:invalid_input", Error::InvalidBaudrate(_) => "serialbridge:invalid_input", Error::InvalidReadAmount(_) => "serialbridge:invalid_input", Error::InvalidWriteData(_) => "serialbridge:invalid_input", Error::SerialPort(_) => "serialbridge:serial_error", Error::IO(_) => "serialbridge:serial_error", + Error::Channel(_) => "serialbridge:channel_error", Error::Parse => "serialbridge:parse_error", Error::Matlab(_) => "serialbridge:matlab_error", Error::Rustmex(err) => err.id(), diff --git a/src/lib.rs b/src/lib.rs index 67cbfda13c13c69346e57eb10f5582a517ca9a59..2e2dd29ac4c0d5eee99f4753034f5b9abbacb2d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,8 @@ mod error; mod serial; mod types; -use std::sync::RwLock; - +use lazy_static::lazy_static; +use parking_lot::RwLock; use rustmex::prelude::*; use error::{Error, MapMexError, SResult}; @@ -11,7 +11,7 @@ use types::{IntoMatlabType, IntoRustType, Mode}; use crate::serial::SerialManager; -lazy_static::lazy_static! { +lazy_static! { static ref SERIAL: RwLock<SerialManager> = RwLock::new(SerialManager::new()); } @@ -88,12 +88,16 @@ fn serialbridge(lhs: Lhs, rhs: Rhs) -> rustmex::Result<()> { /// Open the serial port specified by the first argument with the baudrate /// specified by the second argument fn open_serial(args: Args<'_>) -> SResult<()> { - args.assert_params_max_len(2)?; - let port = get_port_name(1, &args)?; + args.assert_params_max_len(3)?; + let name = get_port_name(1, &args)?; + let port: String = args + .get(2, Error::MissingPortPath)? + .into_rust() + .map_mexerr(|e| Error::InvalidPortPath(Box::new(e)))?; // Matlab defaults to f64 when inserting numbers, to improve UX we take a // f64 and cast to a u32 let arg2: f64 = args - .get(2, Error::MissingBaudrate)? + .get(3, Error::MissingBaudrate)? .into_rust() .map_mexerr(|e| Error::InvalidBaudrate(Box::new(e)))?; // Check for arg2 to resemble a baud rate (this type mismatch should be @@ -105,9 +109,14 @@ fn open_serial(args: Args<'_>) -> SResult<()> { } let baudrate = arg2 as u32; - warn_debug!("Open serial port {} with baudrate {}", port, baudrate); + warn_debug!( + "Open serial port {} with baudrate {} at {}", + port, + baudrate, + name + ); - SERIAL.write().unwrap().open(&port, baudrate)?; + SERIAL.write().open(&name, &port, baudrate)?; Ok(()) } @@ -117,7 +126,7 @@ fn close_serial(args: Args<'_>) -> SResult<()> { args.assert_params_max_len(1)?; let port = get_port_name(1, &args)?; - SERIAL.write().unwrap().close(&port)?; + SERIAL.write().close(&port)?; Ok(()) } @@ -163,7 +172,7 @@ fn write_to_serial(args: Args<'_>) -> SResult<()> { // Convert the doubles to a stream of bytes and write them to the serial port let data: Vec<u8> = data.iter().flat_map(|&x| x.to_be_bytes()).collect(); - SERIAL.read().unwrap().write_bytes(&port, &data)?; + SERIAL.read().enqueue_bytes(&port, &data)?; warn_debug!("Wrote {} bytes to serial port", data.len()); Ok(()) } diff --git a/src/serial.rs b/src/serial.rs index aa1d6fde6ee9d8f0b7efc833f5705c2a4397a32b..a850c29519bf7b13ab0d487969793052ddfcdf66 100644 --- a/src/serial.rs +++ b/src/serial.rs @@ -1,3 +1,13 @@ +use std::{ + io::Read, + ops::Deref, + sync::{ + mpsc::{channel, Sender}, + Arc, + }, + thread::{self, JoinHandle}, +}; + use hashbrown::HashMap; use parking_lot::{Mutex, RwLock}; use serialport::SerialPort; @@ -6,29 +16,37 @@ use crate::error::{Error, SResult}; /// Handler of the serial port pub struct SerialManager { - serial: RwLock<HashMap<String, Mutex<Box<dyn SerialPort>>>>, + serial_map: RwLock<HashMap<String, SerialQueue>>, +} + +/// Queue data to allow non-blocking writes and blocking reads +#[derive(Debug)] +struct SerialQueue { + _handle: JoinHandle<()>, + sender: Sender<Vec<u8>>, + serial: Arc<Mutex<Box<dyn SerialPort>>>, } impl SerialManager { pub fn new() -> Self { Self { - serial: RwLock::new(HashMap::new()), + serial_map: RwLock::new(HashMap::new()), } } /// Open the serial port with the given `port` device path and `baudrate` - pub fn open(&mut self, port: &str, baudrate: u32) -> SResult<()> { - let serial = serialport::new(port, baudrate).open()?; - self.serial + /// and bind it to the `name` key + pub fn open(&mut self, name: &str, port: &str, baudrate: u32) -> SResult<()> { + self.serial_map .write() - .insert(port.to_owned(), Mutex::new(serial)); + .insert(name.to_owned(), SerialQueue::open(port, baudrate)?); Ok(()) } /// Close the serial port if it is open - pub fn close(&mut self, port: &str) -> SResult<()> { - if self.serial.read().contains_key(port) { - self.serial.write().remove(port); + pub fn close(&mut self, name: &str) -> SResult<()> { + if self.serial_map.read().contains_key(name) { + self.serial_map.write().remove(name); Ok(()) } else { Err(Error::SerialNotOpen) @@ -36,19 +54,53 @@ impl SerialManager { } /// Read `n` bytes from the serial port - pub fn read_n_bytes(&self, port: &str, n: usize) -> SResult<Vec<u8>> { - let map = self.serial.read(); - let mut port = map.get(port).ok_or(Error::SerialNotOpen)?.lock(); + pub fn read_n_bytes(&self, name: &str, n: usize) -> SResult<Vec<u8>> { + let map = self.serial_map.read(); + let port = map.get(name).ok_or(Error::SerialNotOpen)?; let mut buf = vec![0; n]; - port.read_exact(&mut buf)?; + port.lock().read_exact(&mut buf)?; Ok(buf) } /// Write `data` to the serial port - pub fn write_bytes(&self, port: &str, data: &[u8]) -> SResult<()> { - let map = self.serial.read(); - let mut port = map.get(port).as_ref().ok_or(Error::SerialNotOpen)?.lock(); - port.write_all(data)?; + pub fn enqueue_bytes(&self, name: &str, data: &[u8]) -> SResult<()> { + let map = self.serial_map.read(); + map.get(name) + .ok_or(Error::SerialNotOpen)? + .enqueue_bytes(data) + } +} + +impl Deref for SerialQueue { + type Target = Mutex<Box<dyn SerialPort>>; + + fn deref(&self) -> &Self::Target { + &self.serial + } +} + +impl SerialQueue { + /// Open the serial port and start a thread to write data as soon as it is + /// available (by using a channel) + fn open(port: &str, baudrate: u32) -> SResult<Self> { + let serial = Arc::new(Mutex::new(serialport::new(port, baudrate).open()?)); + let (tx, rx) = channel::<Vec<u8>>(); + let ser = Arc::clone(&serial); + let _handle = thread::spawn(move || { + while let Ok(data) = rx.recv() { + ser.lock().write_all(&data).unwrap(); + } + }); + Ok(Self { + _handle, + sender: tx, + serial, + }) + } + + /// Enqueue `data` to be written to the serial port, this is non-blocking + fn enqueue_bytes(&self, data: &[u8]) -> SResult<()> { + self.sender.send(data.to_vec())?; Ok(()) } }