Struct tokio::io::unix::AsyncFd

source ·
pub struct AsyncFd<T: AsRawFd> { /* private fields */ }
Expand description

Associates an IO object backed by a Unix file descriptor with the tokio reactor, allowing for readiness to be polled. The file descriptor must be of a type that can be used with the OS polling facilities (ie, poll, epoll, kqueue, etc), such as a network socket or pipe, and the file descriptor must have the nonblocking mode set to true.

Creating an AsyncFd registers the file descriptor with the current tokio Reactor, allowing you to directly await the file descriptor being readable or writable. Once registered, the file descriptor remains registered until the AsyncFd is dropped.

The AsyncFd takes ownership of an arbitrary object to represent the IO object. It is intended that this object will handle closing the file descriptor when it is dropped, avoiding resource leaks and ensuring that the AsyncFd can clean up the registration before closing the file descriptor. The AsyncFd::into_inner function can be used to extract the inner object to retake control from the tokio IO reactor.

The inner object is required to implement AsRawFd. This file descriptor must not change while AsyncFd owns the inner object, i.e. the AsRawFd::as_raw_fd method on the inner type must always return the same file descriptor when called multiple times. Failure to uphold this results in unspecified behavior in the IO driver, which may include breaking notifications for other sockets/etc.

Polling for readiness is done by calling the async functions readable and writable. These functions complete when the associated readiness condition is observed. Any number of tasks can query the same AsyncFd in parallel, on the same or different conditions.

On some platforms, the readiness detecting mechanism relies on edge-triggered notifications. This means that the OS will only notify Tokio when the file descriptor transitions from not-ready to ready. For this to work you should first try to read or write and only poll for readiness if that fails with an error of std::io::ErrorKind::WouldBlock.

Tokio internally tracks when it has received a ready notification, and when readiness checking functions like readable and writable are called, if the readiness flag is set, these async functions will complete immediately. This however does mean that it is critical to ensure that this ready flag is cleared when (and only when) the file descriptor ceases to be ready. The AsyncFdReadyGuard returned from readiness checking functions serves this function; after calling a readiness-checking async function, you must use this AsyncFdReadyGuard to signal to tokio whether the file descriptor is no longer in a ready state.

Use with to a poll-based API

In some cases it may be desirable to use AsyncFd from APIs similar to TcpStream::poll_read_ready. The AsyncFd::poll_read_ready and AsyncFd::poll_write_ready functions are provided for this purpose. Because these functions don’t create a future to hold their state, they have the limitation that only one task can wait on each direction (read or write) at a time.

Examples

This example shows how to turn std::net::TcpStream asynchronous using AsyncFd. It implements the read/write operations both as an async fn and using the IO traits AsyncRead and AsyncWrite.

use futures::ready;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::unix::AsyncFd;

pub struct AsyncTcpStream {
    inner: AsyncFd<TcpStream>,
}

impl AsyncTcpStream {
    pub fn new(tcp: TcpStream) -> io::Result<Self> {
        tcp.set_nonblocking(true)?;
        Ok(Self {
            inner: AsyncFd::new(tcp)?,
        })
    }

    pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
        loop {
            let mut guard = self.inner.readable().await?;

            match guard.try_io(|inner| inner.get_ref().read(out)) {
                Ok(result) => return result,
                Err(_would_block) => continue,
            }
        }
    }

    pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
        loop {
            let mut guard = self.inner.writable().await?;

            match guard.try_io(|inner| inner.get_ref().write(buf)) {
                Ok(result) => return result,
                Err(_would_block) => continue,
            }
        }
    }
}

impl AsyncRead for AsyncTcpStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>
    ) -> Poll<io::Result<()>> {
        loop {
            let mut guard = ready!(self.inner.poll_read_ready(cx))?;

            let unfilled = buf.initialize_unfilled();
            match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
                Ok(Ok(len)) => {
                    buf.advance(len);
                    return Poll::Ready(Ok(()));
                },
                Ok(Err(err)) => return Poll::Ready(Err(err)),
                Err(_would_block) => continue,
            }
        }
    }
}

