diff --git a/mavlink-core/Cargo.toml b/mavlink-core/Cargo.toml
index 64a20c938d735189a8796a5f8f61ceff26340111..d51915905ab581e8985728a2752e309fd44b63e0 100644
--- a/mavlink-core/Cargo.toml
+++ b/mavlink-core/Cargo.toml
@@ -26,8 +26,9 @@ embedded-io-async = { version = "0.6.1", optional = true }
 serde = { version = "1.0.115", optional = true, features = ["derive"] }
 serde_arrays = { version = "0.1.0", optional = true }
 serial = { version = "0.4", optional = true }
-tokio = { version = "1.0", default-features = false, features = ["io-util"], optional = true }
+tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs", ], optional = true }
 sha2 = { version = "0.10", optional = true }
+async-trait = { version = "0.1.18", optional = true }
 
 [features]
 "std" = ["byteorder/std"]
@@ -40,6 +41,10 @@ sha2 = { version = "0.10", optional = true }
 "embedded" = ["dep:embedded-io", "dep:embedded-io-async"]
 "embedded-hal-02" = ["dep:nb", "dep:embedded-hal-02"]
 "serde" = ["dep:serde", "dep:serde_arrays"]
-"tokio-1" = ["dep:tokio"]
+"tokio-1" = ["dep:tokio", "dep:async-trait"]
 "signing" = ["dep:sha2"]
 default = ["std", "tcp", "udp", "direct-serial", "serde"]
