polling/
epoll.rs

1//! Bindings to epoll (Linux, Android).
2
3use std::io;
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
5use std::time::Duration;
6
7#[cfg(not(target_os = "redox"))]
8use rustix::event::{eventfd, EventfdFlags};
9#[cfg(not(target_os = "redox"))]
10use rustix::time::{
11    timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
12    Timespec,
13};
14
15use rustix::event::epoll;
16use rustix::fd::OwnedFd;
17use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
18use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
19use rustix::pipe::{pipe, pipe_with, PipeFlags};
20
21use crate::{Event, PollMode};
22
/// Interface to epoll.
///
/// Bundles the epoll instance with the helper descriptors that the poller registers on it: a
/// notifier for waking up a blocked `wait()` and, where available, a timerfd for timeouts.
#[derive(Debug)]
pub struct Poller {
    /// File descriptor for the epoll instance.
    epoll_fd: OwnedFd,

    /// Notifier used to wake up epoll.
    notifier: Notifier,

    /// File descriptor for the timerfd that produces timeouts.
    ///
    /// `None` when `timerfd_create` failed in `Poller::new`; `wait()` then hands the timeout to
    /// epoll directly, rounded up to whole milliseconds.
    ///
    /// Redox does not support timerfd.
    #[cfg(not(target_os = "redox"))]
    timer_fd: Option<OwnedFd>,
}
38
39impl Poller {
40    /// Creates a new poller.
41    pub fn new() -> io::Result<Poller> {
42        // Create an epoll instance.
43        //
44        // Use `epoll_create1` with `EPOLL_CLOEXEC`.
45        let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
46
47        // Set up notifier and timerfd.
48        let notifier = Notifier::new()?;
49        #[cfg(not(target_os = "redox"))]
50        let timer_fd = timerfd_create(
51            TimerfdClockId::Monotonic,
52            TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
53        )
54        .ok();
55
56        let poller = Poller {
57            epoll_fd,
58            notifier,
59            #[cfg(not(target_os = "redox"))]
60            timer_fd,
61        };
62
63        unsafe {
64            #[cfg(not(target_os = "redox"))]
65            if let Some(ref timer_fd) = poller.timer_fd {
66                poller.add(
67                    timer_fd.as_raw_fd(),
68                    Event::none(crate::NOTIFY_KEY),
69                    PollMode::Oneshot,
70                )?;
71            }
72
73            poller.add(
74                poller.notifier.as_fd().as_raw_fd(),
75                Event::readable(crate::NOTIFY_KEY),
76                PollMode::Oneshot,
77            )?;
78        }
79
80        tracing::trace!(
81            epoll_fd = ?poller.epoll_fd.as_raw_fd(),
82            notifier = ?poller.notifier,
83            "new",
84        );
85        Ok(poller)
86    }
87
    /// Whether this poller supports level-triggered events.
    ///
    /// Always `true` for the epoll backend.
    pub fn supports_level(&self) -> bool {
        true
    }
92
    /// Whether the poller supports edge-triggered events.
    ///
    /// Always `true` for the epoll backend.
    pub fn supports_edge(&self) -> bool {
        true
    }
97
98    /// Adds a new file descriptor.
99    ///
100    /// # Safety
101    ///
102    /// The `fd` must be a valid file descriptor. The usual condition of remaining registered in
103    /// the `Poller` doesn't apply to `epoll`.
104    pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
105        let span = tracing::trace_span!(
106            "add",
107            epoll_fd = ?self.epoll_fd.as_raw_fd(),
108            ?fd,
109            ?ev,
110        );
111        let _enter = span.enter();
112
113        epoll::add(
114            &self.epoll_fd,
115            unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
116            epoll::EventData::new_u64(ev.key as u64),
117            epoll_flags(&ev, mode) | ev.extra.flags,
118        )?;
119
120        Ok(())
121    }
122
123    /// Modifies an existing file descriptor.
124    pub fn modify(&self, fd: BorrowedFd<'_>, ev: Event, mode: PollMode) -> io::Result<()> {
125        let span = tracing::trace_span!(
126            "modify",
127            epoll_fd = ?self.epoll_fd.as_raw_fd(),
128            ?fd,
129            ?ev,
130        );
131        let _enter = span.enter();
132
133        epoll::modify(
134            &self.epoll_fd,
135            fd,
136            epoll::EventData::new_u64(ev.key as u64),
137            epoll_flags(&ev, mode) | ev.extra.flags,
138        )?;
139
140        Ok(())
141    }
142
143    /// Deletes a file descriptor.
144    pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
145        let span = tracing::trace_span!(
146            "delete",
147            epoll_fd = ?self.epoll_fd.as_raw_fd(),
148            ?fd,
149        );
150        let _enter = span.enter();
151
152        epoll::delete(&self.epoll_fd, fd)?;
153
154        Ok(())
155    }
156
157    /// Waits for I/O events with an optional timeout.
158    #[allow(clippy::needless_update)]
159    pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
160        let span = tracing::trace_span!(
161            "wait",
162            epoll_fd = ?self.epoll_fd.as_raw_fd(),
163            ?timeout,
164        );
165        let _enter = span.enter();
166
167        #[cfg(not(target_os = "redox"))]
168        if let Some(ref timer_fd) = self.timer_fd {
169            // Configure the timeout using timerfd.
170            let new_val = Itimerspec {
171                it_interval: TS_ZERO,
172                it_value: match timeout {
173                    None => TS_ZERO,
174                    Some(t) => {
175                        let mut ts = TS_ZERO;
176                        ts.tv_sec = t.as_secs() as _;
177                        ts.tv_nsec = t.subsec_nanos() as _;
178                        ts
179                    }
180                },
181                ..unsafe { std::mem::zeroed() }
182            };
183
184            timerfd_settime(timer_fd, TimerfdTimerFlags::empty(), &new_val)?;
185
186            // Set interest in timerfd.
187            self.modify(
188                timer_fd.as_fd(),
189                Event::readable(crate::NOTIFY_KEY),
190                PollMode::Oneshot,
191            )?;
192        }
193
194        #[cfg(not(target_os = "redox"))]
195        let timer_fd = &self.timer_fd;
196        #[cfg(target_os = "redox")]
197        let timer_fd: Option<core::convert::Infallible> = None;
198
199        // Timeout in milliseconds for epoll.
200        let timeout_ms = match (timer_fd, timeout) {
201            (_, Some(t)) if t == Duration::from_secs(0) => 0,
202            (None, Some(t)) => {
203                // Round up to a whole millisecond.
204                let mut ms = t.as_millis().try_into().unwrap_or(i32::MAX);
205                if Duration::from_millis(ms as u64) < t {
206                    ms = ms.saturating_add(1);
207                }
208                ms
209            }
210            _ => -1,
211        };
212
213        // Wait for I/O events.
214        epoll::wait(&self.epoll_fd, &mut events.list, timeout_ms)?;
215        tracing::trace!(
216            epoll_fd = ?self.epoll_fd.as_raw_fd(),
217            res = ?events.list.len(),
218            "new events",
219        );
220
221        // Clear the notification (if received) and re-register interest in it.
222        self.notifier.clear();
223        self.modify(
224            self.notifier.as_fd(),
225            Event::readable(crate::NOTIFY_KEY),
226            PollMode::Oneshot,
227        )?;
228        Ok(())
229    }
230
231    /// Sends a notification to wake up the current or next `wait()` call.
232    pub fn notify(&self) -> io::Result<()> {
233        let span = tracing::trace_span!(
234            "notify",
235            epoll_fd = ?self.epoll_fd.as_raw_fd(),
236            notifier = ?self.notifier,
237        );
238        let _enter = span.enter();
239
240        self.notifier.notify();
241        Ok(())
242    }
243}
244
245impl AsRawFd for Poller {
246    fn as_raw_fd(&self) -> RawFd {
247        self.epoll_fd.as_raw_fd()
248    }
249}
250
251impl AsFd for Poller {
252    fn as_fd(&self) -> BorrowedFd<'_> {
253        self.epoll_fd.as_fd()
254    }
255}
256
257impl Drop for Poller {
258    fn drop(&mut self) {
259        let span = tracing::trace_span!(
260            "drop",
261            epoll_fd = ?self.epoll_fd.as_raw_fd(),
262            notifier = ?self.notifier,
263        );
264        let _enter = span.enter();
265
266        #[cfg(not(target_os = "redox"))]
267        if let Some(timer_fd) = self.timer_fd.take() {
268            let _ = self.delete(timer_fd.as_fd());
269        }
270        let _ = self.delete(self.notifier.as_fd());
271    }
272}
273
/// `timespec` value that equals zero.
///
/// Built by transmuting an all-zero byte array that has the same size as `Timespec`.
// SAFETY (assumed, TODO confirm): relies on the all-zero bit pattern being a valid `Timespec`,
// i.e. on the struct consisting only of plain integer fields and padding.
#[cfg(not(target_os = "redox"))]
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
277
278/// Get the EPOLL flags for the interest.
279fn epoll_flags(interest: &Event, mode: PollMode) -> epoll::EventFlags {
280    let mut flags = match mode {
281        PollMode::Oneshot => epoll::EventFlags::ONESHOT,
282        PollMode::Level => epoll::EventFlags::empty(),
283        PollMode::Edge => epoll::EventFlags::ET,
284        PollMode::EdgeOneshot => epoll::EventFlags::ET | epoll::EventFlags::ONESHOT,
285    };
286    if interest.readable {
287        flags |= read_flags();
288    }
289    if interest.writable {
290        flags |= write_flags();
291    }
292    flags
293}
294
295/// Epoll flags for all possible readability events.
296fn read_flags() -> epoll::EventFlags {
297    use epoll::EventFlags as Epoll;
298    Epoll::IN | Epoll::HUP | Epoll::ERR | Epoll::PRI
299}
300
301/// Epoll flags for all possible writability events.
302fn write_flags() -> epoll::EventFlags {
303    use epoll::EventFlags as Epoll;
304    Epoll::OUT | Epoll::HUP | Epoll::ERR
305}
306
/// A list of reported I/O events.
///
/// Filled by `Poller::wait` and read back through `Events::iter`.
pub struct Events {
    /// Raw epoll events as written by `epoll::wait`.
    list: epoll::EventVec,
}
311
// SAFETY: NOTE(review): the justification is not visible in this file. Presumably
// `epoll::EventVec` only holds plain event records with no thread affinity — confirm against
// the rustix definition.
unsafe impl Send for Events {}
313
314impl Events {
315    /// Creates an empty list.
316    pub fn with_capacity(cap: usize) -> Events {
317        Events {
318            list: epoll::EventVec::with_capacity(cap),
319        }
320    }
321
322    /// Iterates over I/O events.
323    pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
324        self.list.iter().map(|ev| {
325            let flags = ev.flags;
326            Event {
327                key: ev.data.u64() as usize,
328                readable: flags.intersects(read_flags()),
329                writable: flags.intersects(write_flags()),
330                extra: EventExtra { flags },
331            }
332        })
333    }
334
335    /// Clear the list.
336    pub fn clear(&mut self) {
337        self.list.clear();
338    }
339
340    /// Get the capacity of the list.
341    pub fn capacity(&self) -> usize {
342        self.list.capacity()
343    }
344}
345
/// Extra information about this event.
///
/// Wraps the raw epoll flags: when registering interest they are OR-ed into the computed
/// interest mask, and for reported events they hold the flags returned by epoll.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventExtra {
    /// Raw epoll event flags.
    flags: epoll::EventFlags,
}
351
352impl EventExtra {
353    /// Create an empty version of the data.
354    #[inline]
355    pub const fn empty() -> EventExtra {
356        EventExtra {
357            flags: epoll::EventFlags::empty(),
358        }
359    }
360
361    /// Add the interrupt flag to this event.
362    #[inline]
363    pub fn set_hup(&mut self, active: bool) {
364        self.flags.set(epoll::EventFlags::HUP, active);
365    }
366
367    /// Add the priority flag to this event.
368    #[inline]
369    pub fn set_pri(&mut self, active: bool) {
370        self.flags.set(epoll::EventFlags::PRI, active);
371    }
372
373    /// Tell if the interrupt flag is set.
374    #[inline]
375    pub fn is_hup(&self) -> bool {
376        self.flags.contains(epoll::EventFlags::HUP)
377    }
378
379    /// Tell if the priority flag is set.
380    #[inline]
381    pub fn is_pri(&self) -> bool {
382        self.flags.contains(epoll::EventFlags::PRI)
383    }
384
385    #[inline]
386    pub fn is_connect_failed(&self) -> Option<bool> {
387        Some(
388            self.flags.contains(epoll::EventFlags::ERR)
389                && self.flags.contains(epoll::EventFlags::HUP),
390        )
391    }
392
393    #[inline]
394    pub fn is_err(&self) -> Option<bool> {
395        Some(self.flags.contains(epoll::EventFlags::ERR))
396    }
397}
398
/// The notifier for Linux.
///
/// Certain container runtimes do not expose eventfd to the client, as it relies on the host and
/// can be used to "escape" the container under certain conditions. Gramine is the prime example,
/// see [here][gramine]. In this case, fall back to using a pipe.
///
/// [gramine]: https://gramine.readthedocs.io/en/stable/manifest-syntax.html#allowing-eventfd
#[derive(Debug)]
enum Notifier {
    /// The primary notifier, using eventfd.
    ///
    /// Not available on Redox.
    #[cfg(not(target_os = "redox"))]
    EventFd(OwnedFd),

