diff --git a/Cargo.lock b/Cargo.lock index b29a0bca4ea8b14e2357358ddc3a1ea7bf271f81..211fd95195c8046b09bb7db4f1d6909991139097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,6 +59,21 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crossbeam-channel" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "hashbrown" version = "0.14.3" @@ -372,6 +387,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" name = "serial-bridge" version = "0.2.0" dependencies = [ + "crossbeam-channel", "hashbrown", "lazy_static", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 39e183e8ddf8d729504b7d2444abdcc19fb50299..56f1f655d9db656cc4a147803ac3842b2be8fd03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ license = "MIT" crate-type = ["cdylib"] [dependencies] +crossbeam-channel = "0.5.12" hashbrown = "0.14.3" lazy_static = "1.4.0" parking_lot = "0.12.1" diff --git a/src/error.rs b/src/error.rs index f9b35dbb2f6ce76638ccd50e15b2de6fd3e20ce2..10300ca4ffe9e9b7cc6fdc65a4b3db291ee41073 100644 --- a/src/error.rs +++ b/src/error.rs @@ -94,7 +94,7 @@ pub enum Error { inner: Box<Self>, }, #[error("Channel error: {0}")] - Channel(#[from] std::sync::mpsc::SendError<Vec<u8>>), + Channel(#[from] crossbeam_channel::SendError<Vec<u8>>), #[error("Parse error")] Parse, #[error("Matlab error: {0}")] diff --git a/src/serial.rs b/src/serial.rs index cdaf70095e72cc140878b876b1278695c427f9e3..c9b3ea2bc5591b6e328a12bf2aa4cd798903baff 100644 --- a/src/serial.rs +++ b/src/serial.rs @@ -1,14 +1,12 @@ use std::{ io::Read, ops::Deref, - sync::{ - mpsc::{channel, Sender}, - Arc, - }, + sync::Arc, thread::{self, JoinHandle}, time::{Duration, Instant}, }; +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use hashbrown::HashMap; use parking_lot::{Mutex, RwLock}; use serialport::SerialPort; @@ -23,9 +21,10 @@ pub struct SerialManager { /// Queue data to allow non-blocking writes and blocking reads #[derive(Debug)] struct SerialQueue { - handle: Option<JoinHandle<()>>, - sender: Option<Sender<Vec<u8>>>, - serial: Arc<Mutex<Box<dyn SerialPort>>>, + handle: Vec<JoinHandle<()>>, + writer_ch: Option<Sender<Vec<u8>>>, + reader_ch: Option<Receiver<u8>>, + serial: Arc<Mutex<Option<Box<dyn SerialPort>>>>, /// this the timeout after which it returns a timeout error read_timeout: Duration, } @@ -68,29 +67,25 @@ impl SerialManager { let mut buf = vec![0; n]; // implement a read exact to allow better error messages let mut filled = 0; - let start = Instant::now(); + let timeout = Instant::now() + port.read_timeout; while filled < n { - let mut reader = port.lock(); - match reader.read(&mut buf[filled..]) { - Ok(n) => filled += n, - Err(e) => match e.kind() { - std::io::ErrorKind::TimedOut => { - // if timeout hasn't been reached yet - if start.elapsed() < port.read_timeout { - continue; // this triggers a waiting mutex on the writing side - } - - // otherwise - if filled == 0 { - return Err(Error::NoDataAvailable); - } - return Err(Error::IncompleteRead { - amount_read: filled, - inner: Box::new(Error::NoDataAvailable), - }); + match port.reader_ch.as_ref().unwrap().recv_deadline(timeout) { + Ok(b) => { + buf[filled] = b; + filled += 1; + } + Err(RecvTimeoutError::Timeout) => { + if filled == 0 { + return Err(Error::NoDataAvailable); } - _ => return Err(e.into()), - }, + return Err(Error::IncompleteRead { + amount_read: filled, + inner: Box::new(Error::NoDataAvailable), + }); + } + Err(RecvTimeoutError::Disconnected) => { + return Err(Error::SerialNotOpen); + } } } Ok(buf) @@ -106,10 +101,10 @@ impl SerialManager { } impl Deref for SerialQueue { - type Target = Mutex<Box<dyn SerialPort>>; + type Target = Mutex<Option<Box<dyn SerialPort>>>; fn deref(&self) -> &Self::Target { - &self.serial + self.serial.as_ref() } } @@ -118,21 +113,46 @@ impl SerialQueue { /// available (by using a channel) fn open(port: &str, baudrate: u32) -> SResult<Self> { let read_waiting_interval = Duration::from_millis(50); - let serial = Arc::new(Mutex::new( + let serial = Arc::new(Mutex::new(Some( serialport::new(port, baudrate) .timeout(read_waiting_interval) .open()?, - )); - let (tx, rx) = channel::<Vec<u8>>(); + ))); + let mut handles = Vec::new(); + + // spawn a thread to write data as soon as it is available + let (wtx, wrx) = unbounded::<Vec<u8>>(); + let ser = Arc::clone(&serial); + handles.push(thread::spawn(move || { + while let Ok(data) = wrx.recv() { + ser.lock().as_mut().unwrap().write_all(&data).unwrap(); + } + })); + + // spawn a thread to read data as soon as it is available + let (rtx, rrx) = unbounded::<u8>(); let ser = Arc::clone(&serial); - let handle = thread::spawn(move || { - while let Ok(data) = rx.recv() { - ser.lock().write_all(&data).unwrap(); + handles.push(thread::spawn(move || { + let mut buf = [0; 1024]; + loop { + match ser.lock().as_mut().map(|p| p.read(&mut buf)) { + Some(Ok(n)) => { + for b in buf.iter().take(n) { + rtx.send(*b).unwrap(); + } + } + Some(Err(e)) if e.kind() == std::io::ErrorKind::TimedOut => { + thread::sleep(read_waiting_interval); + } + _ => break, + } } - }); + })); + Ok(Self { - handle: Some(handle), - sender: Some(tx), + handle: handles, + writer_ch: Some(wtx), + reader_ch: Some(rrx), serial, read_timeout: Duration::from_millis(1000), }) @@ -140,14 +160,17 @@ impl SerialQueue { /// Enqueue `data` to be written to the serial port, this is non-blocking fn enqueue_bytes(&self, data: &[u8]) -> SResult<()> { - self.sender.as_ref().unwrap().send(data.to_vec())?; + self.writer_ch.as_ref().unwrap().send(data.to_vec())?; Ok(()) } } impl Drop for SerialQueue { fn drop(&mut self) { - drop(self.sender.take().unwrap()); - self.handle.take().unwrap().join().unwrap(); + drop(self.writer_ch.take().unwrap()); + drop(self.serial.lock().take().unwrap()); + for handle in self.handle.drain(..) { + handle.join().unwrap(); + } } }