+
+[dev-dependencies]
+# 1.39 tokio macros seems to be currently broken
+tokio = { version = "1.0, <=1.38", default-features = false, features = ["io-util", "net", "sync", "fs","macros", "rt" ] }
\ No newline at end of file
diff --git a/mavlink-core/src/async_connection/file.rs b/mavlink-core/src/async_connection/file.rs
new file mode 100644
index 0000000000000000000000000000000000000000..9b1c90af3ae9d4f0aeb4e543dec0aa5f1aa8abde
--- /dev/null
+++ b/mavlink-core/src/async_connection/file.rs
@@ -0,0 +1,83 @@
+use core::ops::DerefMut;
+
+use super::AsyncMavConnection;
+use crate::error::{MessageReadError, MessageWriteError};
+
+use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message};
+
+use tokio::fs::File;
+use tokio::io;
+use tokio::sync::Mutex;
+
+#[cfg(not(feature = "signing"))]
+use crate::read_versioned_msg_async;
+
+#[cfg(feature = "signing")]
+use crate::{read_versioned_msg_async_signed, SigningConfig, SigningData};
+
+/// File MAVLINK connection
+
+pub async fn open(file_path: &str) -> io::Result<AsyncFileConnection> {
+    let file = File::open(file_path).await?;
+    Ok(AsyncFileConnection {
+        file: Mutex::new(AsyncPeekReader::new(file)),
+        protocol_version: MavlinkVersion::V2,
+        #[cfg(feature = "signing")]
+        signing_data: None,
+    })
+}
+
+pub struct AsyncFileConnection {
+    file: Mutex<AsyncPeekReader<File>>,
+    protocol_version: MavlinkVersion,
+
+    #[cfg(feature = "signing")]
+    signing_data: Option<SigningData>,
+}
+
+#[async_trait::async_trait]
+impl<M: Message + Sync> AsyncMavConnection<M> for AsyncFileConnection {
+    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
+        let mut file = self.file.lock().await;
+
+        loop {
+            #[cfg(not(feature = "signing"))]
+            let result = read_versioned_msg_async(file.deref_mut(), self.protocol_version).await;
+            #[cfg(feature = "signing")]
+            let result = read_versioned_msg_async_signed(
+                file.deref_mut(),
+                self.protocol_version,
+                self.signing_data.as_ref(),
+            )
+            .await;
+            match result {
+                ok @ Ok(..) => {
+                    return ok;
+                }
+                Err(MessageReadError::Io(e)) => {
+                    if e.kind() == io::ErrorKind::UnexpectedEof {
+                        return Err(MessageReadError::Io(e));
+                    }
+                }
+                _ => {}
+            }
+        }
+    }
+
+    async fn send(&self, _header: &MavHeader, _data: &M) -> Result<usize, MessageWriteError> {
+        Ok(0)
+    }
+
+    fn set_protocol_version(&mut self, version: MavlinkVersion) {
+        self.protocol_version = version;
+    }
+
+    fn get_protocol_version(&self) -> MavlinkVersion {
+        self.protocol_version
+    }
+
+    #[cfg(feature = "signing")]
+    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
+        self.signing_data = signing_data.map(SigningData::from_config)
+    }
+}
diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..6ef4cc156ad013e9f591936f0296520b3523b72c
--- /dev/null
+++ b/mavlink-core/src/async_connection/mod.rs
@@ -0,0 +1,137 @@
+use tokio::io;
+
+use crate::{MavFrame, MavHeader, MavlinkVersion, Message};
+
+#[cfg(feature = "tcp")]
+mod tcp;
+
+#[cfg(feature = "udp")]
+mod udp;
+
+mod file;
+
+#[cfg(feature = "signing")]
+use crate::SigningConfig;
+
+/// A MAVLink connection
+#[async_trait::async_trait]
+pub trait AsyncMavConnection<M: Message + Sync> {
+    /// Receive a mavlink message.
+    ///
+    /// Blocks until a valid frame is received, ignoring invalid messages.
+    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>;
+
+    /// Send a mavlink message
+    async fn send(
+        &self,
+        header: &MavHeader,
+        data: &M,
+    ) -> Result<usize, crate::error::MessageWriteError>;
+
+    fn set_protocol_version(&mut self, version: MavlinkVersion);
+    fn get_protocol_version(&self) -> MavlinkVersion;
+
+    /// Write whole frame
+    async fn send_frame(
+        &self,
+        frame: &MavFrame<M>,
+    ) -> Result<usize, crate::error::MessageWriteError> {
+        self.send(&frame.header, &frame.msg).await
+    }
+
+    /// Read whole frame
+    async fn recv_frame(&self) -> Result<MavFrame<M>, crate::error::MessageReadError> {
+        let (header, msg) = self.recv().await?;
+        let protocol_version = self.get_protocol_version();
+        Ok(MavFrame {
+            header,
+            msg,
+            protocol_version,
+        })
+    }
+
+    /// Send a message with default header
+    async fn send_default(&self, data: &M) -> Result<usize, crate::error::MessageWriteError> {
+        let header = MavHeader::default();
+        self.send(&header, data).await
+    }
+
+    /// Setup secret key used for message signing, or disable message signing
+    #[cfg(feature = "signing")]
+    fn setup_signing(&mut self, signing_data: Option<SigningConfig>);
+}
+
+/// Connect to a MAVLink node by address string.
+///
+/// The address must be in one of the following formats:
+///
+///  * `tcpin:<addr>:<port>` to create a TCP server, listening for incoming connections
+///  * `tcpout:<addr>:<port>` to create a TCP client
+///  * `udpin:<addr>:<port>` to create a UDP server, listening for incoming packets
+///  * `udpout:<addr>:<port>` to create a UDP client
+///  * `udpbcast:<addr>:<port>` to create a UDP broadcast
+///  *  NOT `serial:<port>:<baudrate>` to create a serial connection
+///  * `file:<path>` to extract file data
+///
+/// The type of the connection is determined at runtime based on the address type, so the
+/// connection is returned as a trait object.
+// TODO only reason this has to be send is udp serve
+pub async fn connect_async<M: Message + Sync + Send>(
+    address: &str,
+) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> {
+    let protocol_err = Err(io::Error::new(
+        io::ErrorKind::AddrNotAvailable,
+        "Protocol unsupported",
+    ));
+
+    if cfg!(feature = "tcp") && address.starts_with("tcp") {
+        #[cfg(feature = "tcp")]
+        {
+            tcp::select_protocol(address).await
+        }
+        #[cfg(not(feature = "tcp"))]
+        {
+            protocol_err
+        }
+    } else if cfg!(feature = "udp") && address.starts_with("udp") {
+        #[cfg(feature = "udp")]
+        {
+            udp::select_protocol(address).await
+        }
+        #[cfg(not(feature = "udp"))]
+        {
+            protocol_err
+        }
+    } else if cfg!(feature = "direct-serial") && address.starts_with("serial:") {
+        #[cfg(feature = "direct-serial")]
+        {
+            todo!()
+            //Ok(Box::new(direct_serial::open(&address["serial:".len()..])?))
+        }
+        #[cfg(not(feature = "direct-serial"))]
+        {
+            protocol_err
+        }
+    } else if address.starts_with("file") {
+        Ok(Box::new(file::open(&address["file:".len()..]).await?))
+    } else {
+        protocol_err
+    }
+}
+
+// TODO remove this ?
+/// Returns the socket address for the given address.
+pub(crate) fn get_socket_addr<T: std::net::ToSocketAddrs>(
+    address: T,
+) -> Result<std::net::SocketAddr, io::Error> {
+    let addr = match address.to_socket_addrs()?.next() {
+        Some(addr) => addr,
+        None => {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "Host address lookup failed",
+            ));
+        }
+    };
+    Ok(addr)
+}
diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e05d70455bca6fd5e0053fcad55bca210c28a72a
--- /dev/null
+++ b/mavlink-core/src/async_connection/tcp.rs
@@ -0,0 +1,156 @@
+use super::{get_socket_addr, AsyncMavConnection};
+use crate::async_peek_reader::AsyncPeekReader;
+use crate::{MavHeader, MavlinkVersion, Message};
+
+use core::ops::DerefMut;
+use tokio::io;
+use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::Mutex;
+
+#[cfg(not(feature = "signing"))]
+use crate::{read_versioned_msg_async, write_versioned_msg_async};
+#[cfg(feature = "signing")]
+use crate::{
+    read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData,
+};
+
+/// TCP MAVLink connection
+
+pub async fn select_protocol<M: Message + Sync>(
+    address: &str,
+) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> {
+    let connection = if let Some(address) = address.strip_prefix("tcpout:") {
+        tcpout(address).await
+    } else if let Some(address) = address.strip_prefix("tcpin:") {
+        tcpin(address).await
+    } else {
+        Err(io::Error::new(
+            io::ErrorKind::AddrNotAvailable,
+            "Protocol unsupported",
+        ))
+    };
+
+    Ok(Box::new(connection?))
+}
+
+pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
+    let addr = get_socket_addr(address)?;
+
+    let socket = TcpStream::connect(addr).await?;
+
+    let (reader, writer) = socket.into_split();
+
+    Ok(TcpConnection {
+        reader: Mutex::new(AsyncPeekReader::new(reader)),
+        writer: Mutex::new(TcpWrite {
+            socket: writer,
+            sequence: 0,
+        }),
+        protocol_version: MavlinkVersion::V2,
+        #[cfg(feature = "signing")]
+        signing_data: None,
+    })
+}
+
+pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
+    let addr = get_socket_addr(address)?;
+    let listener = TcpListener::bind(addr).await?;
+
+    //For now we only accept one incoming stream: this blocks until we get one
+    match listener.accept().await {
+        Ok((socket, _)) => {
+            let (reader, writer) = socket.into_split();
+            return Ok(TcpConnection {
+                reader: Mutex::new(AsyncPeekReader::new(reader)),
+                writer: Mutex::new(TcpWrite {
+                    socket: writer,
+                    sequence: 0,
+                }),
+                protocol_version: MavlinkVersion::V2,
+                #[cfg(feature = "signing")]
+                signing_data: None,
+            });
+        }
+        Err(e) => {
+            //TODO don't println in lib
+            println!("listener err: {e}");
+        }
+    }
+    Err(io::Error::new(
+        io::ErrorKind::NotConnected,
+        "No incoming connections!",
+    ))
+}
+
+pub struct TcpConnection {
+    reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
+    writer: Mutex<TcpWrite>,
+    protocol_version: MavlinkVersion,
+    #[cfg(feature = "signing")]
+    signing_data: Option<SigningData>,
+}
+
+struct TcpWrite {
+    socket: OwnedWriteHalf,
+    sequence: u8,
+}
+
+#[async_trait::async_trait]
+impl<M: Message + Sync> AsyncMavConnection<M> for TcpConnection {
+    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
+        let mut reader = self.reader.lock().await;
+        #[cfg(not(feature = "signing"))]
+        let result = read_versioned_msg_async(reader.deref_mut(), self.protocol_version).await;
+        #[cfg(feature = "signing")]
+        let result = read_versioned_msg_async_signed(
+            reader.deref_mut(),
+            self.protocol_version,
+            self.signing_data.as_ref(),
+        )
+        .await;
+        result
+    }
+
+    async fn send(
+        &self,
+        header: &MavHeader,
+        data: &M,
+    ) -> Result<usize, crate::error::MessageWriteError> {
+        let mut lock = self.writer.lock().await;
+
+        let header = MavHeader {
+            sequence: lock.sequence,
+            system_id: header.system_id,
+            component_id: header.component_id,
+        };
+
+        lock.sequence = lock.sequence.wrapping_add(1);
+        #[cfg(not(feature = "signing"))]
+        let result =
+            write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await;
+        #[cfg(feature = "signing")]
+        let result = write_versioned_msg_async_signed(
+            &mut lock.socket,
+            self.protocol_version,
+            header,
+            data,
+            self.signing_data.as_ref(),
+        )
+        .await;
+        result
+    }
+
+    fn set_protocol_version(&mut self, version: MavlinkVersion) {
+        self.protocol_version = version;
+    }
+
+    fn get_protocol_version(&self) -> MavlinkVersion {
+        self.protocol_version
+    }
+
+    #[cfg(feature = "signing")]
+    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
+        self.signing_data = signing_data.map(SigningData::from_config)
+    }
+}
diff --git a/mavlink-core/src/async_connection/udp.rs b/mavlink-core/src/async_connection/udp.rs
new file mode 100644
index 0000000000000000000000000000000000000000..cd945e8055eb9a62e34eacdf498cde0fdaf05172
--- /dev/null
+++ b/mavlink-core/src/async_connection/udp.rs
@@ -0,0 +1,277 @@
+use core::{ops::DerefMut, task::Poll};
+use std::{collections::VecDeque, io::Read, sync::Arc};
+
+use tokio::{
+    io::{self, AsyncRead, ReadBuf},
+    net::UdpSocket,
+    sync::Mutex,
+};
+
+use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message};
+
+use super::{get_socket_addr, AsyncMavConnection};
+
+#[cfg(not(feature = "signing"))]
+use crate::{read_versioned_msg_async, write_versioned_msg_async};
+#[cfg(feature = "signing")]
+use crate::{
+    read_versioned_msg_async_signed, write_versioned_msg_signed, SigningConfig, SigningData,
+};
+
+pub async fn select_protocol<M: Message + Sync + Send>(
+    address: &str,
+) -> tokio::io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> {
+    let connection = if let Some(address) = address.strip_prefix("udpin:") {
+        udpin(address).await
+    } else if let Some(address) = address.strip_prefix("udpout:") {
+        udpout(address).await
+    } else if let Some(address) = address.strip_prefix("udpbcast:") {
+        udpbcast(address).await
+    } else {
+        Err(tokio::io::Error::new(
+            tokio::io::ErrorKind::AddrNotAvailable,
+            "Protocol unsupported",
+        ))
+    };
+
+    Ok(Box::new(connection?))
+}
+
+pub async fn udpbcast<T: std::net::ToSocketAddrs>(address: T) -> tokio::io::Result<UdpConnection> {
+    let addr = get_socket_addr(address)?;
+    let socket = UdpSocket::bind("0.0.0.0:0").await?;
+    socket
+        .set_broadcast(true)
+        .expect("Couldn't bind to broadcast address.");
+    UdpConnection::new(socket, false, Some(addr))
+}
+
+pub async fn udpout<T: std::net::ToSocketAddrs>(address: T) -> tokio::io::Result<UdpConnection> {
+    let addr = get_socket_addr(address)?;
+    let socket = UdpSocket::bind("0.0.0.0:0").await?;
+    UdpConnection::new(socket, false, Some(addr))
+}
+
+pub async fn udpin<T: std::net::ToSocketAddrs>(address: T) -> tokio::io::Result<UdpConnection> {
+    let addr = address
+        .to_socket_addrs()
+        .unwrap()
+        .next()
+        .expect("Invalid address");
+    let socket = UdpSocket::bind(addr).await?;
+    UdpConnection::new(socket, true, None)
+}
+
+struct UdpRead {
+    socket: Arc<UdpSocket>,
+    buffer: VecDeque<u8>,
+    last_recv_address: Option<std::net::SocketAddr>,
+}
+
+const MTU_SIZE: usize = 1500;
+impl AsyncRead for UdpRead {
+    fn poll_read(
+        mut self: core::pin::Pin<&mut Self>,
+        cx: &mut core::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        if self.buffer.is_empty() {
+            let mut read_buffer = [0u8; MTU_SIZE];
+            let mut read_buffer = ReadBuf::new(&mut read_buffer);
+
+            match self.socket.poll_recv_from(cx, &mut read_buffer) {
+                Poll::Ready(Ok(address)) => {
+                    let n_buffer = read_buffer.filled().len();
+
+                    let n = (&read_buffer.filled()[0..n_buffer]).read(buf.initialize_unfilled())?;
+                    buf.advance(n);
+
+                    self.buffer.extend(&read_buffer.filled()[n..n_buffer]);
+                    self.last_recv_address = Some(address);
+                    Poll::Ready(Ok(()))
+                }
+                Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
+                Poll::Pending => Poll::Pending,
+            }
+        } else {
+            let read_result = self.buffer.read(buf.initialize_unfilled());
+            let result = match read_result {
+                Ok(n) => {
+                    buf.advance(n);
+                    Ok(())
+                }
+                Err(err) => Err(err),
+            };
+            Poll::Ready(result)
+        }
+    }
+}
+
+struct UdpWrite {
+    socket: Arc<UdpSocket>,
+    dest: Option<std::net::SocketAddr>,
+    sequence: u8,
+}
+
+pub struct UdpConnection {
+    reader: Mutex<AsyncPeekReader<UdpRead>>,
+    writer: Mutex<UdpWrite>,
+    protocol_version: MavlinkVersion,
+    server: bool,
+    #[cfg(feature = "signing")]
+    signing_data: Option<SigningData>,
+}
+
+impl UdpConnection {
+    fn new(
+        socket: UdpSocket,
+        server: bool,
+        dest: Option<std::net::SocketAddr>,
+    ) -> tokio::io::Result<Self> {
+        let socket = Arc::new(socket);
+        Ok(Self {
+            server,
+            reader: Mutex::new(AsyncPeekReader::new(UdpRead {
+                socket: socket.clone(),
+                buffer: VecDeque::new(),
+                last_recv_address: None,
+            })),
+            writer: Mutex::new(UdpWrite {
+                socket,
+                dest,
+                sequence: 0,
+            }),
+            protocol_version: MavlinkVersion::V2,
+            #[cfg(feature = "signing")]
+            signing_data: None,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl<M: Message + Sync + Send> AsyncMavConnection<M> for UdpConnection {
+    async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
+        let mut reader = self.reader.lock().await;
+
+        loop {
+            #[cfg(not(feature = "signing"))]
+            let result = read_versioned_msg_async(reader.deref_mut(), self.protocol_version).await;
+            #[cfg(feature = "signing")]
+            let result = read_versioned_msg_async_signed(
+                reader.deref_mut(),
+                self.protocol_version,
+                self.signing_data.as_ref(),
+            )
+            .await;
+            if self.server {
+                if let addr @ Some(_) = reader.reader_ref().last_recv_address {
+                    self.writer.lock().await.dest = addr;
+                }
+            }
+            if let ok @ Ok(..) = result {
+                return ok;
+            }
+        }
+    }
+
+    async fn send(
+        &self,
+        header: &MavHeader,
+        data: &M,
+    ) -> Result<usize, crate::error::MessageWriteError> {
+        let mut guard = self.writer.lock().await;
+        let state = &mut *guard;
+
+        let header = MavHeader {
+            sequence: state.sequence,
+            system_id: header.system_id,
+            component_id: header.component_id,
+        };
+
+        state.sequence = state.sequence.wrapping_add(1);
+
+        let len = if let Some(addr) = state.dest {
+            let mut buf = Vec::new();
+            #[cfg(not(feature = "signing"))]
+            write_versioned_msg_async(
+                &mut buf,
+                self.protocol_version,
+                header,
+                data,
+                #[cfg(feature = "signing")]
+                self.signing_data.as_ref(),
+            )
+            .await?;
+            #[cfg(feature = "signing")]
+            write_versioned_msg_signed(
+                &mut buf,
+                self.protocol_version,
+                header,
+                data,
+                #[cfg(feature = "signing")]
+                self.signing_data.as_ref(),
+            )?;
+            state.socket.send_to(&buf, addr).await?
+        } else {
+            0
+        };
+
+        Ok(len)
+    }
+
+    fn set_protocol_version(&mut self, version: MavlinkVersion) {
+        self.protocol_version = version;
+    }
+
+    fn get_protocol_version(&self) -> MavlinkVersion {
+        self.protocol_version
+    }
+
+    #[cfg(feature = "signing")]
+    fn setup_signing(&mut self, signing_data: Option<SigningConfig>) {
+        self.signing_data = signing_data.map(SigningData::from_config)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tokio::io::AsyncReadExt;
+
+    #[tokio::test]
+    async fn test_datagram_buffering() {
+        let receiver_socket = Arc::new(UdpSocket::bind("127.0.0.1:5001").await.unwrap());
+        let mut udp_reader = UdpRead {
+            socket: receiver_socket.clone(),
+            buffer: VecDeque::new(),
+            last_recv_address: None,
+        };
+        let sender_socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
+        sender_socket.connect("127.0.0.1:5001").await.unwrap();
+
+        let datagram: Vec<u8> = (0..50).collect::<Vec<_>>();
+
+        let mut n_sent = sender_socket.send(&datagram).await.unwrap();
+        assert_eq!(n_sent, datagram.len());
+        n_sent = sender_socket.send(&datagram).await.unwrap();
+        assert_eq!(n_sent, datagram.len());
+
+        let mut buf = [0u8; 30];
+
+        let mut n_read = udp_reader.read(&mut buf).await.unwrap();
+        assert_eq!(n_read, 30);
+        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
+
+        n_read = udp_reader.read(&mut buf).await.unwrap();
+        assert_eq!(n_read, 20);
+        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
+
+        n_read = udp_reader.read(&mut buf).await.unwrap();
+        assert_eq!(n_read, 30);
+        assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());
+
+        n_read = udp_reader.read(&mut buf).await.unwrap();
+        assert_eq!(n_read, 20);
+        assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
+    }
+}
diff --git a/mavlink-core/src/async_peek_reader.rs b/mavlink-core/src/async_peek_reader.rs
new file mode 100644
index 0000000000000000000000000000000000000000..1d6ada2c4bede7bad8c2de9d9040136c1b1a6334
--- /dev/null
+++ b/mavlink-core/src/async_peek_reader.rs
@@ -0,0 +1,147 @@
+//! This module implements a buffered/peekable reader using async I/O.
+//!
+//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
+//!
+//! This is the async version of [`crate::peek_reader::PeekReader`].
+//! A reader implementing the tokio library's [`tokio::io::AsyncBufRead`]/[`tokio::io::AsyncBufReadExt`] traits seems like a good fit, but
+//! it does not allow for peeking a specific number of bytes, so it provides no way to request
+//! more data from the underlying reader without consuming the existing data.
+//!
+//! This API still tries to adhere to the [`tokio::io::AsyncBufRead`]'s trait philosophy.
+//!
+//! The main type [`AsyncPeekReader`] does not implement [`tokio::io::AsyncBufReadExt`] itself, as there is no added benefit
+//! in doing so.
+//!
+use tokio::io::AsyncReadExt;
+
+use crate::error::MessageReadError;
+
+/// A buffered/peekable reader
+///
+/// This reader wraps a type implementing [`tokio::io::AsyncRead`] and adds buffering via an internal buffer.
+///
+/// It allows the user to `peek` a specified number of bytes (without consuming them),
+/// to `read` bytes (consuming them), or to `consume` them after `peek`ing.
+///
+/// NOTE: This reader is generic over the size of the buffer, defaulting to MAVLink's current largest
+/// possible message size of 280 bytes
+///
+pub struct AsyncPeekReader<R, const BUFFER_SIZE: usize = 280> {
+    // Internal buffer
+    buffer: [u8; BUFFER_SIZE],
+    // The position of the next byte to read from the buffer.
+    cursor: usize,
+    // The position of the next byte to read into the buffer.
+    top: usize,
+    // The wrapped reader.
+    reader: R,
+}
+
+impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFER_SIZE> {
+    /// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`tokio::io::AsyncReadExt`] and using the default chunk size
+    pub fn new(reader: R) -> Self {
+        Self {
+            buffer: [0; BUFFER_SIZE],
+            cursor: 0,
+            top: 0,
+            reader,
+        }
+    }
+
+    /// Peeks an exact amount of bytes from the internal buffer
+    ///
+    /// If the internal buffer does not contain enough data, this function will read
+    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
+    ///
+    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
+    ///
+    /// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
+    /// will still return the peeked data.
+    ///
+    pub async fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
+        self.fetch(amount, false).await
+    }
+
+    /// Reads a specified amount of bytes from the internal buffer
+    ///
+    /// If the internal buffer does not contain enough data, this function will read
+    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
+    ///
+    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
+    ///
+    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
+    ///
+    pub async fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> {
+        self.fetch(amount, true).await
+    }
+
+    /// Reads a byte from the internal buffer
+    ///
+    /// If the internal buffer does not contain enough data, this function will read
+    /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
+    ///
+    /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
+    ///
+    /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
+    ///
+    pub async fn read_u8(&mut self) -> Result<u8, MessageReadError> {
+        let buf = self.read_exact(1).await?;
+        Ok(buf[0])
+    }
+
+    /// Consumes a specified amount of bytes from the buffer
+    ///
+    /// If the internal buffer does not contain enough data, this function will consume as much data as is buffered.
+    ///
+    pub fn consume(&mut self, amount: usize) -> usize {
+        let amount = amount.min(self.top - self.cursor);
+        self.cursor += amount;
+        amount
+    }
+
+    /// Returns an immutable reference to the underlying [`tokio::io::AsyncRead`]
+    ///
+    /// Reading directly from the underlying stream will cause data loss
+    pub fn reader_ref(&mut self) -> &R {
+        &self.reader
+    }
+
+    /// Returns a mutable reference to the underlying [`tokio::io::AsyncRead`]
+    ///
+    /// Reading directly from the underlying stream will cause data loss
+    pub fn reader_mut(&mut self) -> &mut R {
+        &mut self.reader
+    }
+
+    /// Internal function to fetch data from the internal buffer and/or reader
+    async fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> {
+        let buffered = self.top - self.cursor;
+
+        // the caller requested more bytes than we have buffered, fetch them from the reader
+        if buffered < amount {
+            let bytes_read = amount - buffered;
+            assert!(bytes_read < BUFFER_SIZE);
+            let mut buf = [0u8; BUFFER_SIZE];
+
+            // read needed bytes from reader
+            self.reader.read_exact(&mut buf[..bytes_read]).await?;
+
+            // if some bytes were read, add them to the buffer
+
+            if self.buffer.len() - self.top < bytes_read {
+                // reallocate
+                self.buffer.copy_within(self.cursor..self.top, 0);
+                self.cursor = 0;
+                self.top = buffered;
+            }
+            self.buffer[self.top..self.top + bytes_read].copy_from_slice(&buf[..bytes_read]);
+            self.top += bytes_read;
+        }
+
+        let result = &self.buffer[self.cursor..self.cursor + amount];
+        if consume {
+            self.cursor += amount;
+        }
+        Ok(result)
+    }
+}
diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs
index 2ef674b1048304e9a3d18249bd0021a8092da7f5..d68fe1d21439e719e7f45db44942fcd66c421430 100644
--- a/mavlink-core/src/lib.rs
+++ b/mavlink-core/src/lib.rs
@@ -50,6 +50,16 @@ pub mod error;
 #[cfg(feature = "std")]
 pub use self::connection::{connect, MavConnection};
 
+#[cfg(feature = "tokio-1")]
+mod async_connection;
+#[cfg(feature = "tokio-1")]
+pub use self::async_connection::{connect_async, AsyncMavConnection};
+
+#[cfg(feature = "tokio-1")]
+pub mod async_peek_reader;
+#[cfg(feature = "tokio-1")]
+use async_peek_reader::AsyncPeekReader;
+
 #[cfg(any(feature = "embedded", feature = "embedded-hal-02"))]
 pub mod embedded;
 #[cfg(any(feature = "embedded", feature = "embedded-hal-02"))]
@@ -262,6 +272,17 @@ pub fn read_versioned_msg<M: Message, R: Read>(
     }
 }
 
+#[cfg(feature = "tokio-1")]
+pub async fn read_versioned_msg_async<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
+    r: &mut AsyncPeekReader<R>,
+    version: MavlinkVersion,
+) -> Result<(MavHeader, M), error::MessageReadError> {
+    match version {
+        MavlinkVersion::V2 => read_v2_msg_async(r).await,
+        MavlinkVersion::V1 => read_v1_msg_async(r).await,
+    }
+}
+
 #[cfg(feature = "signing")]
 pub fn read_versioned_msg_signed<M: Message, R: Read>(
     r: &mut PeekReader<R>,
@@ -274,6 +295,18 @@ pub fn read_versioned_msg_signed<M: Message, R: Read>(
     }
 }
 
