Skip to main content

mz_timely_util/
builder_async.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types to build async operators with general shapes.
17
18use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use columnation::Columnation;
28use futures_util::Stream;
29use futures_util::task::ArcWake;
30use timely::communication::{Pull, Push};
31use timely::container::{CapacityContainerBuilder, PushInto};
32use timely::dataflow::channels::Message;
33use timely::dataflow::channels::pact::ParallelizationContract;
34use timely::dataflow::channels::pushers::Output;
35use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
36use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
37use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
38use timely::dataflow::{Scope, Stream as TimelyStream, StreamVec};
39use timely::progress::{Antichain, Timestamp};
40use timely::scheduling::{Activator, SyncActivator};
41use timely::{Bincode, Container, ContainerBuilder, PartialOrder};
42
43use crate::columnation::ColumnationStack;
44use crate::containers::stack::AccountedStackBuilder;
45
/// Builds async operators with generic shape.
pub struct OperatorBuilder<'scope, T: Timestamp> {
    /// The underlying synchronous timely operator builder that this async wrapper drives.
    builder: OperatorBuilderRc<'scope, T>,
    /// The activator for this operator
    activator: Activator,
    /// The waker set up to activate this timely operator when woken
    operator_waker: Arc<TimelyWaker>,
    /// The currently known upper frontier of each of the input handles.
    input_frontiers: Vec<Antichain<T>>,
    /// Input queues for each of the declared inputs of the operator.
    input_queues: Vec<Box<dyn InputQueue<T>>>,
    /// Holds type erased closures that flush an output handle when called. These handles will be
    /// automatically drained when the operator is scheduled after the logic future has been polled
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// A handle to check whether all workers have pressed the shutdown button.
    shutdown_handle: ButtonHandle,
    /// A button to coordinate shutdown of this operator among workers.
    shutdown_button: Button,
}
65
/// A helper trait abstracting over an input handle. It facilitates keeping around type erased
/// handles for each of the operator inputs.
trait InputQueue<T: Timestamp> {
    /// Accepts all available input into local queues, waking the logic future if anything new
    /// arrived.
    fn accept_input(&mut self);

    /// Drains all available input and empties the local queue, discarding the data.
    fn drain_input(&mut self);