impl AsyncWrite for AsyncTcpStream {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<io::Result<usize>> {
        loop {
            let mut guard = ready!(self.inner.poll_write_ready(cx))?;

            match guard.try_io(|inner| inner.get_ref().write(buf)) {
                Ok(result) => return Poll::Ready(result),
                Err(_would_block) => continue,
            }
        }
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        // tcp flush is a no-op
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
        Poll::Ready(Ok(()))
    }
}

Implementations§

source§

impl<T: AsRawFd> AsyncFd<T>

source

pub fn new(inner: T) -> Result<Self>
where T: AsRawFd,

Creates an AsyncFd backed by (and taking ownership of) an object implementing AsRawFd. The backing file descriptor is cached at the time of creation.

Only configures the Interest::READABLE and Interest::WRITABLE interests. For more control, use AsyncFd::with_interest.

This method must be called in the context of a tokio runtime.

Panics

This function panics if there is no current reactor set, or if the rt feature flag is not enabled.

source

pub fn with_interest(inner: T, interest: Interest) -> Result<Self>
where T: AsRawFd,

Creates an AsyncFd backed by (and taking ownership of) an object implementing AsRawFd, with a specific Interest. The backing file descriptor is cached at the time of creation.

Panics

This function panics if there is no current reactor set, or if the rt feature flag is not enabled.

source

pub fn get_ref(&self) -> &T

Returns a shared reference to the backing object of this AsyncFd.

source

pub fn get_mut(&mut self) -> &mut T

Returns a mutable reference to the backing object of this AsyncFd.

source

pub fn into_inner(self) -> T

Deregisters this file descriptor and returns ownership of the backing object.

source

pub fn poll_read_ready<'a>( &'a self, cx: &mut Context<'_> ) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>

Polls for read readiness.

If the file descriptor is not currently ready for reading, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for reading, Waker::wake will be called.

Note that on multiple calls to poll_read_ready or poll_read_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_write_ready retains a second, independent waker).

This method is intended for cases where creating and pinning a future via readable is not feasible. Where possible, using readable is preferred, as this supports polling from multiple tasks at once.

This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.

source

pub fn poll_read_ready_mut<'a>( &'a mut self, cx: &mut Context<'_> ) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>

Polls for read readiness.

If the file descriptor is not currently ready for reading, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for reading, Waker::wake will be called.

Note that on multiple calls to poll_read_ready or poll_read_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_write_ready retains a second, independent waker).

This method is intended for cases where creating and pinning a future via readable is not feasible. Where possible, using readable is preferred, as this supports polling from multiple tasks at once.

This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.

source

pub fn poll_write_ready<'a>( &'a self, cx: &mut Context<'_> ) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>

Polls for write readiness.

If the file descriptor is not currently ready for writing, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for writing, Waker::wake will be called.

Note that on multiple calls to poll_write_ready or poll_write_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_read_ready retains a second, independent waker).

This method is intended for cases where creating and pinning a future via writable is not feasible. Where possible, using writable is preferred, as this supports polling from multiple tasks at once.

This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.

source

pub fn poll_write_ready_mut<'a>( &'a mut self, cx: &mut Context<'_> ) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>

Polls for write readiness.

If the file descriptor is not currently ready for writing, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for writing, Waker::wake will be called.

Note that on multiple calls to poll_write_ready or poll_write_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_read_ready retains a second, independent waker).

This method is intended for cases where creating and pinning a future via writable is not feasible. Where possible, using writable is preferred, as this supports polling from multiple tasks at once.

This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.

source

pub async fn ready( &self, interest: Interest ) -> Result<AsyncFdReadyGuard<'_, T>>

Waits for any of the requested ready states, returning a AsyncFdReadyGuard that must be dropped to resume polling for the requested ready states.