+#[cfg(all(feature = "tokio-1", feature = "signing"))]
+pub async fn read_versioned_msg_async_signed<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
+    r: &mut AsyncPeekReader<R>,
+    version: MavlinkVersion,
+    signing_data: Option<&SigningData>,
+) -> Result<(MavHeader, M), error::MessageReadError> {
+    match version {
+        MavlinkVersion::V2 => read_v2_msg_async_inner(r, signing_data).await,
+        MavlinkVersion::V1 => read_v1_msg_async(r).await,
+    }
+}
+
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 // Follow protocol definition: `<https://mavlink.io/en/guide/serialization.html#v1_packet_format>`
 pub struct MAVLinkV1MessageRaw([u8; 1 + Self::HEADER_SIZE + 255 + 2]);
@@ -445,6 +478,42 @@ pub fn read_v1_raw_message<M: Message, R: Read>(
     }
 }
 
+/// Return a raw buffer with the mavlink message
+/// V1 maximum size is 263 bytes: `<https://mavlink.io/en/guide/serialization.html>`
+#[cfg(feature = "tokio-1")]
+pub async fn read_v1_raw_message_async<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
+    reader: &mut AsyncPeekReader<R>,
+) -> Result<MAVLinkV1MessageRaw, error::MessageReadError> {
+    loop {
+        loop {
+            // search for the magic framing value indicating start of mavlink message
+            if reader.read_u8().await? == MAV_STX {
+                break;
+            }
+        }
+
+        let mut message = MAVLinkV1MessageRaw::new();
+
+        message.0[0] = MAV_STX;
+        let header = &reader.peek_exact(MAVLinkV1MessageRaw::HEADER_SIZE).await?
+            [..MAVLinkV1MessageRaw::HEADER_SIZE];
+        message.mut_header().copy_from_slice(header);
+        let packet_length = message.raw_bytes().len() - 1;
+        let payload_and_checksum = &reader.peek_exact(packet_length).await?
+            [MAVLinkV1MessageRaw::HEADER_SIZE..packet_length];
+        message
+            .mut_payload_and_checksum()
+            .copy_from_slice(payload_and_checksum);
+
+        // retry if CRC failed after previous STX
+        // (an STX byte may appear in the middle of a message)
+        if message.has_valid_crc::<M>() {
+            reader.consume(message.raw_bytes().len() - 1);
+            return Ok(message);
+        }
+    }
+}
+
 /// Async read a raw buffer with the mavlink message
 /// V1 maximum size is 263 bytes: `<https://mavlink.io/en/guide/serialization.html>`
 ///