    /// Registers a frontier notification to be delivered.
    fn notify_progress(&mut self, upper: Antichain<T>);
}
78
79impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
80where
81    T: Timestamp,
82    D: Container,
83    C: InputConnection<T> + 'static,
84    P: Pull<Message<T, D>> + 'static,
85{
86    fn accept_input(&mut self) {
87        let mut queue = self.queue.borrow_mut();
88        let mut new_data = false;
89        self.handle.for_each(|cap, data| {
90            new_data = true;
91            let cap = self.connection.accept(cap);
92            queue.push_back(Event::Data(cap, std::mem::take(data)));
93        });
94        if new_data {
95            if let Some(waker) = self.waker.take() {
96                waker.wake();
97            }
98        }
99    }
100
101    fn drain_input(&mut self) {
102        self.queue.borrow_mut().clear();
103        self.handle.for_each(|_, _| {});
104    }
105
106    fn notify_progress(&mut self, upper: Antichain<T>) {
107        let mut queue = self.queue.borrow_mut();
108        // It's beneficial to consolidate two consecutive progress statements into one if the
109        // operator hasn't seen the previous progress yet. This also avoids accumulation of
110        // progress statements in the queue if the operator only conditionally checks this input.
111        match queue.back_mut() {
112            Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
113            _ => queue.push_back(Event::Progress(upper)),
114        }
115        if let Some(waker) = self.waker.take() {
116            waker.wake();
117        }
118    }
119}
120
/// The operator-side state of one input: the event queue and waker shared with the
/// corresponding [`AsyncInputHandle`], plus the underlying timely input handle.
struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    /// Events (data and progress) received but not yet consumed by the logic future.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// The logic future's waker, if it is currently waiting on this input.
    waker: Rc<Cell<Option<Waker>>>,
    /// Describes how this input relates to the operator's outputs.
    connection: C,
    /// The underlying synchronous timely input handle.
    handle: InputHandleCore<T, D, P>,
}
132
/// An async Waker that activates a specific operator when woken and marks the task as ready
struct TimelyWaker {
    /// Used to schedule the operator, potentially from another thread.
    activator: SyncActivator,
    /// True while the operator is being scheduled; used to suppress redundant re-activation
    /// (see `wake_by_ref`).
    active: AtomicBool,
    /// Set when woken; cleared just before the logic future is polled.
    task_ready: AtomicBool,
}
139
impl ArcWake for TimelyWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Record the wake first so the operator observes it even if this races with a poll.
        arc_self.task_ready.store(true, Ordering::SeqCst);
        // Only activate the timely operator if it's not already active to avoid an infinite loop
        if !arc_self.active.load(Ordering::SeqCst) {
            // We don't have any guarantees about how long the Waker will be held for and so we
            // must be prepared for the receiving end to have hung up when we finally do get woken.
            // This can happen if by the time the waker is called the receiving timely worker has
            // been shutdown. For this reason we ignore the activation error.
            let _ = arc_self.activator.activate();
        }
    }
}
153
/// Async handle to an operator's input stream
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    /// Events shared with the operator-side input queue.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Slot in which to park this task's waker while waiting for input.
    waker: Rc<Cell<Option<Waker>>>,
    /// Whether this handle has finished producing data
    done: bool,
}
161
162impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
163    pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
164        let mut queue = self.queue.borrow_mut();
165        match queue.pop_front()? {
166            Event::Data(cap, data) => Some(Event::Data(cap, data)),
167            Event::Progress(frontier) => {
168                self.done = frontier.is_empty();
169                Some(Event::Progress(frontier))
170            }
171        }
172    }
173
174    /// Waits for the handle to have data. After this function returns it is guaranteed that at
175    /// least one call to `next_sync` will be `Some(_)`.
176    pub async fn ready(&self) {
177        std::future::poll_fn(|cx| self.poll_ready(cx)).await
178    }
179
180    fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
181        if self.queue.borrow().is_empty() {
182            self.waker.set(Some(cx.waker().clone()));
183            Poll::Pending
184        } else {
185            Poll::Ready(())
186        }
187    }
188}
189
190impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
191    type Item = Event<T, C::Capability, D>;
192
193    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
194        if self.done {
195            return Poll::Ready(None);
196        }
197        ready!(self.poll_ready(cx));
198        Poll::Ready(self.next_sync())
199    }
200
201    fn size_hint(&self) -> (usize, Option<usize>) {
202        (self.queue.borrow().len(), None)
203    }
204}
205
/// An event of an input stream
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A data event, carrying a capability (its shape is determined by the input's connection
    /// behavior) and a container of data.
    Data(C, D),
    /// A progress event communicating a new upper frontier.
    Progress(Antichain<T>),
}
214
/// Shared part of an async output handle
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
    /// Handle to write to the output stream.
    output: Output<T, CB::Container>,
    /// Current capability held by this output handle.
    capability: Option<Capability<T>>,
    /// Container builder to accumulate data before sending at `capability`.
    builder: CB,
}
224
225impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
226    /// Write all pending data to the output stream.
227    fn flush(&mut self) {
228        while let Some(container) = self.builder.finish() {
229            self.output
230                .give(self.capability.as_ref().expect("must exist"), container);
231        }
232    }
233
234    /// Cease this output handle, flushing all pending data and releasing its capability.
235    fn cease(&mut self) {
236        self.flush();
237        let _ = self.output.activate();
238        self.capability = None;
239    }
240
241    /// Provides data at the time specified by the capability. Flushes automatically when the
242    /// capability time changes.
243    fn give<D>(&mut self, cap: &Capability<T>, data: D)
244    where
245        CB: PushInto<D>,
246    {
247        if let Some(capability) = &self.capability
248            && cap.time() != capability.time()
249        {
250            self.flush();
251            self.capability = None;
252        }
253        if self.capability.is_none() {
254            self.capability = Some(cap.clone());
255        }
256
257        self.builder.push_into(data);
258        while let Some(container) = self.builder.extract() {
259            self.output
260                .give(self.capability.as_ref().expect("must exist"), container);
261        }
262    }
263}
264
/// Async handle to one of an operator's outputs. Cheap to clone; all clones share the same
/// buffered state and capability.
pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
    /// State shared between all clones of this handle and the operator's flush closure.
    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
    /// The position of this output among the operator's outputs.
    index: usize,
}
269
270impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
271where
272    T: Timestamp,
273    C: Container + Clone + 'static,
274{
275    #[inline]
276    pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
277        let mut inner = self.inner.borrow_mut();
278        inner.flush();
279        inner.output.give(cap, container);
280    }
281}
282
283impl<T, CB> AsyncOutputHandle<T, CB>
284where
285    T: Timestamp,
286    CB: ContainerBuilder,
287{
288    fn new(output: Output<T, CB::Container>, index: usize) -> Self {
289        let inner = AsyncOutputHandleInner {
290            output,
291            capability: None,
292            builder: CB::default(),
293        };
294        Self {
295            inner: Rc::new(RefCell::new(inner)),
296            index,
297        }
298    }
299
300    fn cease(&self) {
301        self.inner.borrow_mut().cease();
302    }
303}
304
impl<T, CB> AsyncOutputHandle<T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    /// Provides data at the time specified by the capability. Data is buffered by the container
    /// builder and flushed automatically when full containers are ready or when the capability
    /// time changes.
    pub fn give<D>(&self, cap: &Capability<T>, data: D)
    where
        CB: PushInto<D>,
    {
        self.inner.borrow_mut().give(cap, data);
    }
}
317
318impl<T, D>
319    AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<ColumnationStack<D>>>>
320where
321    D: Clone + 'static + Columnation,
322    T: Timestamp,
323{
324    pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
325
326    /// Provides one record at the time specified by the capability. This method will automatically
327    /// yield back to timely after [Self::MAX_OUTSTANDING_BYTES] have been produced.
328    pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
329    where
330        ColumnationStack<D>: PushInto<D2>,
331    {
332        let should_yield = {
333            let mut handle = self.inner.borrow_mut();
334            handle.give(cap, data);
335            let should_yield = handle.builder.bytes.get() > Self::MAX_OUTSTANDING_BYTES;
336            if should_yield {
337                handle.builder.bytes.set(0);
338            }
339            should_yield
340        };
341        if should_yield {
342            tokio::task::yield_now().await;
343        }
344    }
345}
346
347impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
348    fn clone(&self) -> Self {
349        Self {
350            inner: Rc::clone(&self.inner),
351            index: self.index,
352        }
353    }
354}
355
/// A trait describing the connection behavior between an input of an operator and zero or more of
/// its outputs.
pub trait InputConnection<T: Timestamp> {
    /// The capability type associated with this connection behavior.
    type Capability;

