// serial.rs: manager for named serial ports served by background reader/writer threads.
use std::{
io::Read,
ops::Deref,
sync::Arc,
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use hashbrown::HashMap;
use parking_lot::{Mutex, RwLock};
use serialport::SerialPort;
use crate::error::{Error, SResult};
/// Handler of the serial ports.
///
/// Keeps every open port in a lock-protected map so that several ports can be
/// addressed by a caller-chosen name.
pub struct SerialManager {
/// Open ports, keyed by the `name` passed to `open`.
serial_map: RwLock<HashMap<String, SerialQueue>>,
}
/// Queue data to allow non-blocking writes and blocking reads.
///
/// Two background threads (spawned in `SerialQueue::open`) own the traffic:
/// one writes every buffer received on `writer_ch` to the port, the other
/// reads from the port and forwards each byte to `reader_ch`.
#[derive(Debug)]
struct SerialQueue {
/// Join handles of the background threads; the writer thread is pushed
/// first, the reader thread second.
handle: Vec<JoinHandle<()>>,
/// Sending half of the write queue. Wrapped in `Option` so that `Drop` can
/// take it out and disconnect the channel.
writer_ch: Option<Sender<Vec<u8>>>,
/// Receiving half of the read queue; yields the bytes read from the port
/// one at a time.
reader_ch: Option<Receiver<u8>>,
/// The port itself, shared with both background threads. `Drop` replaces
/// it with `None`, which makes the reader thread stop.
serial: Arc<Mutex<Option<Box<dyn SerialPort>>>>,
/// This is the timeout after which a read returns a timeout error. It is
/// applied to each byte awaited, not to a whole multi-byte read.
read_timeout: Duration,
}
impl SerialManager {
pub fn new() -> Self {
Self {
serial_map: RwLock::new(HashMap::new()),
}
}
/// Open the serial port with the given `port` device path and `baudrate`
/// and bind it to the `name` key
pub fn open(&mut self, name: &str, port: &str, baudrate: u32) -> SResult<()> {
self.serial_map
.write()
.insert(name.to_owned(), SerialQueue::open(port, baudrate)?);
Ok(())
}
/// Close the serial port if it is open
pub fn close(&mut self, name: &str) -> SResult<()> {
if self.serial_map.read().contains_key(name) {
self.serial_map.write().remove(name);
Ok(())
} else {
Err(Error::SerialNotOpen)
}
}
/// Close all serial that are open
pub fn close_all(&mut self) {
self.serial_map.write().clear();
}
/// Read `n` bytes from the serial port
pub fn read_n_bytes(&self, name: &str, n: usize) -> SResult<Vec<u8>> {
let map = self.serial_map.read();
let port = map.get(name).ok_or(Error::SerialNotOpen)?;
let mut buf = vec![0; n];
// implement a read exact to allow better error messages
let mut filled = 0;
while filled < n { match port
.reader_ch
.as_ref()
.unwrap()
.recv_timeout(port.read_timeout)
{
Ok(b) => {
buf[filled] = b;
filled += 1;
}
Err(RecvTimeoutError::Timeout) => {
if filled == 0 {
return Err(Error::NoDataAvailable);
}
return Err(Error::IncompleteRead {
amount_read: filled,
inner: Box::new(Error::NoDataAvailable),
});
}
Err(RecvTimeoutError::Disconnected) => {
return Err(Error::SerialNotOpen);
}
}
}
Ok(buf)
}
/// Write `data` to the serial port
pub fn enqueue_bytes(&self, name: &str, data: &[u8]) -> SResult<()> {
let map = self.serial_map.read();
map.get(name)
.ok_or(Error::SerialNotOpen)?
.enqueue_bytes(data)
}
/// Clear the buffer of the serial port
pub fn clear_buffer(&self, name: &str) -> SResult<()> {
let map = self.serial_map.read();
let port = map.get(name).ok_or(Error::SerialNotOpen)?;
port.clear_buffer()
}
}
/// Gives direct access to the mutex guarding the underlying port.
impl Deref for SerialQueue {
    type Target = Mutex<Option<Box<dyn SerialPort>>>;

    /// Borrow the port mutex stored behind the shared `Arc`.
    fn deref(&self) -> &Self::Target {
        // `&Arc<T>` coerces to `&T` here.
        &self.serial
    }
}
impl SerialQueue {
/// Open the serial port and start a thread to write data as soon as it is
/// available (by using a channel)
fn open(port: &str, baudrate: u32) -> SResult<Self> {
let read_waiting_interval = Duration::from_millis(1);
let serial = Arc::new(Mutex::new(Some(
serialport::new(port, baudrate)
.timeout(read_waiting_interval)
.open()?,
)));
let mut handles = Vec::new();
// spawn a thread to write data as soon as it is available
let (wtx, wrx) = unbounded::<Vec<u8>>();
let ser = Arc::clone(&serial);
handles.push(thread::spawn(move || {
while let Ok(data) = wrx.recv() {
ser.lock().as_mut().unwrap().write_all(&data).unwrap();
} }));
// spawn a thread to read data as soon as it is available
let (rtx, rrx) = unbounded::<u8>();
let ser = Arc::clone(&serial);
handles.push(thread::spawn(move || {
let mut buf = [0; 1024];
loop {
match ser.lock().as_mut().map(|p| p.read(&mut buf)) {
Some(Ok(n)) => {
for b in buf.iter().take(n) {
rtx.send(*b).unwrap();
}
}
Some(Err(e)) if e.kind() == std::io::ErrorKind::TimedOut => {}
_ => break,
}
}
}));
Ok(Self {
handle: handles,
writer_ch: Some(wtx),
reader_ch: Some(rrx),
serial,
read_timeout: Duration::from_millis(1000),
})
}
/// Enqueue `data` to be written to the serial port, this is non-blocking
fn enqueue_bytes(&self, data: &[u8]) -> SResult<()> {
self.writer_ch.as_ref().unwrap().send(data.to_vec())?;
Ok(())
}
/// Clear the buffer of the serial port
fn clear_buffer(&self) -> SResult<()> {
while self
.reader_ch
.as_ref()
.unwrap()
.recv_timeout(Duration::from_millis(50))
.is_ok()
{}
Ok(())
}
}
/// Shuts the background threads down and closes the port.
impl Drop for SerialQueue {
    /// Tear-down order:
    ///
    /// 1. disconnect the write channel so the writer thread ends once its
    ///    queue is drained;
    /// 2. join the writer thread while the port is still open, so writes that
    ///    were already queued are not lost and the writer never sees a closed
    ///    port;
    /// 3. take the port out of the shared slot, which closes it and makes the
    ///    reader thread stop;
    /// 4. join the remaining thread(s).
    ///
    /// Nothing here panics: a panic inside `drop` while already unwinding
    /// would abort the process.
    fn drop(&mut self) {
        drop(self.writer_ch.take());

        let mut workers = self.handle.drain(..);
        // `open` pushes the writer thread's handle first.
        if let Some(writer) = workers.next() {
            // A panicked worker has already stopped; nothing more to do.
            let _ = writer.join();
        }

        drop(self.serial.lock().take());

        for worker in workers {
            let _ = worker.join();
        }
    }
}