Skip to main content

mz_timely_util/
builder_async.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types to build async operators with general shapes.
17
18use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use futures_util::Stream;
28use futures_util::task::ArcWake;
29use timely::communication::{Pull, Push};
30use timely::container::{CapacityContainerBuilder, PushInto};
31use timely::dataflow::channels::Message;
32use timely::dataflow::channels::pact::ParallelizationContract;
33use timely::dataflow::channels::pushers::Output;
34use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
35use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
36use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
37use timely::dataflow::{Scope, Stream as TimelyStream, StreamVec};
38use timely::progress::{Antichain, Timestamp};
39use timely::scheduling::{Activator, SyncActivator};
40use timely::{Bincode, Container, ContainerBuilder, PartialOrder};
41
42use crate::containers::stack::FueledBuilder;
43
44/// Builds async operators with generic shape.
45pub struct OperatorBuilder<'scope, T: Timestamp> {
46    builder: OperatorBuilderRc<'scope, T>,
47    /// The activator for this operator
48    activator: Activator,
49    /// The waker set up to activate this timely operator when woken
50    operator_waker: Arc<TimelyWaker>,
51    /// The currently known upper frontier of each of the input handles.
52    input_frontiers: Vec<Antichain<T>>,
53    /// Input queues for each of the declared inputs of the operator.
54    input_queues: Vec<Box<dyn InputQueue<T>>>,
55    /// Holds type erased closures that flush an output handle when called. These handles will be
56    /// automatically drained when the operator is scheduled after the logic future has been polled
57    output_flushes: Vec<Box<dyn FnMut()>>,
58    /// A handle to check whether all workers have pressed the shutdown button.
59    shutdown_handle: ButtonHandle,
60    /// A button to coordinate shutdown of this operator among workers.
61    shutdown_button: Button,
62}
63
64/// A helper trait abstracting over an input handle. It facilitates keeping around type erased
65/// handles for each of the operator inputs.
66trait InputQueue<T: Timestamp> {
67    /// Accepts all available input into local queues.
68    fn accept_input(&mut self);
69
70    /// Drains all available input and empties the local queue.
71    fn drain_input(&mut self);
72
73    /// Registers a frontier notification to be delivered.
74    fn notify_progress(&mut self, upper: Antichain<T>);
75}
76
77impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
78where
79    T: Timestamp,
80    D: Container,
81    C: InputConnection<T> + 'static,
82    P: Pull<Message<T, D>> + 'static,
83{
84    fn accept_input(&mut self) {
85        let mut queue = self.queue.borrow_mut();
86        let mut new_data = false;
87        self.handle.for_each(|cap, data| {
88            new_data = true;
89            let cap = self.connection.accept(cap);
90            queue.push_back(Event::Data(cap, std::mem::take(data)));
91        });
92        if new_data {
93            if let Some(waker) = self.waker.take() {
94                waker.wake();
95            }
96        }
97    }
98
99    fn drain_input(&mut self) {
100        self.queue.borrow_mut().clear();
101        self.handle.for_each(|_, _| {});
102    }
103
104    fn notify_progress(&mut self, upper: Antichain<T>) {
105        let mut queue = self.queue.borrow_mut();
106        // It's beneficial to consolidate two consecutive progress statements into one if the
107        // operator hasn't seen the previous progress yet. This also avoids accumulation of
108        // progress statements in the queue if the operator only conditionally checks this input.
109        match queue.back_mut() {
110            Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
111            _ => queue.push_back(Event::Progress(upper)),
112        }
113        if let Some(waker) = self.waker.take() {
114            waker.wake();
115        }
116    }
117}
118
119struct InputHandleQueue<
120    T: Timestamp,
121    D: Container,
122    C: InputConnection<T>,
123    P: Pull<Message<T, D>> + 'static,
124> {
125    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
126    waker: Rc<Cell<Option<Waker>>>,
127    connection: C,
128    handle: InputHandleCore<T, D, P>,
129}
130
131/// An async Waker that activates a specific operator when woken and marks the task as ready
132struct TimelyWaker {
133    activator: SyncActivator,
134    active: AtomicBool,
135    task_ready: AtomicBool,
136}
137
138impl ArcWake for TimelyWaker {
139    fn wake_by_ref(arc_self: &Arc<Self>) {
140        arc_self.task_ready.store(true, Ordering::SeqCst);
141        // Only activate the timely operator if it's not already active to avoid an infinite loop
142        if !arc_self.active.load(Ordering::SeqCst) {
143            // We don't have any guarantees about how long the Waker will be held for and so we
144            // must be prepared for the receiving end to have hung up when we finally do get woken.
145            // This can happen if by the time the waker is called the receiving timely worker has
146            // been shutdown. For this reason we ignore the activation error.
147            let _ = arc_self.activator.activate();
148        }
149    }
150}
151
152/// Async handle to an operator's input stream
153pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
154    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
155    waker: Rc<Cell<Option<Waker>>>,
156    /// Whether this handle has finished producing data
157    done: bool,
158}
159
160impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
161    pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
162        let mut queue = self.queue.borrow_mut();
163        match queue.pop_front()? {
164            Event::Data(cap, data) => Some(Event::Data(cap, data)),
165            Event::Progress(frontier) => {
166                self.done = frontier.is_empty();
167                Some(Event::Progress(frontier))
168            }
169        }
170    }
171
172    /// Waits for the handle to have data. After this function returns it is guaranteed that at
173    /// least one call to `next_sync` will be `Some(_)`.
174    pub async fn ready(&self) {
175        std::future::poll_fn(|cx| self.poll_ready(cx)).await
176    }
177
178    fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
179        if self.queue.borrow().is_empty() {
180            self.waker.set(Some(cx.waker().clone()));
181            Poll::Pending
182        } else {
183            Poll::Ready(())
184        }
185    }
186}
187
188impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
189    type Item = Event<T, C::Capability, D>;
190
191    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192        if self.done {
193            return Poll::Ready(None);
194        }
195        ready!(self.poll_ready(cx));
196        Poll::Ready(self.next_sync())
197    }
198
199    fn size_hint(&self) -> (usize, Option<usize>) {
200        (self.queue.borrow().len(), None)
201    }
202}
203
204/// An event of an input stream
205#[derive(Debug)]
206pub enum Event<T: Timestamp, C, D> {
207    /// A data event
208    Data(C, D),
209    /// A progress event
210    Progress(Antichain<T>),
211}
212
213/// Shared part of an async output handle
214struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
215    /// Handle to write to the output stream.
216    output: Output<T, CB::Container>,
217    /// Current capability held by this output handle.
218    capability: Option<Capability<T>>,
219    /// Container builder to accumulate data before sending at `capability`.
220    builder: CB,
221}
222
223impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
224    /// Write all pending data to the output stream.
225    fn flush(&mut self) {
226        while let Some(container) = self.builder.finish() {
227            self.output
228                .give(self.capability.as_ref().expect("must exist"), container);
229        }
230    }
231
232    /// Cease this output handle, flushing all pending data and releasing its capability.
233    fn cease(&mut self) {
234        self.flush();
235        let _ = self.output.activate();
236        self.capability = None;
237    }
238
239    /// Provides data at the time specified by the capability. Flushes automatically when the
240    /// capability time changes.
241    fn give<D>(&mut self, cap: &Capability<T>, data: D)
242    where
243        CB: PushInto<D>,
244    {
245        if let Some(capability) = &self.capability
246            && cap.time() != capability.time()
247        {
248            self.flush();
249            self.capability = None;
250        }
251        if self.capability.is_none() {
252            self.capability = Some(cap.clone());
253        }
254
255        self.builder.push_into(data);
256        while let Some(container) = self.builder.extract() {
257            self.output
258                .give(self.capability.as_ref().expect("must exist"), container);
259        }
260    }
261}
262
263pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
264    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
265    index: usize,
266}
267
268impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
269where
270    T: Timestamp,
271    C: Container + Clone + 'static,
272{
273    #[inline]
274    pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
275        let mut inner = self.inner.borrow_mut();
276        inner.flush();
277        inner.output.give(cap, container);
278    }
279}
280
281impl<T, CB> AsyncOutputHandle<T, CB>
282where
283    T: Timestamp,
284    CB: ContainerBuilder,
285{
286    fn new(output: Output<T, CB::Container>, index: usize) -> Self {
287        let inner = AsyncOutputHandleInner {
288            output,
289            capability: None,
290            builder: CB::default(),
291        };
292        Self {
293            inner: Rc::new(RefCell::new(inner)),
294            index,
295        }
296    }
297
298    fn cease(&self) {
299        self.inner.borrow_mut().cease();
300    }
301}
302
303impl<T, CB> AsyncOutputHandle<T, CB>
304where
305    T: Timestamp,
306    CB: ContainerBuilder,
307{
308    pub fn give<D>(&self, cap: &Capability<T>, data: D)
309    where
310        CB: PushInto<D>,
311    {
312        self.inner.borrow_mut().give(cap, data);
313    }
314}
315
316impl<T, D> AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<D>>>>
317where
318    D: Clone + 'static,
319    T: Timestamp,
320{
321    pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
322
323    /// Provides one record at the time specified by the capability and
324    /// charges `size_bytes` against the builder's fuel counter. Once at least
325    /// [`Self::MAX_OUTSTANDING_BYTES`] have been emitted since the last yield
326    /// this method yields back to timely and resets the counter.
327    pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2, size_bytes: usize)
328    where
329        FueledBuilder<CapacityContainerBuilder<Vec<D>>>: PushInto<D2>,
330    {
331        let should_yield = {
332            let mut handle = self.inner.borrow_mut();
333            handle.give(cap, data);
334            let bytes = &handle.builder.bytes;
335            bytes.set(bytes.get() + size_bytes);
336            let should_yield = bytes.get() > Self::MAX_OUTSTANDING_BYTES;
337            if should_yield {
338                bytes.set(0);
339            }
340            should_yield
341        };
342        if should_yield {
343            tokio::task::yield_now().await;
344        }
345    }
346}
347
348impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
349    fn clone(&self) -> Self {
350        Self {
351            inner: Rc::clone(&self.inner),
352            index: self.index,
353        }
354    }
355}
356
357/// A trait describing the connection behavior between an input of an operator and zero or more of
358/// its outputs.
359pub trait InputConnection<T: Timestamp> {
360    /// The capability type associated with this connection behavior.
361    type Capability;
362
363    /// Generates a summary description of the connection behavior given the number of outputs.
364    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;
365
366    /// Accepts an input capability.
367    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
368}
369
/// Connection marker for an input that is not connected to any output. Its data events carry
/// only the timestamp, not a capability.
pub struct Disconnected;
372
373impl<T: Timestamp> InputConnection<T> for Disconnected {
374    type Capability = T;
375
376    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
377        vec![Antichain::new(); outputs]
378    }
379
380    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
381        input_cap.time().clone()
382    }
383}
384
/// Connection marker for an input connected to exactly one output, identified by its index.
pub struct ConnectedToOne(usize);
387
388impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
389    type Capability = Capability<T>;
390
391    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
392        let mut summary = vec![Antichain::new(); outputs];
393        summary[self.0] = Antichain::from_elem(T::Summary::default());
394        summary
395    }
396
397    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
398        input_cap.retain(self.0)
399    }
400}
401
/// Connection marker for an input connected to `N` outputs, identified by their indices.
pub struct ConnectedToMany<const N: usize>([usize; N]);
404
405impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
406    type Capability = [Capability<T>; N];
407
408    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
409        let mut summary = vec![Antichain::new(); outputs];
410        for output in self.0 {
411            summary[output] = Antichain::from_elem(T::Summary::default());
412        }
413        summary
414    }
415
416    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
417        self.0.map(|output| input_cap.retain(output))
418    }
419}
420
/// Type-erased access to the position of an output handle, used to name outputs when
/// connecting inputs during operator construction.
///
/// Not intended to be implemented outside this module.
pub trait OutputIndex {
    /// Returns the position of this output among the operator's outputs.
    fn index(&self) -> usize;
}
428
429impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
430    fn index(&self) -> usize {
431        self.index
432    }
433}
434
435impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
436    /// Allocates a new generic async operator builder from its containing scope.
437    pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
438        let builder = OperatorBuilderRc::new(name, scope);
439        let info = builder.operator_info();
440        let activator = scope.activator_for(Rc::clone(&info.address));
441        let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
442        let operator_waker = TimelyWaker {
443            activator: sync_activator,
444            active: AtomicBool::new(false),
445            task_ready: AtomicBool::new(true),
446        };
447        let (shutdown_handle, shutdown_button) = button(scope, info.address);
448
449        OperatorBuilder {
450            builder,
451            activator,
452            operator_waker: Arc::new(operator_waker),
453            input_frontiers: Default::default(),
454            input_queues: Default::default(),
455            output_flushes: Default::default(),
456            shutdown_handle,
457            shutdown_button,
458        }
459    }
460
461    /// Adds a new input that is connected to the specified output, returning the async input handle to use.
462    pub fn new_input_for<D, P>(
463        &mut self,
464        stream: TimelyStream<'scope, T, D>,
465        pact: P,
466        output: &dyn OutputIndex,
467    ) -> AsyncInputHandle<T, D, ConnectedToOne>
468    where
469        D: Container + Clone + 'static,
470        P: ParallelizationContract<T, D>,
471    {
472        let index = output.index();
473        assert!(index < self.builder.shape().outputs());
474        self.new_input_connection(stream, pact, ConnectedToOne(index))
475    }
476
477    /// Adds a new input that is connected to the specified outputs, returning the async input handle to use.
478    pub fn new_input_for_many<const N: usize, D, P>(
479        &mut self,
480        stream: TimelyStream<'scope, T, D>,
481        pact: P,
482        outputs: [&dyn OutputIndex; N],
483    ) -> AsyncInputHandle<T, D, ConnectedToMany<N>>
484    where
485        D: Container + Clone + 'static,
486        P: ParallelizationContract<T, D>,
487    {
488        let indices = outputs.map(|output| output.index());
489        for index in indices {
490            assert!(index < self.builder.shape().outputs());
491        }
492        self.new_input_connection(stream, pact, ConnectedToMany(indices))
493    }
494
495    /// Adds a new input that is not connected to any output, returning the async input handle to use.
496    pub fn new_disconnected_input<D, P>(
497        &mut self,
498        stream: TimelyStream<'scope, T, D>,
499        pact: P,
500    ) -> AsyncInputHandle<T, D, Disconnected>
501    where
502        D: Container + Clone + 'static,
503        P: ParallelizationContract<T, D>,
504    {
505        self.new_input_connection(stream, pact, Disconnected)
506    }
507
508    /// Adds a new input with connection information, returning the async input handle to use.
509    pub fn new_input_connection<D, P, C>(
510        &mut self,
511        stream: TimelyStream<'scope, T, D>,
512        pact: P,
513        connection: C,
514    ) -> AsyncInputHandle<T, D, C>
515    where
516        D: Container + Clone + 'static,
517        P: ParallelizationContract<T, D>,
518        C: InputConnection<T> + 'static,
519    {
520        self.input_frontiers
521            .push(Antichain::from_elem(T::minimum()));
522
523        let outputs = self.builder.shape().outputs();
524        let handle = self.builder.new_input_connection(
525            stream,
526            pact,
527            connection.describe(outputs).into_iter().enumerate(),
528        );
529
530        let waker = Default::default();
531        let queue = Default::default();
532        let input_queue = InputHandleQueue {
533            queue: Rc::clone(&queue),
534            waker: Rc::clone(&waker),
535            connection,
536            handle,
537        };
538        self.input_queues.push(Box::new(input_queue));
539
540        AsyncInputHandle {
541            queue,
542            waker,
543            done: false,
544        }
545    }
546
547    /// Adds a new output, returning the output handle and stream.
548    pub fn new_output<CB: ContainerBuilder>(
549        &mut self,
550    ) -> (
551        AsyncOutputHandle<T, CB>,
552        TimelyStream<'scope, T, CB::Container>,
553    ) {
554        let index = self.builder.shape().outputs();
555
556        let (output, stream) = self.builder.new_output_connection([]);
557
558        let handle = AsyncOutputHandle::new(output, index);
559
560        let flush_handle = handle.clone();
561        self.output_flushes
562            .push(Box::new(move || flush_handle.cease()));
563
564        (handle, stream)
565    }
566
567    /// Creates an operator implementation from supplied logic constructor. It returns a shutdown
568    /// button that when pressed it will cause the logic future to be dropped and input handles to
569    /// be drained. The button can be converted into a token by using
570    /// [`Button::press_on_drop`]
571    pub fn build<B, L>(self, constructor: B) -> Button
572    where
573        B: FnOnce(Vec<Capability<T>>) -> L,
574        L: Future + 'static,
575    {
576        let operator_waker = self.operator_waker;
577        let mut input_frontiers = self.input_frontiers;
578        let mut input_queues = self.input_queues;
579        let mut output_flushes = self.output_flushes;
580        let mut shutdown_handle = self.shutdown_handle;
581        self.builder.build_reschedule(move |caps| {
582            let mut logic_fut = Some(Box::pin(constructor(caps)));
583            move |new_frontiers| {
584                operator_waker.active.store(true, Ordering::SeqCst);
585                for (i, queue) in input_queues.iter_mut().enumerate() {
586                    // First, discover if there are any frontier notifications
587                    let cur = &mut input_frontiers[i];
588                    let new = new_frontiers[i].frontier();
589                    if PartialOrder::less_than(&cur.borrow(), &new) {
590                        queue.notify_progress(new.to_owned());
591                        *cur = new.to_owned();
592                    }
593                    // Then accept all input into local queues. This step registers the received
594                    // messages with progress tracking.
595                    queue.accept_input();
596                }
597                operator_waker.active.store(false, Ordering::SeqCst);
598
599                // If our worker pressed the button we stop scheduling the logic future and/or
600                // draining the input handles to stop producing data and frontier updates
601                // downstream.
602                if shutdown_handle.local_pressed() {
603                    // When all workers press their buttons we drop the logic future and start
604                    // draining the input handles.
605                    if shutdown_handle.all_pressed() {
606                        logic_fut = None;
607                        for queue in input_queues.iter_mut() {
608                            queue.drain_input();
609                        }
610                        false
611                    } else {
612                        true
613                    }
614                } else {
615                    // Schedule the logic future if any of the wakers above marked the task as ready
616                    if let Some(fut) = logic_fut.as_mut() {
617                        if operator_waker.task_ready.load(Ordering::SeqCst) {
618                            let waker = futures_util::task::waker_ref(&operator_waker);
619                            let mut cx = Context::from_waker(&waker);
620                            operator_waker.task_ready.store(false, Ordering::SeqCst);
621                            if Pin::new(fut).poll(&mut cx).is_ready() {
622                                // We're done with logic so deallocate the task
623                                logic_fut = None;
624                            }
625                            // Flush all the outputs before exiting
626                            for flush in output_flushes.iter_mut() {
627                                (flush)();
628                            }
629                        }
630                    }
631
632                    // The timely operator needs to be kept alive if the task is pending
633                    if logic_fut.is_some() {
634                        true
635                    } else {
636                        // Othewise we should keep draining all inputs
637                        for queue in input_queues.iter_mut() {
638                            queue.drain_input();
639                        }
640                        false
641                    }
642                }
643            }
644        });
645
646        self.shutdown_button
647    }
648
649    /// Creates a fallible operator implementation from supplied logic constructor. If the `Future`
650    /// resolves to an error it will be emitted in the returned error stream and then the operator
651    /// will wait indefinitely until the shutdown button is pressed.
652    ///
653    /// # Capability handling
654    ///
655    /// Unlike [`OperatorBuilder::build`], this method does not give owned capabilities to the
656    /// constructor. All initial capabilities are wrapped in a `CapabilitySet` and a mutable
657    /// reference to them is given instead. This is done to avoid storing owned capabilities in the
658    /// state of the logic future which would make using the `?` operator unsafe, since the
659    /// frontiers would incorrectly advance, potentially causing incorrect actions downstream.
660    ///
661    /// ```ignore
662    /// builder.build_fallible(|caps| Box::pin(async move {
663    ///     // Assert that we have the number of capabilities we expect
664    ///     // `cap` will be a `&mut Option<Capability<T>>`:
665    ///     let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
666    ///
667    ///     // Using cap to send data:
668    ///     output.give(&cap_set[0], 42);
669    ///
670    ///     // Using cap_set to downgrade it:
671    ///     cap_set.downgrade([]);
672    ///
673    ///     // Explicitly dropping the capability:
674    ///     // Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
675    ///     *cap_set = CapabilitySet::new();
676    ///
677    ///     // !! BIG WARNING !!:
678    ///     // It is tempting to `take` the capability out of the set for convenience. This will
679    ///     // move the capability into the future state, tying its lifetime to it, which will get
680    ///     // dropped when an error is hit, causing incorrect progress statements.
681    ///     let cap = cap_set.delayed(&Timestamp::minimum());
682    ///     *cap_set = CapabilitySet::new(); // DO NOT DO THIS
683    /// }));
684    /// ```
685    pub fn build_fallible<E: 'static, F>(
686        mut self,
687        constructor: F,
688    ) -> (Button, StreamVec<'scope, T, Rc<E>>)
689    where
690        F: for<'a> FnOnce(
691                &'a mut [CapabilitySet<T>],
692            ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
693            + 'static,
694    {
695        // Create a new completely disconnected output
696        let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
697        let button = self.build(|mut caps| async move {
698            let error_cap = caps.pop().unwrap();
699            let mut caps = caps
700                .into_iter()
701                .map(CapabilitySet::from_elem)
702                .collect::<Vec<_>>();
703            if let Err(err) = constructor(&mut *caps).await {
704                error_output.give(&error_cap, Rc::new(err));
705                drop(error_cap);
706                // IMPORTANT: wedge this operator until the button is pressed. Returning would drop
707                // the capabilities and could produce incorrect progress statements.
708                std::future::pending().await
709            }
710        });
711        (button, error_stream)
712    }
713
714    /// Creates operator info for the operator.
715    pub fn operator_info(&self) -> OperatorInfo {
716        self.builder.operator_info()
717    }
718
719    /// Returns the activator for the operator.
720    pub fn activator(&self) -> &Activator {
721        &self.activator
722    }
723}
724
725/// Creates a new coordinated button the worker configuration described by `scope`.
726pub fn button<'scope, T: Timestamp>(
727    scope: Scope<'scope, T>,
728    addr: Rc<[usize]>,
729) -> (ButtonHandle, Button) {
730    let index = scope.worker().new_identifier();
731    let (pushers, puller) = scope.worker().allocate(index, addr);
732
733    let local_pressed = Rc::new(Cell::new(false));
734
735    let handle = ButtonHandle {
736        buttons_remaining: scope.peers(),
737        local_pressed: Rc::clone(&local_pressed),
738        puller,
739    };
740
741    let token = Button {
742        pushers,
743        local_pressed,
744    };
745
746    (handle, token)
747}
748
749/// A button that can be used to coordinate an action after all workers have pressed it.
750pub struct ButtonHandle {
751    /// The number of buttons still unpressed among workers.
752    buttons_remaining: usize,
753    /// A flag indicating whether this worker has pressed its button.
754    local_pressed: Rc<Cell<bool>>,
755    puller: Box<dyn Pull<Bincode<bool>>>,
756}
757
758impl ButtonHandle {
759    /// Returns whether this worker has pressed its button.
760    pub fn local_pressed(&self) -> bool {
761        self.local_pressed.get()
762    }
763
764    /// Returns whether all workers have pressed their buttons.
765    pub fn all_pressed(&mut self) -> bool {
766        while self.puller.recv().is_some() {
767            self.buttons_remaining -= 1;
768        }
769        self.buttons_remaining == 0
770    }
771}
772
773pub struct Button {
774    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
775    local_pressed: Rc<Cell<bool>>,
776}
777
778impl Button {
779    /// Presses the button. It is safe to call this function multiple times.
780    pub fn press(&mut self) {
781        for mut pusher in self.pushers.drain(..) {
782            pusher.send(Bincode::from(true));
783            pusher.done();
784        }
785        self.local_pressed.set(true);
786    }
787
788    /// Converts this button into a deadman's switch that will automatically press the button when
789    /// dropped.
790    pub fn press_on_drop(self) -> PressOnDropButton {
791        PressOnDropButton(self)
792    }
793}
794
795pub struct PressOnDropButton(Button);
796
797impl Drop for PressOnDropButton {
798    fn drop(&mut self) {
799        self.0.press();
800    }
801}
802
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::Capture;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::vec::ToStream;

    use super::*;

    /// A pass-through async operator that yields between records must still emit every record
    /// at its original time.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let source = (0..10).to_stream(scope);

            let mut builder = OperatorBuilder::new("async_passthru".to_string(), source.scope());
            let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
            let mut input = builder.new_input_for(source, Pipeline, &output);

            builder.build(move |_capabilities| async move {
                tokio::task::yield_now().await;
                while let Some(event) = input.next().await {
                    // Progress events carry nothing to forward.
                    let Event::Data(cap, batch) = event else {
                        continue;
                    };
                    for &record in batch.iter() {
                        tokio::task::yield_now().await;
                        output.give(&cap, record);
                    }
                }
            });

            output_stream.capture()
        });

        let expected = vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])];
        assert_eq!(capture.extract(), expected);
    }

    /// Regression test: when only one worker drops its shutdown tokens, the capability held by
    /// its producer must not be released, so consumers never observe the frontier advancing.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    // Only worker 0 keeps its capability: downgraded to 1 and held forever.
                    // Every other worker finishes immediately, releasing its capability.
                    if index == 0 {
                        cap.downgrade(&1);
                        std::future::pending().await
                    }
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut progress = consumer.new_disconnected_input(output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = progress.next().await {
                        match event {
                            // We should never observe a frontier greater than [1]
                            Event::Progress(frontier) => assert!(frontier.less_equal(&1)),
                            Event::Data(..) => {}
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            // Step until worker 0 is the only one holding the frontier at [1].
            for _step in 0..100 {
                worker.step();
            }
            // Then press (by dropping) worker 0's buttons only.
            if index == 0 {
                drop(tokens)
            }
            // Keep stepping to check that consumers don't observe the frontier advancing.
            for _step in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}