Skip to content
Snippets Groups Projects
Select Git revision
4 results Searching

TODO

Blame
  • serial.rs 5.87 KiB
    use std::{
        io::Read,
        ops::Deref,
        sync::Arc,
        thread::{self, JoinHandle},
        time::{Duration, Instant},
    };
    
    use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
    use hashbrown::HashMap;
    use parking_lot::{Mutex, RwLock};
    use serialport::SerialPort;
    
    use crate::error::{Error, SResult};
    
    /// Handler of the serial ports.
    ///
    /// Keeps every open port in a map keyed by the name chosen by the caller,
    /// so that several ports can be addressed independently.
    pub struct SerialManager {
        /// Open ports, indexed by the `name` given to `SerialManager::open`.
        serial_map: RwLock<HashMap<String, SerialQueue>>,
    }
    
    /// Queue data to allow non-blocking writes and blocking reads.
    ///
    /// Bundles the serial port with the channels and worker threads that
    /// `SerialQueue::open` creates around it.
    #[derive(Debug)]
    struct SerialQueue {
        /// Join handles of the worker threads spawned by `SerialQueue::open`
        /// (one writer, one reader); they are joined when the queue is dropped.
        handle: Vec<JoinHandle<()>>,
        /// Sending side of the write queue; taken out in `Drop` to close the queue.
        writer_ch: Option<Sender<Vec<u8>>>,
        /// Receiving side of the byte stream fed by the reader thread.
        reader_ch: Option<Receiver<u8>>,
        /// The port itself, shared with the worker threads; set to `None` in `Drop`.
        serial: Arc<Mutex<Option<Box<dyn SerialPort>>>>,
        /// How long `SerialManager::read_n_bytes` waits for each byte before it
        /// returns a timeout error.
        read_timeout: Duration,
    }
    
    impl SerialManager {
        pub fn new() -> Self {
            Self {
                serial_map: RwLock::new(HashMap::new()),
            }
        }
    
        /// Open the serial port with the given `port` device path and `baudrate`
        /// and bind it to the `name` key
        pub fn open(&mut self, name: &str, port: &str, baudrate: u32) -> SResult<()> {
            self.serial_map
                .write()
                .insert(name.to_owned(), SerialQueue::open(port, baudrate)?);
            Ok(())
        }
    
        /// Close the serial port if it is open
        pub fn close(&mut self, name: &str) -> SResult<()> {
            if self.serial_map.read().contains_key(name) {
                self.serial_map.write().remove(name);
                Ok(())
            } else {
                Err(Error::SerialNotOpen)
            }
        }
    
        /// Close all serial that are open
        pub fn close_all(&mut self) {
            self.serial_map.write().clear();
        }
    
        /// Read `n` bytes from the serial port
        pub fn read_n_bytes(&self, name: &str, n: usize) -> SResult<Vec<u8>> {
            let map = self.serial_map.read();
            let port = map.get(name).ok_or(Error::SerialNotOpen)?;
            let mut buf = vec![0; n];
            // implement a read exact to allow better error messages
            let mut filled = 0;
            while filled < n {
                match port
                    .reader_ch
                    .as_ref()
                    .unwrap()
                    .recv_timeout(port.read_timeout)
                {
                    Ok(b) => {
                        buf[filled] = b;
                        filled += 1;
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        if filled == 0 {
                            return Err(Error::NoDataAvailable);
                        }
                        return Err(Error::IncompleteRead {
                            amount_read: filled,
                            inner: Box::new(Error::NoDataAvailable),
                        });
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        return Err(Error::SerialNotOpen);
                    }
                }
            }
            Ok(buf)
        }
    
        /// Write `data` to the serial port
        pub fn enqueue_bytes(&self, name: &str, data: &[u8]) -> SResult<()> {
            let map = self.serial_map.read();
            map.get(name)
                .ok_or(Error::SerialNotOpen)?
                .enqueue_bytes(data)
        }
    
        /// Clear the buffer of the serial port
        pub fn clear_buffer(&self, name: &str) -> SResult<()> {
            let map = self.serial_map.read();
            let port = map.get(name).ok_or(Error::SerialNotOpen)?;
            port.clear_buffer()
        }
    }
    
    /// Lets a `SerialQueue` be used wherever the mutex guarding its port is
    /// expected.
    impl Deref for SerialQueue {
        type Target = Mutex<Option<Box<dyn SerialPort>>>;

        /// Borrow the mutex stored behind the shared `Arc`.
        fn deref(&self) -> &Self::Target {
            &self.serial
        }
    }
    
    impl SerialQueue {
        /// Open the serial port and start two worker threads: one writes queued
        /// data as soon as it is available, the other forwards every received
        /// byte into a channel.
        ///
        /// # Errors
        ///
        /// Propagates the error of the `serialport` builder when the device
        /// cannot be opened.
        fn open(port: &str, baudrate: u32) -> SResult<Self> {
            // Short port timeout: the reader thread holds the port mutex while it
            // waits for data, so it has to give up quickly to let the writer in.
            let read_waiting_interval = Duration::from_millis(1);
            let serial = Arc::new(Mutex::new(Some(
                serialport::new(port, baudrate)
                    .timeout(read_waiting_interval)
                    .open()?,
            )));
            let mut handles = Vec::new();

            // spawn a thread to write data as soon as it is available
            let (wtx, wrx) = unbounded::<Vec<u8>>();
            let ser = Arc::clone(&serial);
            handles.push(thread::spawn(move || {
                // The loop ends when every sender is gone and the queue is empty.
                while let Ok(data) = wrx.recv() {
                    let mut guard = ser.lock();
                    match guard.as_mut() {
                        Some(dev) => {
                            // A failed write stops the thread instead of
                            // panicking; dropping `wrx` makes later
                            // `enqueue_bytes` calls report the failure.
                            if dev.write_all(&data).is_err() {
                                break;
                            }
                        }
                        // The port was taken away (queue is being dropped):
                        // discard what is left in the queue and stop.
                        None => break,
                    }
                }
            }));

            // spawn a thread to read data as soon as it is available
            let (rtx, rrx) = unbounded::<u8>();
            let ser = Arc::clone(&serial);
            handles.push(thread::spawn(move || {
                let mut buf = [0; 1024];
                loop {
                    // Keep the lock only for the read itself so the writer is
                    // not blocked while the bytes are being forwarded.
                    let outcome = ser.lock().as_mut().map(|dev| dev.read(&mut buf));
                    match outcome {
                        Some(Ok(n)) => {
                            for b in buf.iter().take(n) {
                                // Nobody is listening anymore: stop the thread.
                                if rtx.send(*b).is_err() {
                                    return;
                                }
                            }
                        }
                        // No data within the port timeout, or the call was
                        // interrupted: both are transient, try again.
                        Some(Err(e))
                            if matches!(
                                e.kind(),
                                std::io::ErrorKind::TimedOut | std::io::ErrorKind::Interrupted
                            ) => {}
                        // Port removed (`None`) or unrecoverable I/O error.
                        _ => break,
                    }
                }
            }));

            Ok(Self {
                handle: handles,
                writer_ch: Some(wtx),
                reader_ch: Some(rrx),
                serial,
                read_timeout: Duration::from_millis(1000),
            })
        }

        /// Enqueue `data` to be written to the serial port, this is non-blocking.
        ///
        /// # Errors
        ///
        /// Fails when the writer thread is no longer running, because the
        /// channel is then disconnected.
        fn enqueue_bytes(&self, data: &[u8]) -> SResult<()> {
            self.writer_ch
                .as_ref()
                .expect("writer channel is only removed while dropping the queue")
                .send(data.to_vec())?;
            Ok(())
        }

        /// Clear the buffer of the serial port.
        ///
        /// Discards received bytes until none arrives for 50 ms (or the reader
        /// thread has stopped). Always returns `Ok(())`.
        fn clear_buffer(&self) -> SResult<()> {
            let rx = self
                .reader_ch
                .as_ref()
                .expect("reader channel is never removed from the queue");
            while rx.recv_timeout(Duration::from_millis(50)).is_ok() {}
            Ok(())
        }
    }
    
    impl Drop for SerialQueue {
        fn drop(&mut self) {
            drop(self.writer_ch.take().unwrap());
            drop(self.serial.lock().take().unwrap());
            for handle in self.handle.drain(..) {
                handle.join().unwrap();
            }
        }
    }