tokio/net/unix/
listener.rs

1use crate::io::{Interest, PollEvented};
2use crate::net::unix::{SocketAddr, UnixStream};
3use crate::util::check_socket_for_blocking;
4
5use std::fmt;
6use std::io;
7#[cfg(target_os = "android")]
8use std::os::android::net::SocketAddrExt;
9#[cfg(target_os = "linux")]
10use std::os::linux::net::SocketAddrExt;
11#[cfg(any(target_os = "linux", target_os = "android"))]
12use std::os::unix::ffi::OsStrExt;
13use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
14use std::os::unix::net::{self, SocketAddr as StdSocketAddr};
15use std::path::Path;
16use std::task::{ready, Context, Poll};
17
18cfg_net_unix! {
19    /// A Unix socket which can accept connections from other Unix sockets.
20    ///
21    /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method.
22    ///
23    /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`].
24    ///
25    /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html
26    ///
27    /// # Errors
28    ///
29    /// Note that accepting a connection can lead to various errors and not all
30    /// of them are necessarily fatal ‒ for example having too many open file
31    /// descriptors or the other side closing the connection while it waits in
32    /// an accept queue. These would terminate the stream if not handled in any
33    /// way.
34    ///
35    /// # Examples
36    ///
37    /// ```no_run
38    /// use tokio::net::UnixListener;
39    ///
40    /// #[tokio::main]
41    /// async fn main() {
42    ///     let listener = UnixListener::bind("/path/to/the/socket").unwrap();
43    ///     loop {
44    ///         match listener.accept().await {
45    ///             Ok((stream, _addr)) => {
46    ///                 println!("new client!");
47    ///             }
48    ///             Err(e) => { /* connection failed */ }
49    ///         }
50    ///     }
51    /// }
52    /// ```
53    #[cfg_attr(docsrs, doc(alias = "uds"))]
54    pub struct UnixListener {
55        io: PollEvented<mio::net::UnixListener>,
56    }
57}
58
59impl UnixListener {
60    pub(crate) fn new(listener: mio::net::UnixListener) -> io::Result<UnixListener> {
61        let io = PollEvented::new(listener)?;
62        Ok(UnixListener { io })
63    }
64
65    /// Creates a new `UnixListener` bound to the specified path.
66    ///
67    /// # Panics
68    ///
69    /// This function panics if it is not called from within a runtime with
70    /// IO enabled.
71    ///
72    /// The runtime is usually set implicitly when this function is called
73    /// from a future driven by a tokio runtime, otherwise runtime can be set
74    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
75    #[track_caller]
76    pub fn bind<P>(path: P) -> io::Result<UnixListener>
77    where
78        P: AsRef<Path>,
79    {
80        // For now, we handle abstract socket paths on linux here.
81        #[cfg(any(target_os = "linux", target_os = "android"))]
82        let addr = {
83            let os_str_bytes = path.as_ref().as_os_str().as_bytes();
84            if os_str_bytes.starts_with(b"\0") {
85                StdSocketAddr::from_abstract_name(&os_str_bytes[1..])?
86            } else {
87                StdSocketAddr::from_pathname(path)?
88            }
89        };
90        #[cfg(not(any(target_os = "linux", target_os = "android")))]
91        let addr = StdSocketAddr::from_pathname(path)?;
92
93        let listener = mio::net::UnixListener::bind_addr(&addr)?;
94        let io = PollEvented::new(listener)?;
95        Ok(UnixListener { io })
96    }
97
98    /// Creates new [`UnixListener`] from a [`std::os::unix::net::UnixListener`].
99    ///
100    /// This function is intended to be used to wrap a `UnixListener` from the
101    /// standard library in the Tokio equivalent.
102    ///
103    /// # Notes
104    ///
105    /// The caller is responsible for ensuring that the listener is in
106    /// non-blocking mode. Otherwise all I/O operations on the listener
107    /// will block the thread, which will cause unexpected behavior.
108    /// Non-blocking mode can be set using [`set_nonblocking`].
109    ///
110    /// Passing a listener in blocking mode is always erroneous,
111    /// and the behavior in that case may change in the future.
112    /// For example, it could panic.
113    ///
114    /// [`set_nonblocking`]: std::os::unix::net::UnixListener::set_nonblocking
115    ///
116    /// # Examples
117    ///
118    /// ```no_run
119    /// use tokio::net::UnixListener;
120    /// use std::os::unix::net::UnixListener as StdUnixListener;
121    /// # use std::error::Error;
122    ///
123    /// # async fn dox() -> Result<(), Box<dyn Error>> {
124    /// let std_listener = StdUnixListener::bind("/path/to/the/socket")?;
125    /// std_listener.set_nonblocking(true)?;
126    /// let listener = UnixListener::from_std(std_listener)?;
127    /// # Ok(())
128    /// # }
129    /// ```
130    ///
131    /// # Panics
132    ///
133    /// This function panics if it is not called from within a runtime with
134    /// IO enabled.
135    ///
136    /// The runtime is usually set implicitly when this function is called
137    /// from a future driven by a tokio runtime, otherwise runtime can be set
138    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
139    #[track_caller]
140    pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
141        check_socket_for_blocking(&listener)?;
142
143        let listener = mio::net::UnixListener::from_std(listener);
144        let io = PollEvented::new(listener)?;
145        Ok(UnixListener { io })
146    }
147
148    /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
149    ///
150    /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode
151    /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
152    ///
153    /// # Examples
154    ///
155    /// ```rust,no_run
156    /// # use std::error::Error;
157    /// # async fn dox() -> Result<(), Box<dyn Error>> {
158    /// let tokio_listener = tokio::net::UnixListener::bind("/path/to/the/socket")?;
159    /// let std_listener = tokio_listener.into_std()?;
160    /// std_listener.set_nonblocking(false)?;
161    /// # Ok(())
162    /// # }
163    /// ```
164    ///
165    /// [`tokio::net::UnixListener`]: UnixListener
166    /// [`std::os::unix::net::UnixListener`]: std::os::unix::net::UnixListener
167    /// [`set_nonblocking`]: fn@std::os::unix::net::UnixListener::set_nonblocking
168    pub fn into_std(self) -> io::Result<std::os::unix::net::UnixListener> {
169        self.io
170            .into_inner()
171            .map(IntoRawFd::into_raw_fd)
172            .map(|raw_fd| unsafe { net::UnixListener::from_raw_fd(raw_fd) })
173    }
174
175    /// Returns the local socket address of this listener.
176    pub fn local_addr(&self) -> io::Result<SocketAddr> {
177        self.io.local_addr().map(SocketAddr)
178    }
179
180    /// Returns the value of the `SO_ERROR` option.
181    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
182        self.io.take_error()
183    }
184
185    /// Accepts a new incoming connection to this listener.
186    ///
187    /// # Cancel safety
188    ///
189    /// This method is cancel safe. If the method is used as the event in a
190    /// [`tokio::select!`](crate::select) statement and some other branch
191    /// completes first, then it is guaranteed that no new connections were
192    /// accepted by this method.
193    pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
194        let (mio, addr) = self
195            .io
196            .registration()
197            .async_io(Interest::READABLE, || self.io.accept())
198            .await?;
199
200        let addr = SocketAddr(addr);
201        let stream = UnixStream::new(mio)?;
202        Ok((stream, addr))
203    }
204
205    /// Polls to accept a new incoming connection to this listener.
206    ///
207    /// If there is no connection to accept, `Poll::Pending` is returned and the
208    /// current task will be notified by a waker.  Note that on multiple calls
209    /// to `poll_accept`, only the `Waker` from the `Context` passed to the most
210    /// recent call is scheduled to receive a wakeup.
211    pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
212        let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?;
213        let addr = SocketAddr(addr);
214        let sock = UnixStream::new(sock)?;
215        Poll::Ready(Ok((sock, addr)))
216    }
217}
218
219impl TryFrom<std::os::unix::net::UnixListener> for UnixListener {
220    type Error = io::Error;
221
222    /// Consumes stream, returning the tokio I/O object.
223    ///
224    /// This is equivalent to
225    /// [`UnixListener::from_std(stream)`](UnixListener::from_std).
226    fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> {
227        Self::from_std(stream)
228    }
229}
230
231impl fmt::Debug for UnixListener {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        self.io.fmt(f)
234    }
235}
236
237impl AsRawFd for UnixListener {
238    fn as_raw_fd(&self) -> RawFd {
239        self.io.as_raw_fd()
240    }
241}
242
243impl AsFd for UnixListener {
244    fn as_fd(&self) -> BorrowedFd<'_> {
245        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
246    }
247}