@@ -507,6 +576,27 @@ pub fn read_v1_msg<M: Message, R: Read>(
     ))
 }
 
+/// Read a MAVLink v1 message from a Read stream.
+#[cfg(feature = "tokio-1")]
+pub async fn read_v1_msg_async<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
+    r: &mut AsyncPeekReader<R>,
+) -> Result<(MavHeader, M), error::MessageReadError> {
+    let message = read_v1_raw_message_async::<M, _>(r).await?;
+
+    Ok((
+        MavHeader {
+            sequence: message.sequence(),
+            system_id: message.system_id(),
+            component_id: message.component_id(),
+        },
+        M::parse(
+            MavlinkVersion::V1,
+            u32::from(message.message_id()),
+            message.payload(),
+        )?,
+    ))
+}
+
 /// Async read a MAVLink v1 message from a Read stream.
 ///
 /// NOTE: it will be add ~80KB to firmware flash size because all *_DATA::deser methods will be add to firmware.
@@ -872,7 +962,7 @@ fn read_v2_raw_message_inner<M: Message, R: Read>(
 /// V2 maximum size is 280 bytes: `<https://mavlink.io/en/guide/serialization.html>`
 #[cfg(feature = "tokio-1")]
 pub async fn read_v2_raw_message_async<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
-    reader: &mut R,
+    reader: &mut AsyncPeekReader<R>,
 ) -> Result<MAVLinkV2MessageRaw, error::MessageReadError> {
     read_v2_raw_message_async_inner::<M, R>(reader, None).await
 }