    /// The fallback notifier, using a pipe.
    Pipe {
        /// The read end of the pipe. This is the end registered in the poller.
        read_pipe: OwnedFd,

        /// The write end of the pipe. A byte is written here to notify.
        write_pipe: OwnedFd,
    },
}
421
422impl Notifier {
423    /// Create a new notifier.
424    fn new() -> io::Result<Self> {
425        // Skip eventfd for testing if necessary.
426        #[cfg(not(target_os = "redox"))]
427        {
428            if !cfg!(polling_test_epoll_pipe) {
429                // Try to create an eventfd.
430                match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
431                    Ok(fd) => {
432                        tracing::trace!("created eventfd for notifier");
433                        return Ok(Notifier::EventFd(fd));
434                    }
435
436                    Err(err) => {
437                        tracing::warn!(
438                            "eventfd() failed with error ({}), falling back to pipe",
439                            err
440                        );
441                    }
442                }
443            }
444        }
445
446        let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
447            let (read, write) = pipe()?;
448            fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?;
449            fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?;
450            io::Result::Ok((read, write))
451        })?;
452
453        fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?;
454        Ok(Notifier::Pipe {
455            read_pipe: read,
456            write_pipe: write,
457        })
458    }
459
460    /// The file descriptor to register in the poller.
461    fn as_fd(&self) -> BorrowedFd<'_> {
462        match self {
463            #[cfg(not(target_os = "redox"))]
464            Notifier::EventFd(fd) => fd.as_fd(),
465            Notifier::Pipe {
466                read_pipe: read, ..
467            } => read.as_fd(),
468        }
469    }
470
471    /// Notify the poller.
472    fn notify(&self) {
473        match self {
474            #[cfg(not(target_os = "redox"))]
475            Self::EventFd(fd) => {
476                let buf: [u8; 8] = 1u64.to_ne_bytes();
477                let _ = write(fd, &buf);
478            }
479
480            Self::Pipe { write_pipe, .. } => {
481                write(write_pipe, &[0; 1]).ok();
482            }
483        }
484    }
485
486    /// Clear the notification.
487    fn clear(&self) {
488        match self {
489            #[cfg(not(target_os = "redox"))]
490            Self::EventFd(fd) => {
491                let mut buf = [0u8; 8];
492                let _ = read(fd, &mut buf);
493            }
494
495            Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {},
496        }
497    }
498}