futures_executor/local_pool.rs
1use crate::enter;
2use futures_core::future::Future;
3use futures_core::stream::Stream;
4use futures_core::task::{Context, Poll};
5use futures_task::{waker_ref, ArcWake};
6use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
7use futures_util::pin_mut;
8use futures_util::stream::FuturesUnordered;
9use futures_util::stream::StreamExt;
10use std::cell::RefCell;
11use std::ops::{Deref, DerefMut};
12use std::rc::{Rc, Weak};
13use std::sync::{
14    atomic::{AtomicBool, Ordering},
15    Arc,
16};
17use std::thread::{self, Thread};
18use std::vec::Vec;
19
20/// A single-threaded task pool for polling futures to completion.
21///
22/// This executor allows you to multiplex any number of tasks onto a single
23/// thread. It's appropriate to poll strictly I/O-bound futures that do very
24/// little work in between I/O actions.
25///
26/// To get a handle to the pool that implements
27/// [`Spawn`](futures_task::Spawn), use the
28/// [`spawner()`](LocalPool::spawner) method. Because the executor is
29/// single-threaded, it supports a special form of task spawning for non-`Send`
30/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
31#[derive(Debug)]
32pub struct LocalPool {
33    pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
34    incoming: Rc<Incoming>,
35}
36
37/// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn).
38#[derive(Clone, Debug)]
39pub struct LocalSpawner {
40    incoming: Weak<Incoming>,
41}
42
43type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
44
45pub(crate) struct ThreadNotify {
46    /// The (single) executor thread.
47    thread: Thread,
48    /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
49    /// before the next `park()`, which may otherwise happen if the code
50    /// being executed as part of the future(s) being polled makes use of
51    /// park / unpark calls of its own, i.e. we cannot assume that no other
52    /// code uses park / unpark on the executing `thread`.
53    unparked: AtomicBool,
54}
55
56std::thread_local! {
57    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
58        thread: thread::current(),
59        unparked: AtomicBool::new(false),
60    });
61}
62
63impl ArcWake for ThreadNotify {
64    fn wake_by_ref(arc_self: &Arc<Self>) {
65        // Make sure the wakeup is remembered until the next `park()`.
66        let unparked = arc_self.unparked.swap(true, Ordering::Release);
67        if !unparked {
68            // If the thread has not been unparked yet, it must be done
69            // now. If it was actually parked, it will run again,
70            // otherwise the token made available by `unpark`
71            // may be consumed before reaching `park()`, but `unparked`
72            // ensures it is not forgotten.
73            arc_self.thread.unpark();
74        }
75    }
76}
77
78// Set up and run a basic single-threaded spawner loop, invoking `f` on each
79// turn.
80fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
81    let _enter = enter().expect(
82        "cannot execute `LocalPool` executor from within \
83         another executor",
84    );
85
86    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
87        let waker = waker_ref(thread_notify);
88        let mut cx = Context::from_waker(&waker);
89        loop {
90            if let Poll::Ready(t) = f(&mut cx) {
91                return t;
92            }
93
94            // Wait for a wakeup.
95            while !thread_notify.unparked.swap(false, Ordering::Acquire) {
96                // No wakeup occurred. It may occur now, right before parking,
97                // but in that case the token made available by `unpark()`
98                // is guaranteed to still be available and `park()` is a no-op.
99                thread::park();
100            }
101        }
102    })
103}
104
105/// Check for a wakeup, but don't consume it.
106fn woken() -> bool {
107    CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
108}
109
110impl LocalPool {
111    /// Create a new, empty pool of tasks.
112    pub fn new() -> Self {
113        Self { pool: FuturesUnordered::new(), incoming: Default::default() }
114    }
115
116    /// Get a clonable handle to the pool as a [`Spawn`].
117    pub fn spawner(&self) -> LocalSpawner {
118        LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
119    }
120
121    /// Run all tasks in the pool to completion.
122    ///
123    /// ```
124    /// use futures::executor::LocalPool;
125    ///
126    /// let mut pool = LocalPool::new();
127    ///
128    /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
129    ///
130    /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
131    /// pool.run();
132    /// ```
133    ///
134    /// The function will block the calling thread until *all* tasks in the pool
135    /// are complete, including any spawned while running existing tasks.
136    pub fn run(&mut self) {
137        run_executor(|cx| self.poll_pool(cx))
138    }
139
140    /// Runs all the tasks in the pool until the given future completes.
141    ///
142    /// ```
143    /// use futures::executor::LocalPool;
144    ///
145    /// let mut pool = LocalPool::new();
146    /// # let my_app  = async {};
147    ///
148    /// // run tasks in the pool until `my_app` completes
149    /// pool.run_until(my_app);
150    /// ```
151    ///
152    /// The function will block the calling thread *only* until the future `f`
153    /// completes; there may still be incomplete tasks in the pool, which will
154    /// be inert after the call completes, but can continue with further use of
155    /// one of the pool's run or poll methods. While the function is running,
156    /// however, all tasks in the pool will try to make progress.
157    pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
158        pin_mut!(future);
159
160        run_executor(|cx| {
161            {
162                // if our main task is done, so are we
163                let result = future.as_mut().poll(cx);
164                if let Poll::Ready(output) = result {
165                    return Poll::Ready(output);
166                }
167            }
168
169            let _ = self.poll_pool(cx);
170            Poll::Pending
171        })
172    }
173
174    /// Runs all tasks and returns after completing one future or until no more progress
175    /// can be made. Returns `true` if one future was completed, `false` otherwise.
176    ///
177    /// ```
178    /// use futures::executor::LocalPool;
179    /// use futures::task::LocalSpawnExt;
180    /// use futures::future::{ready, pending};
181    ///
182    /// let mut pool = LocalPool::new();
183    /// let spawner = pool.spawner();
184    ///
185    /// spawner.spawn_local(ready(())).unwrap();
186    /// spawner.spawn_local(ready(())).unwrap();
187    /// spawner.spawn_local(pending()).unwrap();
188    ///
189    /// // Run the two ready tasks and return true for them.
190    /// pool.try_run_one(); // returns true after completing one of the ready futures
191    /// pool.try_run_one(); // returns true after completing the other ready future
192    ///
193    /// // the remaining task can not be completed
194    /// assert!(!pool.try_run_one()); // returns false
195    /// ```
196    ///
197    /// This function will not block the calling thread and will return the moment
198    /// that there are no tasks left for which progress can be made or after exactly one
199    /// task was completed; Remaining incomplete tasks in the pool can continue with
200    /// further use of one of the pool's run or poll methods.
201    /// Though only one task will be completed, progress may be made on multiple tasks.
202    pub fn try_run_one(&mut self) -> bool {
203        run_executor(|cx| {
204            loop {
205                self.drain_incoming();
206
207                match self.pool.poll_next_unpin(cx) {
208                    // Success!
209                    Poll::Ready(Some(())) => return Poll::Ready(true),
210                    // The pool was empty.
211                    Poll::Ready(None) => return Poll::Ready(false),
212                    Poll::Pending => (),
213                }
214
215                if !self.incoming.borrow().is_empty() {
216                    // New tasks were spawned; try again.
217                    continue;
218                } else if woken() {
219                    // The pool yielded to us, but there's more progress to be made.
220                    return Poll::Pending;
221                } else {
222                    return Poll::Ready(false);
223                }
224            }
225        })
226    }
227
228    /// Runs all tasks in the pool and returns if no more progress can be made
229    /// on any task.
230    ///
231    /// ```
232    /// use futures::executor::LocalPool;
233    /// use futures::task::LocalSpawnExt;
234    /// use futures::future::{ready, pending};
235    ///
236    /// let mut pool = LocalPool::new();
237    /// let spawner = pool.spawner();
238    ///
239    /// spawner.spawn_local(ready(())).unwrap();
240    /// spawner.spawn_local(ready(())).unwrap();
241    /// spawner.spawn_local(pending()).unwrap();
242    ///
243    /// // Runs the two ready task and returns.
244    /// // The empty task remains in the pool.
245    /// pool.run_until_stalled();
246    /// ```
247    ///
248    /// This function will not block the calling thread and will return the moment
249    /// that there are no tasks left for which progress can be made;
250    /// remaining incomplete tasks in the pool can continue with further use of one
251    /// of the pool's run or poll methods. While the function is running, all tasks
252    /// in the pool will try to make progress.
253    pub fn run_until_stalled(&mut self) {
254        run_executor(|cx| match self.poll_pool(cx) {
255            // The pool is empty.
256            Poll::Ready(()) => Poll::Ready(()),
257            Poll::Pending => {
258                if woken() {
259                    Poll::Pending
260                } else {
261                    // We're stalled for now.
262                    Poll::Ready(())
263                }
264            }
265        });
266    }
267
268    /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
269    /// Repeat until either the pool is empty, or it returns `Pending`.
270    ///
271    /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
272    ///
273    /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
274    /// mean that the pool can't make progress.
275    fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
276        loop {
277            self.drain_incoming();
278
279            let pool_ret = self.pool.poll_next_unpin(cx);
280
281            // We queued up some new tasks; add them and poll again.
282            if !self.incoming.borrow().is_empty() {
283                continue;
284            }
285
286            match pool_ret {
287                Poll::Ready(Some(())) => continue,
288                Poll::Ready(None) => return Poll::Ready(()),
289                Poll::Pending => return Poll::Pending,
290            }
291        }
292    }
293
294    /// Empty the incoming queue of newly-spawned tasks.
295    fn drain_incoming(&mut self) {
296        let mut incoming = self.incoming.borrow_mut();
297        for task in incoming.drain(..) {
298            self.pool.push(task)
299        }
300    }
301}
302
303impl Default for LocalPool {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309/// Run a future to completion on the current thread.
310///
311/// This function will block the caller until the given future has completed.
312///
313/// Use a [`LocalPool`] if you need finer-grained control over spawned tasks.
314pub fn block_on<F: Future>(f: F) -> F::Output {
315    pin_mut!(f);
316    run_executor(|cx| f.as_mut().poll(cx))
317}
318
319/// Turn a stream into a blocking iterator.
320///
321/// When `next` is called on the resulting `BlockingStream`, the caller
322/// will be blocked until the next element of the `Stream` becomes available.
323pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
324    BlockingStream { stream }
325}
326
327/// An iterator which blocks on values from a stream until they become available.
328#[derive(Debug)]
329pub struct BlockingStream<S: Stream + Unpin> {
330    stream: S,
331}
332
333impl<S: Stream + Unpin> Deref for BlockingStream<S> {
334    type Target = S;
335    fn deref(&self) -> &Self::Target {
336        &self.stream
337    }
338}
339
340impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
341    fn deref_mut(&mut self) -> &mut Self::Target {
342        &mut self.stream
343    }
344}
345
346impl<S: Stream + Unpin> BlockingStream<S> {
347    /// Convert this `BlockingStream` into the inner `Stream` type.
348    pub fn into_inner(self) -> S {
349        self.stream
350    }
351}
352
353impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
354    type Item = S::Item;
355
356    fn next(&mut self) -> Option<Self::Item> {
357        LocalPool::new().run_until(self.stream.next())
358    }
359
360    fn size_hint(&self) -> (usize, Option<usize>) {
361        self.stream.size_hint()
362    }
363}
364
365impl Spawn for LocalSpawner {
366    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
367        if let Some(incoming) = self.incoming.upgrade() {
368            incoming.borrow_mut().push(future.into());
369            Ok(())
370        } else {
371            Err(SpawnError::shutdown())
372        }
373    }
374
375    fn status(&self) -> Result<(), SpawnError> {
376        if self.incoming.upgrade().is_some() {
377            Ok(())
378        } else {
379            Err(SpawnError::shutdown())
380        }
381    }
382}
383
384impl LocalSpawn for LocalSpawner {
385    fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
386        if let Some(incoming) = self.incoming.upgrade() {
387            incoming.borrow_mut().push(future);
388            Ok(())
389        } else {
390            Err(SpawnError::shutdown())
391        }
392    }
393
394    fn status_local(&self) -> Result<(), SpawnError> {
395        if self.incoming.upgrade().is_some() {
396            Ok(())
397        } else {
398            Err(SpawnError::shutdown())
399        }
400    }
401}