    /// Generates a summary description of the connection behavior given the number of outputs.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Accepts an input capability, converting it into this connection's capability type.
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}
368
369/// A marker type representing a disconnected input.
370pub struct Disconnected;
371
372impl<T: Timestamp> InputConnection<T> for Disconnected {
373    type Capability = T;
374
375    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
376        vec![Antichain::new(); outputs]
377    }
378
379    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
380        input_cap.time().clone()
381    }
382}
383
384/// A marker type representing an input connected to exactly one output.
385pub struct ConnectedToOne(usize);
386
387impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
388    type Capability = Capability<T>;
389
390    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
391        let mut summary = vec![Antichain::new(); outputs];
392        summary[self.0] = Antichain::from_elem(T::Summary::default());
393        summary
394    }
395
396    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
397        input_cap.retain(self.0)
398    }
399}
400
401/// A marker type representing an input connected to many outputs.
402pub struct ConnectedToMany<const N: usize>([usize; N]);
403
404impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
405    type Capability = [Capability<T>; N];
406
407    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
408        let mut summary = vec![Antichain::new(); outputs];
409        for output in self.0 {
410            summary[output] = Antichain::from_elem(T::Summary::default());
411        }
412        summary
413    }
414
415    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
416        self.0.map(|output| input_cap.retain(output))
417    }
418}
419
/// A helper trait abstracting over an output handle. It facilitates passing type erased
/// output handles during operator construction.
/// It is not meant to be implemented by users.
pub trait OutputIndex {
    /// The output index of this handle.
    fn index(&self) -> usize;
}
427
impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
    /// Returns the position of this output among the operator's outputs.
    fn index(&self) -> usize {
        self.index
    }
}
433
impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
    /// Allocates a new generic async operator builder from its containing scope.
    pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
        let builder = OperatorBuilderRc::new(name, scope);
        let info = builder.operator_info();
        let activator = scope.activator_for(Rc::clone(&info.address));
        let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
        let operator_waker = TimelyWaker {
            activator: sync_activator,
            active: AtomicBool::new(false),
            // Starts `true` so the logic future gets polled on the first scheduling.
            task_ready: AtomicBool::new(true),
        };
        let (shutdown_handle, shutdown_button) = button(scope, info.address);

        OperatorBuilder {
            builder,
            activator,
            operator_waker: Arc::new(operator_waker),
            input_frontiers: Default::default(),
            input_queues: Default::default(),
            output_flushes: Default::default(),
            shutdown_handle,
            shutdown_button,
        }
    }

    /// Adds a new input that is connected to the specified output, returning the async input handle to use.
    pub fn new_input_for<D, P>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
        output: &dyn OutputIndex,
    ) -> AsyncInputHandle<T, D, ConnectedToOne>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
    {
        let index = output.index();
        // The output must have been declared on this builder already.
        assert!(index < self.builder.shape().outputs());
        self.new_input_connection(stream, pact, ConnectedToOne(index))
    }

    /// Adds a new input that is connected to the specified outputs, returning the async input handle to use.
    pub fn new_input_for_many<const N: usize, D, P>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
        outputs: [&dyn OutputIndex; N],
    ) -> AsyncInputHandle<T, D, ConnectedToMany<N>>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
    {
        let indices = outputs.map(|output| output.index());
        // Every connected output must have been declared on this builder already.
        for index in indices {
            assert!(index < self.builder.shape().outputs());
        }
        self.new_input_connection(stream, pact, ConnectedToMany(indices))
    }

    /// Adds a new input that is not connected to any output, returning the async input handle to use.
    pub fn new_disconnected_input<D, P>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
    ) -> AsyncInputHandle<T, D, Disconnected>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
    {
        self.new_input_connection(stream, pact, Disconnected)
    }

    /// Adds a new input with connection information, returning the async input handle to use.
    pub fn new_input_connection<D, P, C>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
        connection: C,
    ) -> AsyncInputHandle<T, D, C>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
        C: InputConnection<T> + 'static,
    {
        self.input_frontiers
            .push(Antichain::from_elem(T::minimum()));

        let outputs = self.builder.shape().outputs();
        let handle = self.builder.new_input_connection(
            stream,
            pact,
            connection.describe(outputs).into_iter().enumerate(),
        );

        // The queue and waker are shared between the returned handle and the operator.
        let waker = Default::default();
        let queue = Default::default();
        let input_queue = InputHandleQueue {
            queue: Rc::clone(&queue),
            waker: Rc::clone(&waker),
            connection,
            handle,
        };
        self.input_queues.push(Box::new(input_queue));

        AsyncInputHandle {
            queue,
            waker,
            done: false,
        }
    }

    /// Adds a new output, returning the output handle and stream.
    pub fn new_output<CB: ContainerBuilder>(
        &mut self,
    ) -> (
        AsyncOutputHandle<T, CB>,
        TimelyStream<'scope, T, CB::Container>,
    ) {
        let index = self.builder.shape().outputs();

        let (output, stream) = self.builder.new_output_connection([]);

        let handle = AsyncOutputHandle::new(output, index);

        // Register a type-erased flusher so the operator can cease this output after each poll
        // of the logic future.
        let flush_handle = handle.clone();
        self.output_flushes
            .push(Box::new(move || flush_handle.cease()));

        (handle, stream)
    }

    /// Creates an operator implementation from supplied logic constructor. It returns a shutdown
    /// button that, when pressed, causes the logic future to be dropped and input handles to
    /// be drained. The button can be converted into a token by using
    /// [`Button::press_on_drop`]
    pub fn build<B, L>(self, constructor: B) -> Button
    where
        B: FnOnce(Vec<Capability<T>>) -> L,
        L: Future + 'static,
    {
        let operator_waker = self.operator_waker;
        let mut input_frontiers = self.input_frontiers;
        let mut input_queues = self.input_queues;
        let mut output_flushes = self.output_flushes;
        let mut shutdown_handle = self.shutdown_handle;
        self.builder.build_reschedule(move |caps| {
            let mut logic_fut = Some(Box::pin(constructor(caps)));
            move |new_frontiers| {
                // Mark the operator active so wakes triggered while ingesting input don't
                // redundantly re-activate it.
                operator_waker.active.store(true, Ordering::SeqCst);
                for (i, queue) in input_queues.iter_mut().enumerate() {
                    // First, discover if there are any frontier notifications
                    let cur = &mut input_frontiers[i];
                    let new = new_frontiers[i].frontier();
                    if PartialOrder::less_than(&cur.borrow(), &new) {
                        queue.notify_progress(new.to_owned());
                        *cur = new.to_owned();
                    }
                    // Then accept all input into local queues. This step registers the received
                    // messages with progress tracking.
                    queue.accept_input();
                }
                operator_waker.active.store(false, Ordering::SeqCst);

                // If our worker pressed the button we stop scheduling the logic future and/or
                // draining the input handles to stop producing data and frontier updates
                // downstream.
                if shutdown_handle.local_pressed() {
                    // When all workers press their buttons we drop the logic future and start
                    // draining the input handles.
                    if shutdown_handle.all_pressed() {
                        logic_fut = None;
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    } else {
                        true
                    }
                } else {
                    // Schedule the logic future if any of the wakers above marked the task as ready
                    if let Some(fut) = logic_fut.as_mut() {
                        if operator_waker.task_ready.load(Ordering::SeqCst) {
                            let waker = futures_util::task::waker_ref(&operator_waker);
                            let mut cx = Context::from_waker(&waker);
                            // Clear the flag before polling so a wake during the poll is not lost.
                            operator_waker.task_ready.store(false, Ordering::SeqCst);
                            if Pin::new(fut).poll(&mut cx).is_ready() {
                                // We're done with logic so deallocate the task
                                logic_fut = None;
                            }
                            // Flush all the outputs before exiting
                            for flush in output_flushes.iter_mut() {
                                (flush)();
                            }
                        }
                    }

                    // The timely operator needs to be kept alive if the task is pending
                    if logic_fut.is_some() {
                        true
                    } else {
                        // Otherwise we should keep draining all inputs
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    }
                }
            }
        });

        self.shutdown_button
    }

    /// Creates a fallible operator implementation from supplied logic constructor. If the `Future`
    /// resolves to an error it will be emitted in the returned error stream and then the operator
    /// will wait indefinitely until the shutdown button is pressed.
    ///
    /// # Capability handling
    ///
    /// Unlike [`OperatorBuilder::build`], this method does not give owned capabilities to the
    /// constructor. All initial capabilities are wrapped in a `CapabilitySet` and a mutable
    /// reference to them is given instead. This is done to avoid storing owned capabilities in the
    /// state of the logic future which would make using the `?` operator unsafe, since the
    /// frontiers would incorrectly advance, potentially causing incorrect actions downstream.
    ///
    /// ```ignore
    /// builder.build_fallible(|caps| Box::pin(async move {
    ///     // Assert that we have the number of capabilities we expect
    ///     // `cap` will be a `&mut Option<Capability<T>>`:
    ///     let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
    ///
    ///     // Using cap to send data:
    ///     output.give(&cap_set[0], 42);
    ///
    ///     // Using cap_set to downgrade it:
    ///     cap_set.downgrade([]);
    ///
    ///     // Explicitly dropping the capability:
    ///     // Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
    ///     *cap_set = CapabilitySet::new();
    ///
    ///     // !! BIG WARNING !!:
    ///     // It is tempting to `take` the capability out of the set for convenience. This will
    ///     // move the capability into the future state, tying its lifetime to it, which will get
    ///     // dropped when an error is hit, causing incorrect progress statements.
    ///     let cap = cap_set.delayed(&Timestamp::minimum());
    ///     *cap_set = CapabilitySet::new(); // DO NOT DO THIS
    /// }));
    /// ```
    pub fn build_fallible<E: 'static, F>(
        mut self,
        constructor: F,
    ) -> (Button, StreamVec<'scope, T, Rc<E>>)
    where
        F: for<'a> FnOnce(
                &'a mut [CapabilitySet<T>],
            ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
            + 'static,
    {
        // Create a new completely disconnected output
        let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
        let button = self.build(|mut caps| async move {
            // The error output was declared last, so its capability is the last element.
            let error_cap = caps.pop().unwrap();
            let mut caps = caps
                .into_iter()
                .map(CapabilitySet::from_elem)
                .collect::<Vec<_>>();
            if let Err(err) = constructor(&mut *caps).await {
                error_output.give(&error_cap, Rc::new(err));
                drop(error_cap);
                // IMPORTANT: wedge this operator until the button is pressed. Returning would drop
                // the capabilities and could produce incorrect progress statements.
                std::future::pending().await
            }
        });
        (button, error_stream)
    }

    /// Creates operator info for the operator.
    pub fn operator_info(&self) -> OperatorInfo {
        self.builder.operator_info()
    }

    /// Returns the activator for the operator.
    pub fn activator(&self) -> &Activator {
        &self.activator
    }
}
723
724/// Creates a new coordinated button the worker configuration described by `scope`.
/// Creates a new coordinated button for the worker configuration described by `scope`.
pub fn button<'scope, T: Timestamp>(
    scope: Scope<'scope, T>,
    addr: Rc<[usize]>,
) -> (ButtonHandle, Button) {
    // Allocate a dedicated channel so each worker can signal its press to this handle.
    let index = scope.worker().new_identifier();
    let (pushers, puller) = scope.worker().allocate(index, addr);

    // Flag shared between the handle and the button of this worker.
    let local_pressed = Rc::new(Cell::new(false));

    let handle = ButtonHandle {
        // One outstanding press per peer worker.
        buttons_remaining: scope.peers(),
        local_pressed: Rc::clone(&local_pressed),
        puller,
    };

    let token = Button {
        pushers,
        local_pressed,
    };

    (handle, token)
}
747
/// A button that can be used to coordinate an action after all workers have pressed it.
pub struct ButtonHandle {
    /// The number of buttons still unpressed among workers.
    buttons_remaining: usize,
    /// A flag indicating whether this worker has pressed its button.
    local_pressed: Rc<Cell<bool>>,
    /// Receives one message from each worker that presses its button.
    puller: Box<dyn Pull<Bincode<bool>>>,
}
756
757impl ButtonHandle {
758    /// Returns whether this worker has pressed its button.
759    pub fn local_pressed(&self) -> bool {
760        self.local_pressed.get()
761    }
762
763    /// Returns whether all workers have pressed their buttons.
764    pub fn all_pressed(&mut self) -> bool {
765        while self.puller.recv().is_some() {
766            self.buttons_remaining -= 1;
767        }
768        self.buttons_remaining == 0
769    }
770}
771
/// A button that, when pressed, notifies all workers of the press.
pub struct Button {
    /// Channels used to notify each worker of a press; emptied on the first press.
    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
    /// Flag shared with this worker's `ButtonHandle`.
    local_pressed: Rc<Cell<bool>>,
}
776
777impl Button {
778    /// Presses the button. It is safe to call this function multiple times.
779    pub fn press(&mut self) {
780        for mut pusher in self.pushers.drain(..) {
781            pusher.send(Bincode::from(true));
782            pusher.done();
783        }
784        self.local_pressed.set(true);
785    }
786
787    /// Converts this button into a deadman's switch that will automatically press the button when
788    /// dropped.
789    pub fn press_on_drop(self) -> PressOnDropButton {
790        PressOnDropButton(self)
791    }
792}
793
/// A token that automatically presses its wrapped [`Button`] when dropped.
pub struct PressOnDropButton(Button);
795
impl Drop for PressOnDropButton {
    fn drop(&mut self) {
        // Pressing is idempotent, so an earlier explicit press is harmless.
        self.0.press();
    }
}
801
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::Capture;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::vec::ToStream;

    use super::*;

    /// Smoke test: an async passthrough operator that yields between records still forwards
    /// all of its input unchanged.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
            let mut input_handle = op.new_input_for(input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                // Yield once before doing anything to exercise rescheduling of a fresh task.
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Regression test: after one worker drops its tokens, downstream consumers must not
    /// observe the frontier advancing past the capability still held by another worker.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    if index != 0 {
                        return;
                    }
                    // Worker 0 downgrades to 1 and keeps the capability around forever
                    cap.downgrade(&1);
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // We should never observe a frontier greater than [1]
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            // Run dataflow until only worker 0 holds the frontier to [1]
            for _ in 0..100 {
                worker.step();
            }
            // Then drop the tokens of worker 0
            if index == 0 {
                drop(tokens)
            }
            // And step the dataflow some more to ensure consumers don't observe frontiers advancing.
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}
895}