mz_timely_util/
builder_async.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types to build async operators with general shapes.
17
18use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use differential_dataflow::containers::{Columnation, TimelyStack};
28use futures_util::Stream;
29use futures_util::task::ArcWake;
30use timely::communication::{Pull, Push};
31use timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
32use timely::dataflow::channels::Message;
33use timely::dataflow::channels::pact::ParallelizationContract;
34use timely::dataflow::channels::pushers::Tee;
35use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
36use timely::dataflow::operators::generic::{
37    InputHandleCore, OperatorInfo, OutputHandleCore, OutputWrapper,
38};
39use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
40use timely::dataflow::{Scope, StreamCore};
41use timely::progress::{Antichain, Timestamp};
42use timely::scheduling::{Activator, SyncActivator};
43use timely::{Bincode, Container, PartialOrder};
44
45use crate::containers::stack::AccountedStackBuilder;
46
/// Builds async operators with generic shape.
///
/// Inputs and outputs are declared first (via the `new_*` methods); `build` or `build_fallible`
/// then consumes the builder and installs the async logic as a timely operator.
pub struct OperatorBuilder<G: Scope> {
    /// The wrapped timely operator builder that does the actual operator construction.
    builder: OperatorBuilderRc<G>,
    /// The activator for this operator
    activator: Activator,
    /// The waker set up to activate this timely operator when woken
    operator_waker: Arc<TimelyWaker>,
    /// The currently known upper frontier of each of the input handles.
    input_frontiers: Vec<Antichain<G::Timestamp>>,
    /// Input queues for each of the declared inputs of the operator.
    input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
    /// Holds type erased closures that flush an output handle when called. All of them are
    /// invoked right after each poll of the logic future.
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// A handle to check whether all workers have pressed the shutdown button.
    shutdown_handle: ButtonHandle,
    /// A button to coordinate shutdown of this operator among workers.
    shutdown_button: Button,
}
66
/// A helper trait abstracting over an input handle. It facilitates keeping around type erased
/// handles for each of the operator inputs.
///
/// The operator installed by `OperatorBuilder::build` calls these methods every time it is
/// scheduled.
trait InputQueue<T: Timestamp> {
    /// Accepts all available input into local queues.
    fn accept_input(&mut self);

    /// Drains all available input and empties the local queue.
    ///
    /// Called once the logic future has been dropped, so that pending input is discarded
    /// instead of accumulating.
    fn drain_input(&mut self);

