1use std::io;
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
5use std::time::Duration;
6
7#[cfg(not(target_os = "redox"))]
8use rustix::event::{eventfd, EventfdFlags};
9#[cfg(not(target_os = "redox"))]
10use rustix::time::{
11 timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
12 Timespec,
13};
14
15use rustix::event::epoll;
16use rustix::fd::OwnedFd;
17use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
18use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
19use rustix::pipe::{pipe, pipe_with, PipeFlags};
20
21use crate::{Event, PollMode};
22
23#[derive(Debug)]
25pub struct Poller {
26 epoll_fd: OwnedFd,
28
29 notifier: Notifier,
31
32 #[cfg(not(target_os = "redox"))]
36 timer_fd: Option<OwnedFd>,
37}
38
39impl Poller {
40 pub fn new() -> io::Result<Poller> {
42 let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
46
47 let notifier = Notifier::new()?;
49 #[cfg(not(target_os = "redox"))]
50 let timer_fd = timerfd_create(
51 TimerfdClockId::Monotonic,
52 TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
53 )
54 .ok();
55
56 let poller = Poller {
57 epoll_fd,
58 notifier,
59 #[cfg(not(target_os = "redox"))]
60 timer_fd,
61 };
62
63 unsafe {
64 #[cfg(not(target_os = "redox"))]
65 if let Some(ref timer_fd) = poller.timer_fd {
66 poller.add(
67 timer_fd.as_raw_fd(),
68 Event::none(crate::NOTIFY_KEY),
69 PollMode::Oneshot,
70 )?;
71 }
72
73 poller.add(
74 poller.notifier.as_fd().as_raw_fd(),
75 Event::readable(crate::NOTIFY_KEY),
76 PollMode::Oneshot,
77 )?;
78 }
79
80 tracing::trace!(
81 epoll_fd = ?poller.epoll_fd.as_raw_fd(),
82 notifier = ?poller.notifier,
83 "new",
84 );
85 Ok(poller)
86 }
87
88 pub fn supports_level(&self) -> bool {
90 true
91 }
92
93 pub fn supports_edge(&self) -> bool {
95 true
96 }
97
98 pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
105 let span = tracing::trace_span!(
106 "add",
107 epoll_fd = ?self.epoll_fd.as_raw_fd(),
108 ?fd,
109 ?ev,
110 );
111 let _enter = span.enter();
112
113 epoll::add(
114 &self.epoll_fd,
115 unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
116 epoll::EventData::new_u64(ev.key as u64),
117 epoll_flags(&ev, mode) | ev.extra.flags,
118 )?;
119
120 Ok(())
121 }
122
123 pub fn modify(&self, fd: BorrowedFd<'_>, ev: Event, mode: PollMode) -> io::Result<()> {
125 let span = tracing::trace_span!(
126 "modify",
127 epoll_fd = ?self.epoll_fd.as_raw_fd(),
128 ?fd,
129 ?ev,
130 );
131 let _enter = span.enter();
132
133 epoll::modify(
134 &self.epoll_fd,
135 fd,
136 epoll::EventData::new_u64(ev.key as u64),
137 epoll_flags(&ev, mode) | ev.extra.flags,
138 )?;
139
140 Ok(())
141 }
142
143 pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
145 let span = tracing::trace_span!(
146 "delete",
147 epoll_fd = ?self.epoll_fd.as_raw_fd(),
148 ?fd,
149 );
150 let _enter = span.enter();
151
152 epoll::delete(&self.epoll_fd, fd)?;
153
154 Ok(())
155 }
156
157 #[allow(clippy::needless_update)]
159 pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
160 let span = tracing::trace_span!(
161 "wait",
162 epoll_fd = ?self.epoll_fd.as_raw_fd(),
163 ?timeout,
164 );
165 let _enter = span.enter();
166
167 #[cfg(not(target_os = "redox"))]
168 if let Some(ref timer_fd) = self.timer_fd {
169 let new_val = Itimerspec {
171 it_interval: TS_ZERO,
172 it_value: match timeout {
173 None => TS_ZERO,
174 Some(t) => {
175 let mut ts = TS_ZERO;
176 ts.tv_sec = t.as_secs() as _;
177 ts.tv_nsec = t.subsec_nanos() as _;
178 ts
179 }
180 },
181 ..unsafe { std::mem::zeroed() }
182 };
183
184 timerfd_settime(timer_fd, TimerfdTimerFlags::empty(), &new_val)?;
185
186 self.modify(
188 timer_fd.as_fd(),
189 Event::readable(crate::NOTIFY_KEY),
190 PollMode::Oneshot,
191 )?;
192 }
193
194 #[cfg(not(target_os = "redox"))]
195 let timer_fd = &self.timer_fd;
196 #[cfg(target_os = "redox")]
197 let timer_fd: Option<core::convert::Infallible> = None;
198
199 let timeout_ms = match (timer_fd, timeout) {
201 (_, Some(t)) if t == Duration::from_secs(0) => 0,
202 (None, Some(t)) => {
203 let mut ms = t.as_millis().try_into().unwrap_or(i32::MAX);
205 if Duration::from_millis(ms as u64) < t {
206 ms = ms.saturating_add(1);
207 }
208 ms
209 }
210 _ => -1,
211 };
212
213 epoll::wait(&self.epoll_fd, &mut events.list, timeout_ms)?;
215 tracing::trace!(
216 epoll_fd = ?self.epoll_fd.as_raw_fd(),
217 res = ?events.list.len(),
218 "new events",
219 );
220
221 self.notifier.clear();
223 self.modify(
224 self.notifier.as_fd(),
225 Event::readable(crate::NOTIFY_KEY),
226 PollMode::Oneshot,
227 )?;
228 Ok(())
229 }
230
231 pub fn notify(&self) -> io::Result<()> {
233 let span = tracing::trace_span!(
234 "notify",
235 epoll_fd = ?self.epoll_fd.as_raw_fd(),
236 notifier = ?self.notifier,
237 );
238 let _enter = span.enter();
239
240 self.notifier.notify();
241 Ok(())
242 }
243}
244
245impl AsRawFd for Poller {
246 fn as_raw_fd(&self) -> RawFd {
247 self.epoll_fd.as_raw_fd()
248 }
249}
250
251impl AsFd for Poller {
252 fn as_fd(&self) -> BorrowedFd<'_> {
253 self.epoll_fd.as_fd()
254 }
255}
256
257impl Drop for Poller {
258 fn drop(&mut self) {
259 let span = tracing::trace_span!(
260 "drop",
261 epoll_fd = ?self.epoll_fd.as_raw_fd(),
262 notifier = ?self.notifier,
263 );
264 let _enter = span.enter();
265
266 #[cfg(not(target_os = "redox"))]
267 if let Some(timer_fd) = self.timer_fd.take() {
268 let _ = self.delete(timer_fd.as_fd());
269 }
270 let _ = self.delete(self.notifier.as_fd());
271 }
272}
273
274#[cfg(not(target_os = "redox"))]
276const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
277
278fn epoll_flags(interest: &Event, mode: PollMode) -> epoll::EventFlags {
280 let mut flags = match mode {
281 PollMode::Oneshot => epoll::EventFlags::ONESHOT,
282 PollMode::Level => epoll::EventFlags::empty(),
283 PollMode::Edge => epoll::EventFlags::ET,
284 PollMode::EdgeOneshot => epoll::EventFlags::ET | epoll::EventFlags::ONESHOT,
285 };
286 if interest.readable {
287 flags |= read_flags();
288 }
289 if interest.writable {
290 flags |= write_flags();
291 }
292 flags
293}
294
295fn read_flags() -> epoll::EventFlags {
297 use epoll::EventFlags as Epoll;
298 Epoll::IN | Epoll::HUP | Epoll::ERR | Epoll::PRI
299}
300
301fn write_flags() -> epoll::EventFlags {
303 use epoll::EventFlags as Epoll;
304 Epoll::OUT | Epoll::HUP | Epoll::ERR
305}
306
307pub struct Events {
309 list: epoll::EventVec,
310}
311
312unsafe impl Send for Events {}
313
314impl Events {
315 pub fn with_capacity(cap: usize) -> Events {
317 Events {
318 list: epoll::EventVec::with_capacity(cap),
319 }
320 }
321
322 pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
324 self.list.iter().map(|ev| {
325 let flags = ev.flags;
326 Event {
327 key: ev.data.u64() as usize,
328 readable: flags.intersects(read_flags()),
329 writable: flags.intersects(write_flags()),
330 extra: EventExtra { flags },
331 }
332 })
333 }
334
335 pub fn clear(&mut self) {
337 self.list.clear();
338 }
339
340 pub fn capacity(&self) -> usize {
342 self.list.capacity()
343 }
344}
345
346#[derive(Debug, Clone, Copy, PartialEq, Eq)]
348pub struct EventExtra {
349 flags: epoll::EventFlags,
350}
351
352impl EventExtra {
353 #[inline]
355 pub const fn empty() -> EventExtra {
356 EventExtra {
357 flags: epoll::EventFlags::empty(),
358 }
359 }
360
361 #[inline]
363 pub fn set_hup(&mut self, active: bool) {
364 self.flags.set(epoll::EventFlags::HUP, active);
365 }
366
367 #[inline]
369 pub fn set_pri(&mut self, active: bool) {
370 self.flags.set(epoll::EventFlags::PRI, active);
371 }
372
373 #[inline]
375 pub fn is_hup(&self) -> bool {
376 self.flags.contains(epoll::EventFlags::HUP)
377 }
378
379 #[inline]
381 pub fn is_pri(&self) -> bool {
382 self.flags.contains(epoll::EventFlags::PRI)
383 }
384
385 #[inline]
386 pub fn is_connect_failed(&self) -> Option<bool> {
387 Some(
388 self.flags.contains(epoll::EventFlags::ERR)
389 && self.flags.contains(epoll::EventFlags::HUP),
390 )
391 }
392
393 #[inline]
394 pub fn is_err(&self) -> Option<bool> {
395 Some(self.flags.contains(epoll::EventFlags::ERR))
396 }
397}
398
399#[derive(Debug)]
407enum Notifier {
408 #[cfg(not(target_os = "redox"))]
410 EventFd(OwnedFd),
411
412 Pipe {
414 read_pipe: OwnedFd,
416
417 write_pipe: OwnedFd,
419 },
420}
421
422impl Notifier {
423 fn new() -> io::Result<Self> {
425 #[cfg(not(target_os = "redox"))]
427 {
428 if !cfg!(polling_test_epoll_pipe) {
429 match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
431 Ok(fd) => {
432 tracing::trace!("created eventfd for notifier");
433 return Ok(Notifier::EventFd(fd));
434 }
435
436 Err(err) => {
437 tracing::warn!(
438 "eventfd() failed with error ({}), falling back to pipe",
439 err
440 );
441 }
442 }
443 }
444 }
445
446 let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
447 let (read, write) = pipe()?;
448 fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?;
449 fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?;
450 io::Result::Ok((read, write))
451 })?;
452
453 fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?;
454 Ok(Notifier::Pipe {
455 read_pipe: read,
456 write_pipe: write,
457 })
458 }
459
460 fn as_fd(&self) -> BorrowedFd<'_> {
462 match self {
463 #[cfg(not(target_os = "redox"))]
464 Notifier::EventFd(fd) => fd.as_fd(),
465 Notifier::Pipe {
466 read_pipe: read, ..
467 } => read.as_fd(),
468 }
469 }
470
471 fn notify(&self) {
473 match self {
474 #[cfg(not(target_os = "redox"))]
475 Self::EventFd(fd) => {
476 let buf: [u8; 8] = 1u64.to_ne_bytes();
477 let _ = write(fd, &buf);
478 }
479
480 Self::Pipe { write_pipe, .. } => {
481 write(write_pipe, &[0; 1]).ok();
482 }
483 }
484 }
485
486 fn clear(&self) {
488 match self {
489 #[cfg(not(target_os = "redox"))]
490 Self::EventFd(fd) => {
491 let mut buf = [0u8; 8];
492 let _ = read(fd, &mut buf);
493 }
494
495 Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {},
496 }
497 }
498}