The function may complete without the file descriptor being ready. This is a false-positive and attempting an operation will return with io::ErrorKind::WouldBlock. The function can also return with an empty Ready set, so you should always check the returned value and possibly wait again if the requested states are not set.

When an IO operation does return io::ErrorKind::WouldBlock, the readiness must be cleared. When a combined interest is used, it is important to clear only the readiness that is actually observed to block. For instance when the combined interest Interest::READABLE | Interest::WRITABLE is used, and a read blocks, only read readiness should be cleared using the AsyncFdReadyGuard::clear_ready_matching method: guard.clear_ready_matching(Ready::READABLE). Also clearing the write readiness in this case would be incorrect. The AsyncFdReadyGuard::clear_ready method clears all readiness flags.

This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.

Examples

Concurrently read and write to a std::net::TcpStream on the same task without splitting.

use std::error::Error;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use tokio::io::unix::AsyncFd;
use tokio::io::{Interest, Ready};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let stream = TcpStream::connect("127.0.0.1:8080")?;
    stream.set_nonblocking(true)?;
    let stream = AsyncFd::new(stream)?;

    loop {
        let mut guard = stream
            .ready(Interest::READABLE | Interest::WRITABLE)
            .await?;

        if guard.ready().is_readable() {
            let mut data = vec![0; 1024];
            // Try to read data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.get_ref().read(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a read has blocked, but a write might still succeed.
                    // clear only the read readiness.
                    guard.clear_ready_matching(Ready::READABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }

        if guard.ready().is_writable() {
            // Try to write data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.get_ref().write(b"hello world") {
                Ok(n) => {
                    println!("write {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a write has blocked, but a read might still succeed.
                    // clear only the write readiness.
                    guard.clear_ready_matching(Ready::WRITABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
}
source

pub async fn ready_mut( &mut self, interest: Interest ) -> Result<AsyncFdReadyMutGuard<'_, T>>

Waits for any of the requested ready states, returning a AsyncFdReadyMutGuard that must be dropped to resume polling for the requested ready states.

The function may complete without the file descriptor being ready. This is a false-positive and attempting an operation will return with io::ErrorKind::WouldBlock. The function can also return with an empty Ready set, so you should always check the returned value and possibly wait again if the requested states are not set.

When an IO operation does return io::ErrorKind::WouldBlock, the readiness must be cleared. When a combined interest is used, it is important to clear only the readiness that is actually observed to block. For instance when the combined interest Interest::READABLE | Interest::WRITABLE is used, and a read blocks, only read readiness should be cleared using the AsyncFdReadyMutGuard::clear_ready_matching method: guard.clear_ready_matching(Ready::READABLE). Also clearing the write readiness in this case would be incorrect. The AsyncFdReadyMutGuard::clear_ready method clears all readiness flags.

This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.

Examples

Concurrently read and write to a std::net::TcpStream on the same task without splitting.

use std::error::Error;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use tokio::io::unix::AsyncFd;
use tokio::io::{Interest, Ready};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let stream = TcpStream::connect("127.0.0.1:8080")?;
    stream.set_nonblocking(true)?;
    let mut stream = AsyncFd::new(stream)?;

    loop {
        let mut guard = stream
            .ready_mut(Interest::READABLE | Interest::WRITABLE)
            .await?;

        if guard.ready().is_readable() {
            let mut data = vec![0; 1024];
            // Try to read data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match guard.get_inner_mut().read(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a read has blocked, but a write might still succeed.
                    // clear only the read readiness.
                    guard.clear_ready_matching(Ready::READABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }

        if guard.ready().is_writable() {
            // Try to write data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match guard.get_inner_mut().write(b"hello world") {
                Ok(n) => {
                    println!("write {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a write has blocked, but a read might still succeed.
                    // clear only the write readiness.
                    guard.clear_ready_matching(Ready::WRITABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
}
source

pub async fn readable<'a>(&'a self) -> Result<AsyncFdReadyGuard<'a, T>>

Waits for the file descriptor to become readable, returning a AsyncFdReadyGuard that must be dropped to resume read-readiness polling.

This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.

source

pub async fn readable_mut<'a>( &'a mut self ) -> Result<AsyncFdReadyMutGuard<'a, T>>

Waits for the file descriptor to become readable, returning a AsyncFdReadyMutGuard that must be dropped to resume read-readiness polling.

This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.

source

pub async fn writable<'a>(&'a self) -> Result<AsyncFdReadyGuard<'a, T>>

Waits for the file descriptor to become writable, returning a AsyncFdReadyGuard that must be dropped to resume write-readiness polling.

This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.

source

pub async fn writable_mut<'a>( &'a mut self ) -> Result<AsyncFdReadyMutGuard<'a, T>>

Waits for the file descriptor to become writable, returning a AsyncFdReadyMutGuard that must be dropped to resume write-readiness polling.

This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.

source

pub async fn async_io<R>( &self, interest: Interest, f: impl FnMut(&T) -> Result<R> ) -> Result<R>

Reads or writes from the file descriptor using a user-provided IO operation.

The async_io method is a convenience utility that waits for the file descriptor to become ready, and then executes the provided IO operation. Since file descriptors may be marked ready spuriously, the closure will be called repeatedly until it returns something other than a WouldBlock error. This is done using the following loop:

async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
    loop {
        // or `readable` if called with the read interest.
        let guard = self.writable().await?;

        match guard.try_io(&mut f) {
            Ok(result) => return result,
            Err(_would_block) => continue,
        }
    }
}

The closure should only return a WouldBlock error if it has performed an IO operation on the file descriptor that failed due to the file descriptor not being ready. Returning a WouldBlock error in any other situation will incorrectly clear the readiness flag, which can cause the file descriptor to behave incorrectly.

The closure should not perform the IO operation using any of the methods defined on the Tokio AsyncFd type, as this will mess with the readiness flag and can cause the file descriptor to behave incorrectly.

This method is not intended to be used with combined interests. The closure should perform only one type of IO operation, so it should not require more than one ready state. This method may panic or sleep forever if it is called with a combined interest.

Examples

This example sends some bytes on the inner std::net::UdpSocket. The async_io method waits for readiness, and retries if the send operation does block. This example is equivalent to the one given for try_io.

use tokio::io::{Interest, unix::AsyncFd};

use std::io;
use std::net::UdpSocket;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = UdpSocket::bind("0.0.0.0:8080")?;
    socket.set_nonblocking(true)?;
    let async_fd = AsyncFd::new(socket)?;

    let written = async_fd
        .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
        .await?;

    println!("wrote {written} bytes");

    Ok(())
}
source

pub async fn async_io_mut<R>( &mut self, interest: Interest, f: impl FnMut(&mut T) -> Result<R> ) -> Result<R>

Reads or writes from the file descriptor using a user-provided IO operation.

The behavior is the same as async_io, except that the closure can mutate the inner value of the AsyncFd.

Trait Implementations§

source§

impl<T: AsRawFd> AsFd for AsyncFd<T>

source§

fn as_fd(&self) -> BorrowedFd<'_>

Borrows the file descriptor. Read more
source§

impl<T: AsRawFd> AsRawFd for AsyncFd<T>

source§

fn as_raw_fd(&self) -> RawFd

Extracts the raw file descriptor. Read more
source§

impl<T: Debug + AsRawFd> Debug for AsyncFd<T>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<T: AsRawFd> Drop for AsyncFd<T>

source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<T> !RefUnwindSafe for AsyncFd<T>

§

impl<T> Send for AsyncFd<T>
where T: Send,

§

impl<T> Sync for AsyncFd<T>
where T: Sync,

§

impl<T> Unpin for AsyncFd<T>
where T: Unpin,

§

impl<T> !UnwindSafe for AsyncFd<T>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.