    /// Registers a frontier notification to be delivered.
    ///
    /// `upper` is the new frontier of this input.
    fn notify_progress(&mut self, upper: Antichain<T>);
}
79
80impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
81where
82    T: Timestamp,
83    D: Container,
84    C: InputConnection<T> + 'static,
85    P: Pull<Message<T, D>> + 'static,
86{
87    fn accept_input(&mut self) {
88        let mut queue = self.queue.borrow_mut();
89        let mut new_data = false;
90        while let Some((cap, data)) = self.handle.next() {
91            new_data = true;
92            let cap = self.connection.accept(cap);
93            queue.push_back(Event::Data(cap, std::mem::take(data)));
94        }
95        if new_data {
96            if let Some(waker) = self.waker.take() {
97                waker.wake();
98            }
99        }
100    }
101
102    fn drain_input(&mut self) {
103        self.queue.borrow_mut().clear();
104        self.handle.for_each(|_, _| {});
105    }
106
107    fn notify_progress(&mut self, upper: Antichain<T>) {
108        let mut queue = self.queue.borrow_mut();
109        // It's beneficial to consolidate two consecutive progress statements into one if the
110        // operator hasn't seen the previous progress yet. This also avoids accumulation of
111        // progress statements in the queue if the operator only conditionally checks this input.
112        match queue.back_mut() {
113            Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
114            _ => queue.push_back(Event::Progress(upper)),
115        }
116        if let Some(waker) = self.waker.take() {
117            waker.wake();
118        }
119    }
120}
121
/// The operator-side half of an async input. It owns the timely input handle and feeds the
/// queue that the matching [`AsyncInputHandle`] reads from.
struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    /// Events not yet consumed by the async handle; the same `Rc` is held by the handle.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Waker stored by the async handle while it waits for `queue` to become non-empty.
    waker: Rc<Cell<Option<Waker>>>,
    /// Connection behavior of this input, used to convert incoming input capabilities.
    connection: C,
    /// The underlying timely input handle.
    handle: InputHandleCore<T, D, P>,
}
133
/// An async Waker that activates a specific operator when woken and marks the task as ready
struct TimelyWaker {
    /// Activator for the timely operator that polls the logic future.
    activator: SyncActivator,
    /// True while the operator is accepting input. Wake-ups during that window only set
    /// `task_ready` and do not activate the operator again.
    active: AtomicBool,
    /// Set by a wake-up; the operator polls the logic future only while this is true.
    task_ready: AtomicBool,
}
140
141impl ArcWake for TimelyWaker {
142    fn wake_by_ref(arc_self: &Arc<Self>) {
143        arc_self.task_ready.store(true, Ordering::SeqCst);
144        // Only activate the timely operator if it's not already active to avoid an infinite loop
145        if !arc_self.active.load(Ordering::SeqCst) {
146            // We don't have any guarantees about how long the Waker will be held for and so we
147            // must be prepared for the receiving end to have hung up when we finally do get woken.
148            // This can happen if by the time the waker is called the receiving timely worker has
149            // been shutdown. For this reason we ignore the activation error.
150            let _ = arc_self.activator.activate();
151        }
152    }
153}
154
/// Async handle to an operator's input stream
///
/// Events are pushed into a shared queue by the operator. They can be consumed either as a
/// [`Stream`] or through `next_sync` / `ready`.
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    /// Events delivered by the operator that have not been consumed yet.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Slot for the waker of the task waiting on this input. The operator side takes and wakes
    /// it when new events arrive.
    waker: Rc<Cell<Option<Waker>>>,
    /// Whether this handle has finished producing data. Set once a progress event with an
    /// empty frontier has been consumed.
    done: bool,
}
162
163impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
164    pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
165        let mut queue = self.queue.borrow_mut();
166        match queue.pop_front()? {
167            Event::Data(cap, data) => Some(Event::Data(cap, data)),
168            Event::Progress(frontier) => {
169                self.done = frontier.is_empty();
170                Some(Event::Progress(frontier))
171            }
172        }
173    }
174
175    /// Waits for the handle to have data. After this function returns it is guaranteed that at
176    /// least one call to `next_sync` will be `Some(_)`.
177    pub async fn ready(&self) {
178        std::future::poll_fn(|cx| self.poll_ready(cx)).await
179    }
180
181    fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
182        if self.queue.borrow().is_empty() {
183            self.waker.set(Some(cx.waker().clone()));
184            Poll::Pending
185        } else {
186            Poll::Ready(())
187        }
188    }
189}
190
191impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
192    type Item = Event<T, C::Capability, D>;
193
194    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195        if self.done {
196            return Poll::Ready(None);
197        }
198        ready!(self.poll_ready(cx));
199        Poll::Ready(self.next_sync())
200    }
201
202    fn size_hint(&self) -> (usize, Option<usize>) {
203        (self.queue.borrow().len(), None)
204    }
205}
206
/// An event of an input stream
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A data event. The first field is produced by the input's connection from the message's
    /// input capability; the second is the container of data.
    Data(C, D),
    /// A progress event carrying the input's new frontier. Once a frontier equal to the empty
    /// antichain is consumed, the input handle's stream ends.
    Progress(Antichain<T>),
}
215
/// Async handle to one of an operator's outputs.
///
/// All clones share the same activated timely output handle.
pub struct AsyncOutputHandle<
    T: Timestamp,
    CB: ContainerBuilder,
    P: Push<Message<T, CB::Container>> + 'static,