@@ -881,7 +971,7 @@ pub async fn read_v2_raw_message_async<M: Message, R: tokio::io::AsyncReadExt +
 /// V2 maximum size is 280 bytes: `<https://mavlink.io/en/guide/serialization.html>`
 #[cfg(feature = "tokio-1")]
 async fn read_v2_raw_message_async_inner<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
-    reader: &mut R,
+    reader: &mut AsyncPeekReader<R>,
     signing_data: Option<&SigningData>,
 ) -> Result<MAVLinkV2MessageRaw, error::MessageReadError> {
     loop {
@@ -895,19 +985,26 @@ async fn read_v2_raw_message_async_inner<M: Message, R: tokio::io::AsyncReadExt
         let mut message = MAVLinkV2MessageRaw::new();
 
         message.0[0] = MAV_STX_V2;
-        let header_len = reader.read_exact(message.mut_header()).await?;
-        assert_eq!(header_len, MAVLinkV2MessageRaw::HEADER_SIZE);
+        let header = &reader.peek_exact(MAVLinkV2MessageRaw::HEADER_SIZE).await?
+            [..MAVLinkV2MessageRaw::HEADER_SIZE];
+        message.mut_header().copy_from_slice(header);
 
         if message.incompatibility_flags() & !MAVLINK_SUPPORTED_IFLAGS > 0 {
             // if there are incompatibility flags set that we do not know discard the message
             continue;
         }
 
-        reader
-            .read_exact(message.mut_payload_and_checksum_and_sign())
-            .await?;
+        let packet_length = message.raw_bytes().len() - 1;
+        let payload_and_checksum_and_sign = &reader.peek_exact(packet_length).await?
+            [MAVLinkV2MessageRaw::HEADER_SIZE..packet_length];
+        message
+            .mut_payload_and_checksum_and_sign()
+            .copy_from_slice(payload_and_checksum_and_sign);
 
-        if !message.has_valid_crc::<M>() {
+        if message.has_valid_crc::<M>() {
+            // even if the signature turn out to be invalid the valid crc shows that the received data presents a valid message as opposed to random bytes
+            reader.consume(message.raw_bytes().len() - 1);
+        } else {
             continue;
         }
 
@@ -926,7 +1023,7 @@ async fn read_v2_raw_message_async_inner<M: Message, R: tokio::io::AsyncReadExt
 /// V2 maximum size is 280 bytes: `<https://mavlink.io/en/guide/serialization.html>`
 #[cfg(all(feature = "tokio-1", feature = "signing"))]
 pub async fn read_v2_raw_message_async_signed<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
-    reader: &mut R,
+    reader: &mut AsyncPeekReader<R>,
     signing_data: Option<&SigningData>,
 ) -> Result<MAVLinkV2MessageRaw, error::MessageReadError> {
     read_v2_raw_message_async_inner::<M, R>(reader, signing_data).await
@@ -1017,9 +1114,26 @@ fn read_v2_msg_inner<M: Message, R: Read>(
 /// Async read a MAVLink v2  message from a Read stream.
 #[cfg(feature = "tokio-1")]
 pub async fn read_v2_msg_async<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
-    read: &mut R,
+    read: &mut AsyncPeekReader<R>,
+) -> Result<(MavHeader, M), error::MessageReadError> {
+    read_v2_msg_async_inner(read, None).await
+}
+
+/// Async read a MAVLink v2  message from a Read stream.
+#[cfg(all(feature = "tokio-1", feature = "signing"))]
+pub async fn read_v2_msg_async_signed<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
+    read: &mut AsyncPeekReader<R>,
+    signing_data: Option<&SigningData>,
 ) -> Result<(MavHeader, M), error::MessageReadError> {
-    let message = read_v2_raw_message_async::<M, _>(read).await?;
+    read_v2_msg_async_inner(read, signing_data).await
+}
+
+#[cfg(feature = "tokio-1")]
+async fn read_v2_msg_async_inner<M: Message, R: tokio::io::AsyncReadExt + Unpin>(
+    read: &mut AsyncPeekReader<R>,
+    signing_data: Option<&SigningData>,
+) -> Result<(MavHeader, M), error::MessageReadError> {
+    let message = read_v2_raw_message_async_signed::<M, _>(read, signing_data).await?;
 
     Ok((
         MavHeader {
@@ -1097,6 +1211,21 @@ pub async fn write_versioned_msg_async<M: Message, W: tokio::io::AsyncWriteExt +
     }
 }
 
+/// Async write a message with signing support using the given mavlink version
+#[cfg(feature = "tokio-1")]
+pub async fn write_versioned_msg_async_signed<M: Message, W: tokio::io::AsyncWriteExt + Unpin>(
+    w: &mut W,
+    version: MavlinkVersion,
+    header: MavHeader,
+    data: &M,
+    signing_data: Option<&SigningData>,
+) -> Result<usize, error::MessageWriteError> {
+    match version {
+        MavlinkVersion::V2 => write_v2_msg_async_signed(w, header, data, signing_data).await,
+        MavlinkVersion::V1 => write_v1_msg_async(w, header, data).await,
+    }
+}
+
 /// Async write a message using the given mavlink version
 ///
 /// NOTE: it will be add ~70KB to firmware flash size because all *_DATA::ser methods will be add to firmware.
@@ -1143,7 +1272,6 @@ pub fn write_v2_msg_signed<M: Message, W: Write>(
 
     let signature_len = if let Some(signing_data) = signing_data {
         if signing_data.config.sign_outgoing {
-            //header.incompat_flags |= MAVLINK_IFLAG_SIGNED;
             message_raw.serialize_message_for_signing(header, data);
             signing_data.sign_message(&mut message_raw);
             MAVLinkV2MessageRaw::SIGNATURE_SIZE
@@ -1182,6 +1310,39 @@ pub async fn write_v2_msg_async<M: Message, W: tokio::io::AsyncWriteExt + Unpin>
     Ok(len)
 }
 
+/// Write a MAVLink v2 message to a Write stream with signing support.
+#[cfg(feature = "signing")]
+#[cfg(feature = "tokio-1")]
+pub async fn write_v2_msg_async_signed<M: Message, W: tokio::io::AsyncWriteExt + Unpin>(
+    w: &mut W,
+    header: MavHeader,
+    data: &M,
+    signing_data: Option<&SigningData>,
+) -> Result<usize, error::MessageWriteError> {
+    let mut message_raw = MAVLinkV2MessageRaw::new();
+
+    let signature_len = if let Some(signing_data) = signing_data {
+        if signing_data.config.sign_outgoing {
+            message_raw.serialize_message_for_signing(header, data);
+            signing_data.sign_message(&mut message_raw);
+            MAVLinkV2MessageRaw::SIGNATURE_SIZE
+        } else {
+            message_raw.serialize_message(header, data);
+            0
+        }
+    } else {
+        message_raw.serialize_message(header, data);
+        0
+    };
+
+    let payload_length: usize = message_raw.payload_length().into();
+    let len = 1 + MAVLinkV2MessageRaw::HEADER_SIZE + payload_length + 2 + signature_len;
+
+    w.write_all(&message_raw.0[..len]).await?;
+
+    Ok(len)
+}
+
 /// Async write a MAVLink v2 message to a Write stream.
 ///
 /// NOTE: it will be add ~70KB to firmware flash size because all *_DATA::ser methods will be add to firmware.