> {
    // The field order is important here as the handle is borrowing from the wrapper. See also the
    // safety argument in the constructor
    /// The activated output handle. Its lifetime has been erased to `'static`, but it really
    /// borrows from `wrapper`.
    handle: Rc<RefCell<OutputHandleCore<'static, T, CB, P>>>,
    /// The pinned wrapper that `handle` borrows from; it must be dropped after `handle`.
    wrapper: Rc<Pin<Box<OutputWrapper<T, CB, P>>>>,
    /// The index of this output among the operator's outputs.
    index: usize,
}
227
228impl<T, C, P> AsyncOutputHandle<T, CapacityContainerBuilder<C>, P>
229where
230    T: Timestamp,
231    C: Container + Clone + 'static,
232    P: Push<Message<T, C>> + 'static,
233{
234    #[inline]
235    pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
236        let mut handle = self.handle.borrow_mut();
237        handle.session_with_builder(cap).give_container(container);
238    }
239}
240
impl<T, CB, P> AsyncOutputHandle<T, CB, P>
where
    T: Timestamp,
    CB: ContainerBuilder,
    P: Push<Message<T, CB::Container>> + 'static,
{
    /// Creates the async handle for output number `index` from a timely output wrapper.
    ///
    /// The wrapper is pinned on the heap and activated exactly once here. The resulting output
    /// handle is kept for the lifetime of this handle and shared by all of its clones.
    fn new(wrapper: OutputWrapper<T, CB, P>, index: usize) -> Self {
        let mut wrapper = Rc::new(Box::pin(wrapper));
        // SAFETY:
        // get_unchecked_mut is safe because we are not moving the wrapper
        //
        // transmute is safe because:
        // * We're erasing the lifetime but we guarantee through field order that the handle will
        //   be dropped before the wrapper, thus manually enforcing the lifetime.
        // * We never touch wrapper again after this point
        //
        // `Rc::get_mut(..).unwrap()` cannot fail: the `Rc` was created on the line above and has
        // not been cloned yet. Afterwards the `Rc` is only moved into `Self` and cloned by
        // `Clone`. Moving an `Rc` does not move the pinned, boxed wrapper it points to.
        let handle = unsafe {
            let handle = Rc::get_mut(&mut wrapper)
                .unwrap()
                .as_mut()
                .get_unchecked_mut()
                .activate();
            std::mem::transmute::<OutputHandleCore<'_, T, CB, P>, OutputHandleCore<'static, T, CB, P>>(
                handle,
            )
        };
        // Drop order is decided by the field declaration order, not by the order of fields in
        // this literal.
        Self {
            wrapper,
            handle: Rc::new(RefCell::new(handle)),
            index,
        }
    }

    /// Calls `cease` on the shared output handle. `OperatorBuilder::new_output` registers this
    /// as the output's flush callback.
    fn cease(&self) {
        self.handle.borrow_mut().cease()
    }
}
277
278impl<T, C, P> AsyncOutputHandle<T, CapacityContainerBuilder<C>, P>
279where
280    T: Timestamp,
281    C: Container + Clone + 'static,
282    P: Push<Message<T, C>> + 'static,
283{
284    pub fn give<D>(&self, cap: &Capability<T>, data: D)
285    where
286        CapacityContainerBuilder<C>: PushInto<D>,
287    {
288        let mut handle = self.handle.borrow_mut();
289        handle.session_with_builder(cap).give(data);
290    }
291}
292
293impl<T, D, P>
294    AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<D>>>, P>
295where
296    D: timely::Data + Columnation,
297    T: Timestamp,
298    P: Push<Message<T, TimelyStack<D>>>,
299{
300    pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
301
302    /// Provides one record at the time specified by the capability. This method will automatically
303    /// yield back to timely after [Self::MAX_OUTSTANDING_BYTES] have been produced.
304    pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
305    where
306        TimelyStack<D>: PushInto<D2>,
307    {
308        let should_yield = {
309            let mut handle = self.handle.borrow_mut();
310            let mut session = handle.session_with_builder(cap);
311            session.push_into(data);
312            let should_yield = session.builder().bytes.get() > Self::MAX_OUTSTANDING_BYTES;
313            if should_yield {
314                session.builder().bytes.set(0);
315            }
316            should_yield
317        };
318        if should_yield {
319            tokio::task::yield_now().await;
320        }
321    }
322}
323
324impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>> + 'static> Clone
325    for AsyncOutputHandle<T, CB, P>
326{
327    fn clone(&self) -> Self {
328        Self {
329            handle: Rc::clone(&self.handle),
330            wrapper: Rc::clone(&self.wrapper),
331            index: self.index,
332        }
333    }
334}
335
/// A trait describing the connection behavior between an input of an operator and zero or more of
/// its outputs.
pub trait InputConnection<T: Timestamp> {
    /// The capability type associated with this connection behavior.
    ///
    /// Values of this type become the first field of [`Event::Data`] for inputs with this
    /// connection.
    type Capability;

    /// Generates a summary description of the connection behavior given the number of outputs.
    ///
    /// The result has one entry per output. The implementations in this module use an empty
    /// antichain for every output the input is not connected to.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Accepts an input capability.
    ///
    /// Converts the capability of an incoming message into the value stored alongside its data.
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}
348
349/// A marker type representing a disconnected input.
350pub struct Disconnected;
351
352impl<T: Timestamp> InputConnection<T> for Disconnected {
353    type Capability = T;
354
355    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
356        vec![Antichain::new(); outputs]
357    }
358
359    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
360        input_cap.time().clone()
361    }
362}
363
364/// A marker type representing an input connected to exactly one output.
365pub struct ConnectedToOne(usize);
366
367impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
368    type Capability = Capability<T>;
369
370    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
371        let mut summary = vec![Antichain::new(); outputs];
372        summary[self.0] = Antichain::from_elem(T::Summary::default());
373        summary
374    }
375
376    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
377        input_cap.retain_for_output(self.0)
378    }
379}
380
381/// A marker type representing an input connected to many outputs.
382pub struct ConnectedToMany<const N: usize>([usize; N]);
383
384impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
385    type Capability = [Capability<T>; N];
386
387    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
388        let mut summary = vec![Antichain::new(); outputs];
389        for output in self.0 {
390            summary[output] = Antichain::from_elem(T::Summary::default());
391        }
392        summary
393    }
394
395    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
396        self.0
397            .map(|output| input_cap.delayed_for_output(input_cap.time(), output))
398    }
399}
400
/// A helper trait abstracting over an output handle. It facilitates passing type erased
/// output handles during operator construction.
/// It is not meant to be implemented by users.
///
/// `OperatorBuilder::new_input_for` and `new_input_for_many` use it to learn which outputs
/// an input should be connected to.
pub trait OutputIndex {
    /// The output index of this handle.
    fn index(&self) -> usize;
}
408
409impl<T: Timestamp, CB: ContainerBuilder> OutputIndex
410    for AsyncOutputHandle<T, CB, Tee<T, CB::Container>>
411{
412    fn index(&self) -> usize {
413        self.index
414    }
415}
416
417impl<G: Scope> OperatorBuilder<G> {
418    /// Allocates a new generic async operator builder from its containing scope.
419    pub fn new(name: String, mut scope: G) -> Self {
420        let builder = OperatorBuilderRc::new(name, scope.clone());
421        let info = builder.operator_info();
422        let activator = scope.activator_for(Rc::clone(&info.address));
423        let sync_activator = scope.sync_activator_for(info.address.to_vec());
424        let operator_waker = TimelyWaker {
425            activator: sync_activator,
426            active: AtomicBool::new(false),
427            task_ready: AtomicBool::new(true),
428        };
429        let (shutdown_handle, shutdown_button) = button(&mut scope, info.address);
430
431        OperatorBuilder {
432            builder,
433            activator,
434            operator_waker: Arc::new(operator_waker),
435            input_frontiers: Default::default(),
436            input_queues: Default::default(),
437            output_flushes: Default::default(),
438            shutdown_handle,
439            shutdown_button,
440        }
441    }
442
443    /// Adds a new input that is connected to the specified output, returning the async input handle to use.
444    pub fn new_input_for<D, P>(
445        &mut self,
446        stream: &StreamCore<G, D>,
447        pact: P,
448        output: &dyn OutputIndex,
449    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
450    where
451        D: Container + 'static,
452        P: ParallelizationContract<G::Timestamp, D>,
453    {
454        let index = output.index();
455        assert!(index < self.builder.shape().outputs());
456        self.new_input_connection(stream, pact, ConnectedToOne(index))
457    }
458
459    /// Adds a new input that is connected to the specified outputs, returning the async input handle to use.
460    pub fn new_input_for_many<const N: usize, D, P>(
461        &mut self,
462        stream: &StreamCore<G, D>,
463        pact: P,
464        outputs: [&dyn OutputIndex; N],
465    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
466    where
467        D: Container + 'static,
468        P: ParallelizationContract<G::Timestamp, D>,
469    {
470        let indices = outputs.map(|output| output.index());
471        for index in indices {
472            assert!(index < self.builder.shape().outputs());
473        }
474        self.new_input_connection(stream, pact, ConnectedToMany(indices))
475    }
476
477    /// Adds a new input that is not connected to any output, returning the async input handle to use.
478    pub fn new_disconnected_input<D, P>(
479        &mut self,
480        stream: &StreamCore<G, D>,
481        pact: P,
482    ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
483    where
484        D: Container + 'static,
485        P: ParallelizationContract<G::Timestamp, D>,
486    {
487        self.new_input_connection(stream, pact, Disconnected)
488    }
489
490    /// Adds a new input with connection information, returning the async input handle to use.
491    pub fn new_input_connection<D, P, C>(
492        &mut self,
493        stream: &StreamCore<G, D>,
494        pact: P,
495        connection: C,
496    ) -> AsyncInputHandle<G::Timestamp, D, C>
497    where
498        D: Container + 'static,
499        P: ParallelizationContract<G::Timestamp, D>,
500        C: InputConnection<G::Timestamp> + 'static,
501    {
502        self.input_frontiers
503            .push(Antichain::from_elem(G::Timestamp::minimum()));
504
505        let outputs = self.builder.shape().outputs();
506        let handle = self.builder.new_input_connection(
507            stream,
508            pact,
509            connection.describe(outputs).into_iter().enumerate(),
510        );
511
512        let waker = Default::default();
513        let queue = Default::default();
514        let input_queue = InputHandleQueue {
515            queue: Rc::clone(&queue),
516            waker: Rc::clone(&waker),
517            connection,
518            handle,
519        };
520        self.input_queues.push(Box::new(input_queue));
521
522        AsyncInputHandle {
523            queue,
524            waker,
525            done: false,
526        }
527    }
528
529    /// Adds a new output, returning the output handle and stream.
530    pub fn new_output<CB: ContainerBuilder>(
531        &mut self,
532    ) -> (
533        AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
534        StreamCore<G, CB::Container>,
535    ) {
536        let index = self.builder.shape().outputs();
537
538        let (wrapper, stream) = self.builder.new_output_connection([]);
539
540        let handle = AsyncOutputHandle::new(wrapper, index);
541
542        let flush_handle = handle.clone();
543        self.output_flushes
544            .push(Box::new(move || flush_handle.cease()));
545
546        (handle, stream)
547    }
548
549    /// Creates an operator implementation from supplied logic constructor. It returns a shutdown
550    /// button that when pressed it will cause the logic future to be dropped and input handles to
551    /// be drained. The button can be converted into a token by using
552    /// [`Button::press_on_drop`]
553    pub fn build<B, L>(self, constructor: B) -> Button
554    where
555        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
556        L: Future + 'static,
557    {
558        let operator_waker = self.operator_waker;
559        let mut input_frontiers = self.input_frontiers;
560        let mut input_queues = self.input_queues;
561        let mut output_flushes = self.output_flushes;
562        let mut shutdown_handle = self.shutdown_handle;
563        self.builder.build_reschedule(move |caps| {
564            let mut logic_fut = Some(Box::pin(constructor(caps)));
565            move |new_frontiers| {
566                operator_waker.active.store(true, Ordering::SeqCst);
567                for (i, queue) in input_queues.iter_mut().enumerate() {
568                    // First, discover if there are any frontier notifications
569                    let cur = &mut input_frontiers[i];
570                    let new = new_frontiers[i].frontier();
571                    if PartialOrder::less_than(&cur.borrow(), &new) {
572                        queue.notify_progress(new.to_owned());
573                        *cur = new.to_owned();
574                    }
575                    // Then accept all input into local queues. This step registers the received
576                    // messages with progress tracking.
577                    queue.accept_input();
578                }
579                operator_waker.active.store(false, Ordering::SeqCst);
580
581                // If our worker pressed the button we stop scheduling the logic future and/or
582                // draining the input handles to stop producing data and frontier updates
583                // downstream.
584                if shutdown_handle.local_pressed() {
585                    // When all workers press their buttons we drop the logic future and start
586                    // draining the input handles.
587                    if shutdown_handle.all_pressed() {
588                        logic_fut = None;
589                        for queue in input_queues.iter_mut() {
590                            queue.drain_input();
591                        }
592                        false
593                    } else {
594                        true
595                    }
596                } else {
597                    // Schedule the logic future if any of the wakers above marked the task as ready
598                    if let Some(fut) = logic_fut.as_mut() {
599                        if operator_waker.task_ready.load(Ordering::SeqCst) {
600                            let waker = futures_util::task::waker_ref(&operator_waker);
601                            let mut cx = Context::from_waker(&waker);
602                            operator_waker.task_ready.store(false, Ordering::SeqCst);
603                            if Pin::new(fut).poll(&mut cx).is_ready() {
604                                // We're done with logic so deallocate the task
605                                logic_fut = None;
606                            }
607                            // Flush all the outputs before exiting
608                            for flush in output_flushes.iter_mut() {
609                                (flush)();
610                            }
611                        }
612                    }
613
614                    // The timely operator needs to be kept alive if the task is pending
615                    if logic_fut.is_some() {
616                        true
617                    } else {
618                        // Othewise we should keep draining all inputs
619                        for queue in input_queues.iter_mut() {
620                            queue.drain_input();
621                        }
622                        false
623                    }
624                }
625            }
626        });
627
628        self.shutdown_button
629    }
630
631    /// Creates a fallible operator implementation from supplied logic constructor. If the `Future`
632    /// resolves to an error it will be emitted in the returned error stream and then the operator
633    /// will wait indefinitely until the shutdown button is pressed.
634    ///
635    /// # Capability handling
636    ///
637    /// Unlike [`OperatorBuilder::build`], this method does not give owned capabilities to the
638    /// constructor. All initial capabilities are wrapped in a `CapabilitySet` and a mutable
639    /// reference to them is given instead. This is done to avoid storing owned capabilities in the
640    /// state of the logic future which would make using the `?` operator unsafe, since the
641    /// frontiers would incorrectly advance, potentially causing incorrect actions downstream.
642    ///
643    /// ```ignore
644    /// builder.build_fallible(|caps| Box::pin(async move {
645    ///     // Assert that we have the number of capabilities we expect
646    ///     // `cap` will be a `&mut Option<Capability<T>>`:
647    ///     let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
648    ///
649    ///     // Using cap to send data:
650    ///     output.give(&cap_set[0], 42);
651    ///
652    ///     // Using cap_set to downgrade it:
653    ///     cap_set.downgrade([]);
654    ///
655    ///     // Explicitly dropping the capability:
656    ///     // Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
657    ///     *cap_set = CapabilitySet::new();
658    ///
659    ///     // !! BIG WARNING !!:
660    ///     // It is tempting to `take` the capability out of the set for convenience. This will
661    ///     // move the capability into the future state, tying its lifetime to it, which will get
662    ///     // dropped when an error is hit, causing incorrect progress statements.
663    ///     let cap = cap_set.delayed(&Timestamp::minimum());
664    ///     *cap_set = CapabilitySet::new(); // DO NOT DO THIS
665    /// }));
666    /// ```
667    pub fn build_fallible<E: 'static, F>(
668        mut self,
669        constructor: F,
670    ) -> (Button, StreamCore<G, Vec<Rc<E>>>)
671    where
672        F: for<'a> FnOnce(
673                &'a mut [CapabilitySet<G::Timestamp>],
674            ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
675            + 'static,
676    {
677        // Create a new completely disconnected output
678        let (error_output, error_stream) = self.new_output();
679        let button = self.build(|mut caps| async move {
680            let error_cap = caps.pop().unwrap();
681            let mut caps = caps
682                .into_iter()
683                .map(CapabilitySet::from_elem)
684                .collect::<Vec<_>>();
685            if let Err(err) = constructor(&mut *caps).await {
686                error_output.give(&error_cap, Rc::new(err));
687                drop(error_cap);
688                // IMPORTANT: wedge this operator until the button is pressed. Returning would drop
689                // the capabilities and could produce incorrect progress statements.
690                std::future::pending().await
691            }
692        });
693        (button, error_stream)
694    }
695
696    /// Creates operator info for the operator.
697    pub fn operator_info(&self) -> OperatorInfo {
698        self.builder.operator_info()
699    }
700
701    /// Returns the activator for the operator.
702    pub fn activator(&self) -> &Activator {
703        &self.activator
704    }
705}
706
707/// Creates a new coordinated button the worker configuration described by `scope`.
708pub fn button<G: Scope>(scope: &mut G, addr: Rc<[usize]>) -> (ButtonHandle, Button) {
709    let index = scope.new_identifier();
710    let (pushers, puller) = scope.allocate(index, addr);
711
712    let local_pressed = Rc::new(Cell::new(false));
713
714    let handle = ButtonHandle {
715        buttons_remaining: scope.peers(),
716        local_pressed: Rc::clone(&local_pressed),
717        puller,
718    };
719
720    let token = Button {
721        pushers,
722        local_pressed,
723    };
724
725    (handle, token)
726}
727
/// A button that can be used to coordinate an action after all workers have pressed it.
///
/// This is the observing side returned by [`button`]; the pressing side is [`Button`].
pub struct ButtonHandle {
    /// The number of buttons still unpressed among workers.
    buttons_remaining: usize,
    /// A flag indicating whether this worker has pressed its button.
    local_pressed: Rc<Cell<bool>>,
    /// Receives the press notifications sent by the workers' [`Button`]s.
    puller: Box<dyn Pull<Bincode<bool>>>,
}
736
737impl ButtonHandle {
738    /// Returns whether this worker has pressed its button.
739    pub fn local_pressed(&self) -> bool {
740        self.local_pressed.get()
741    }
742
743    /// Returns whether all workers have pressed their buttons.
744    pub fn all_pressed(&mut self) -> bool {
745        while self.puller.recv().is_some() {
746            self.buttons_remaining -= 1;
747        }
748        self.buttons_remaining == 0
749    }
750}
751
/// The pressing side of a coordinated button, returned by [`button`] together with its
/// [`ButtonHandle`].
pub struct Button {
    /// Pushers from the channel allocated in [`button`] (presumably one per worker). They are
    /// drained, and therefore emptied, by the first press.
    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
    /// Flag shared with the [`ButtonHandle`]; set when this worker presses the button.
    local_pressed: Rc<Cell<bool>>,
}
756
757impl Button {
758    /// Presses the button. It is safe to call this function multiple times.
759    pub fn press(&mut self) {
760        for mut pusher in self.pushers.drain(..) {
761            pusher.send(Bincode::from(true));
762            pusher.done();
763        }
764        self.local_pressed.set(true);
765    }
766
767    /// Converts this button into a deadman's switch that will automatically press the button when
768    /// dropped.
769    pub fn press_on_drop(self) -> PressOnDropButton {
770        PressOnDropButton(self)
771    }
772}
773
774pub struct PressOnDropButton(Button);
775
776impl Drop for PressOnDropButton {
777    fn drop(&mut self) {
778        self.0.press();
779    }
780}
781
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, ToStream};

    use super::*;

    /// Passes ten integers through an async operator. The operator yields to the executor once
    /// before reading input and once before emitting each record. All ten records must arrive,
    /// in order, at time 0.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output();
            let mut input_handle = op.new_input_for(&input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Regression test (the name presumably refers to issue #18837). Two workers run a
    /// producer/consumer pair. Worker 0's producer holds a capability at time 1 forever. Worker
    /// 0 then drops its shutdown tokens, pressing only its own buttons. The consumers must never
    /// observe a frontier beyond `[1]`, because not all workers have pressed their buttons.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    if index != 0 {
                        return;
                    }
                    // Worker 0 downgrades to 1 and keeps the capability around forever
                    cap.downgrade(&1);
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(&output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // We should never observe a frontier greater than [1]
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            // Run dataflow until only worker 0 holds the frontier to [1]
            for _ in 0..100 {
                worker.step();
            }
            // Then drop the tokens of worker 0
            if index == 0 {
                drop(tokens)
            }
            // And step the dataflow some more to ensure consumers don't observe frontiers advancing.
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}