mz_timely_util/builder_async.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Types to build async operators with general shapes.

use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Waker, ready};

use differential_dataflow::containers::{Columnation, TimelyStack};
use futures_util::Stream;
use futures_util::task::ArcWake;
use timely::communication::{Pull, Push};
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::channels::Message;
use timely::dataflow::channels::pact::ParallelizationContract;
use timely::dataflow::channels::pushers::Output;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
use timely::dataflow::{Scope, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::{Activator, SyncActivator};
use timely::{Bincode, Container, ContainerBuilder, PartialOrder};

use crate::containers::stack::AccountedStackBuilder;

/// Builds async operators with generic shape.
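///
/// A minimal sketch of the intended workflow, adapted from the passthrough test at the bottom of
/// this module (`stream` is an assumed pre-existing `StreamCore` and `Pipeline` is an illustrative
/// choice of parallelization contract):
///
/// ```ignore
/// let mut op = OperatorBuilder::new("passthru".to_string(), stream.scope());
/// let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
/// let mut input = op.new_input_for(&stream, Pipeline, &output);
///
/// op.build(move |_capabilities| async move {
///     while let Some(event) = input.next().await {
///         if let Event::Data(cap, data) = event {
///             for item in data.iter().cloned() {
///                 output.give(&cap, item);
///             }
///         }
///     }
/// });
/// ```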
pub struct OperatorBuilder<G: Scope> {
    builder: OperatorBuilderRc<G>,
    /// The activator for this operator
    activator: Activator,
    /// The waker set up to activate this timely operator when woken
    operator_waker: Arc<TimelyWaker>,
    /// The currently known upper frontier of each of the input handles.
    input_frontiers: Vec<Antichain<G::Timestamp>>,
    /// Input queues for each of the declared inputs of the operator.
    input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
    /// Holds type erased closures that flush an output handle when called. These handles will be
    /// automatically drained when the operator is scheduled after the logic future has been
    /// polled.
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// A handle to check whether all workers have pressed the shutdown button.
    shutdown_handle: ButtonHandle,
    /// A button to coordinate shutdown of this operator among workers.
    shutdown_button: Button,
}

/// A helper trait abstracting over an input handle. It facilitates keeping around type erased
/// handles for each of the operator inputs.
trait InputQueue<T: Timestamp> {
    /// Accepts all available input into local queues.
    fn accept_input(&mut self);

    /// Drains all available input and empties the local queue.
    fn drain_input(&mut self);

    /// Registers a frontier notification to be delivered.
    fn notify_progress(&mut self, upper: Antichain<T>);
}

impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
where
    T: Timestamp,
    D: Container,
    C: InputConnection<T> + 'static,
    P: Pull<Message<T, D>> + 'static,
{
    fn accept_input(&mut self) {
        let mut queue = self.queue.borrow_mut();
        let mut new_data = false;
        while let Some((cap, data)) = self.handle.next() {
            new_data = true;
            let cap = self.connection.accept(cap);
            queue.push_back(Event::Data(cap, std::mem::take(data)));
        }
        if new_data {
            if let Some(waker) = self.waker.take() {
                waker.wake();
            }
        }
    }

    fn drain_input(&mut self) {
        self.queue.borrow_mut().clear();
        self.handle.for_each(|_, _| {});
    }

    fn notify_progress(&mut self, upper: Antichain<T>) {
        let mut queue = self.queue.borrow_mut();
        // It's beneficial to consolidate two consecutive progress statements into one if the
        // operator hasn't seen the previous progress yet. This also avoids accumulation of
        // progress statements in the queue if the operator only conditionally checks this input.
        match queue.back_mut() {
            Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
            _ => queue.push_back(Event::Progress(upper)),
        }
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    waker: Rc<Cell<Option<Waker>>>,
    connection: C,
    handle: InputHandleCore<T, D, P>,
}

/// An async Waker that activates a specific operator when woken and marks the task as ready
struct TimelyWaker {
    activator: SyncActivator,
    active: AtomicBool,
    task_ready: AtomicBool,
}

impl ArcWake for TimelyWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.task_ready.store(true, Ordering::SeqCst);
        // Only activate the timely operator if it's not already active to avoid an infinite loop
        if !arc_self.active.load(Ordering::SeqCst) {
            // We don't have any guarantees about how long the Waker will be held for and so we
            // must be prepared for the receiving end to have hung up when we finally do get woken.
            // This can happen if by the time the waker is called the receiving timely worker has
            // been shutdown. For this reason we ignore the activation error.
            let _ = arc_self.activator.activate();
        }
    }
}

/// Async handle to an operator's input stream
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    waker: Rc<Cell<Option<Waker>>>,
    /// Whether this handle has finished producing data
    done: bool,
}

impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
    /// Returns the next queued event on this input, if any, without waiting for new data.
    pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
        let mut queue = self.queue.borrow_mut();
        match queue.pop_front()? {
            Event::Data(cap, data) => Some(Event::Data(cap, data)),
            Event::Progress(frontier) => {
                self.done = frontier.is_empty();
                Some(Event::Progress(frontier))
            }
        }
    }

    /// Waits for the handle to have data. After this function returns it is guaranteed that at
    /// least one call to `next_sync` will return `Some(_)`.
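    ///
    /// A sketch of the intended pattern (the surrounding async operator body is assumed):
    ///
    /// ```ignore
    /// loop {
    ///     input.ready().await;
    ///     while let Some(event) = input.next_sync() {
    ///         match event {
    ///             Event::Data(cap, data) => { /* process data */ }
    ///             Event::Progress(frontier) => { /* react to the new frontier */ }
    ///         }
    ///     }
    /// }
    /// ```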
    pub async fn ready(&self) {
        std::future::poll_fn(|cx| self.poll_ready(cx)).await
    }

    fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
        if self.queue.borrow().is_empty() {
            self.waker.set(Some(cx.waker().clone()));
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
    type Item = Event<T, C::Capability, D>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        ready!(self.poll_ready(cx));
        Poll::Ready(self.next_sync())
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.queue.borrow().len(), None)
    }
}

/// An event of an input stream
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A data event
    Data(C, D),
    /// A progress event
    Progress(Antichain<T>),
}

/// Shared part of an async output handle
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
    /// Handle to write to the output stream.
    output: Output<T, CB::Container>,
    /// Current capability held by this output handle.
    capability: Option<Capability<T>>,
    /// Container builder to accumulate data before sending at `capability`.
    builder: CB,
}

impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
    /// Write all pending data to the output stream.
    fn flush(&mut self) {
        while let Some(container) = self.builder.finish() {
            self.output
                .give(self.capability.as_ref().expect("must exist"), container);
        }
    }

    /// Cease this output handle, flushing all pending data and releasing its capability.
    fn cease(&mut self) {
        self.flush();
        let _ = self.output.activate();
        self.capability = None;
    }

    /// Provides data at the time specified by the capability. Flushes automatically when the
    /// capability time changes.
    fn give<D>(&mut self, cap: &Capability<T>, data: D)
    where
        CB: PushInto<D>,
    {
        if let Some(capability) = &self.capability
            && cap.time() != capability.time()
        {
            self.flush();
            self.capability = None;
        }
        if self.capability.is_none() {
            self.capability = Some(cap.clone());
        }

        self.builder.push_into(data);
        while let Some(container) = self.builder.extract() {
            self.output
                .give(self.capability.as_ref().expect("must exist"), container);
        }
    }
}

/// Async handle to an operator's output stream.
pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
    index: usize,
}

impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
where
    T: Timestamp,
    C: Container + Clone + 'static,
{
    /// Provides an entire container at the time specified by the capability, flushing any
    /// previously buffered data first.
    #[inline]
    pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
        let mut inner = self.inner.borrow_mut();
        inner.flush();
        inner.output.give(cap, container);
    }
}

impl<T, CB> AsyncOutputHandle<T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    fn new(output: Output<T, CB::Container>, index: usize) -> Self {
        let inner = AsyncOutputHandleInner {
            output,
            capability: None,
            builder: CB::default(),
        };
        Self {
            inner: Rc::new(RefCell::new(inner)),
            index,
        }
    }

    fn cease(&self) {
        self.inner.borrow_mut().cease();
    }
}

impl<T, CB> AsyncOutputHandle<T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    /// Provides data at the time specified by the capability. Output is flushed automatically
    /// when the capability time changes.
    pub fn give<D>(&self, cap: &Capability<T>, data: D)
    where
        CB: PushInto<D>,
    {
        self.inner.borrow_mut().give(cap, data);
    }
}

impl<T, D> AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<D>>>>
where
    D: timely::Data + Columnation,
    T: Timestamp,
{
    pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;

    /// Provides one record at the time specified by the capability. This method will automatically
    /// yield back to timely after [Self::MAX_OUTSTANDING_BYTES] have been produced.
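    ///
    /// A sketch of how this might be called from an operator's logic future; `source`, `cap_set`,
    /// and the record type are illustrative:
    ///
    /// ```ignore
    /// while let Some((row, time)) = source.next().await {
    ///     let cap = cap_set.delayed(&time);
    ///     // Yields back to timely once enough bytes have accumulated, keeping the operator
    ///     // from hogging the worker thread.
    ///     output.give_fueled(&cap, row).await;
    /// }
    /// ```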
    pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
    where
        TimelyStack<D>: PushInto<D2>,
    {
        let should_yield = {
            let mut handle = self.inner.borrow_mut();
            handle.give(cap, data);
            let should_yield = handle.builder.bytes.get() > Self::MAX_OUTSTANDING_BYTES;
            if should_yield {
                handle.builder.bytes.set(0);
            }
            should_yield
        };
        if should_yield {
            tokio::task::yield_now().await;
        }
    }
}

impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
    fn clone(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
            index: self.index,
        }
    }
}

/// A trait describing the connection behavior between an input of an operator and zero or more of
/// its outputs.
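///
/// The connection determines the capability type carried by [`Event::Data`] on the corresponding
/// input: a plain timestamp for [`Disconnected`], a single [`Capability`] for [`ConnectedToOne`],
/// and an array of capabilities, one per connected output, for [`ConnectedToMany`].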
pub trait InputConnection<T: Timestamp> {
    /// The capability type associated with this connection behavior.
    type Capability;

    /// Generates a summary description of the connection behavior given the number of outputs.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Accepts an input capability.
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}

/// A marker type representing a disconnected input.
pub struct Disconnected;

impl<T: Timestamp> InputConnection<T> for Disconnected {
    type Capability = T;

    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
        vec![Antichain::new(); outputs]
    }

    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
        input_cap.time().clone()
    }
}

/// A marker type representing an input connected to exactly one output.
pub struct ConnectedToOne(usize);

impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
    type Capability = Capability<T>;

    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
        let mut summary = vec![Antichain::new(); outputs];
        summary[self.0] = Antichain::from_elem(T::Summary::default());
        summary
    }

    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
        input_cap.retain_for_output(self.0)
    }
}

/// A marker type representing an input connected to many outputs.
pub struct ConnectedToMany<const N: usize>([usize; N]);

impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
    type Capability = [Capability<T>; N];

    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
        let mut summary = vec![Antichain::new(); outputs];
        for output in self.0 {
            summary[output] = Antichain::from_elem(T::Summary::default());
        }
        summary
    }

    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
        self.0
            .map(|output| input_cap.delayed_for_output(input_cap.time(), output))
    }
}

/// A helper trait abstracting over an output handle. It facilitates passing type erased
/// output handles during operator construction.
/// It is not meant to be implemented by users.
pub trait OutputIndex {
    /// The output index of this handle.
    fn index(&self) -> usize;
}

impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
    fn index(&self) -> usize {
        self.index
    }
}

impl<G: Scope> OperatorBuilder<G> {
    /// Allocates a new generic async operator builder from its containing scope.
    pub fn new(name: String, mut scope: G) -> Self {
        let builder = OperatorBuilderRc::new(name, scope.clone());
        let info = builder.operator_info();
        let activator = scope.activator_for(Rc::clone(&info.address));
        let sync_activator = scope.sync_activator_for(info.address.to_vec());
        let operator_waker = TimelyWaker {
            activator: sync_activator,
            active: AtomicBool::new(false),
            task_ready: AtomicBool::new(true),
        };
        let (shutdown_handle, shutdown_button) = button(&mut scope, info.address);

        OperatorBuilder {
            builder,
            activator,
            operator_waker: Arc::new(operator_waker),
            input_frontiers: Default::default(),
            input_queues: Default::default(),
            output_flushes: Default::default(),
            shutdown_handle,
            shutdown_button,
        }
    }

    /// Adds a new input that is connected to the specified output, returning the async input handle to use.
    pub fn new_input_for<D, P>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
        output: &dyn OutputIndex,
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let index = output.index();
        assert!(index < self.builder.shape().outputs());
        self.new_input_connection(stream, pact, ConnectedToOne(index))
    }

    /// Adds a new input that is connected to the specified outputs, returning the async input handle to use.
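    ///
    /// With a connection to `N` outputs, each [`Event::Data`] carries an array of `N`
    /// capabilities, one per output, in the order the outputs were passed. A sketch, assuming two
    /// previously declared outputs `out_a` and `out_b`:
    ///
    /// ```ignore
    /// let mut input = builder.new_input_for_many(&stream, Pipeline, [&out_a, &out_b]);
    /// // ... inside the logic future:
    /// while let Some(event) = input.next().await {
    ///     if let Event::Data([cap_a, cap_b], data) = event {
    ///         // `cap_a` is only valid for `out_a`, `cap_b` only for `out_b`.
    ///     }
    /// }
    /// ```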
    pub fn new_input_for_many<const N: usize, D, P>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
        outputs: [&dyn OutputIndex; N],
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let indices = outputs.map(|output| output.index());
        for index in indices {
            assert!(index < self.builder.shape().outputs());
        }
        self.new_input_connection(stream, pact, ConnectedToMany(indices))
    }

    /// Adds a new input that is not connected to any output, returning the async input handle to use.
    pub fn new_disconnected_input<D, P>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
    ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        self.new_input_connection(stream, pact, Disconnected)
    }

    /// Adds a new input with connection information, returning the async input handle to use.
    pub fn new_input_connection<D, P, C>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
        connection: C,
    ) -> AsyncInputHandle<G::Timestamp, D, C>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
        C: InputConnection<G::Timestamp> + 'static,
    {
        self.input_frontiers
            .push(Antichain::from_elem(G::Timestamp::minimum()));

        let outputs = self.builder.shape().outputs();
        let handle = self.builder.new_input_connection(
            stream,
            pact,
            connection.describe(outputs).into_iter().enumerate(),
        );

        let waker = Default::default();
        let queue = Default::default();
        let input_queue = InputHandleQueue {
            queue: Rc::clone(&queue),
            waker: Rc::clone(&waker),
            connection,
            handle,
        };
        self.input_queues.push(Box::new(input_queue));

        AsyncInputHandle {
            queue,
            waker,
            done: false,
        }
    }

    /// Adds a new output, returning the output handle and stream.
    pub fn new_output<CB: ContainerBuilder>(
        &mut self,
    ) -> (
        AsyncOutputHandle<G::Timestamp, CB>,
        StreamCore<G, CB::Container>,
    ) {
        let index = self.builder.shape().outputs();

        let (output, stream) = self.builder.new_output_connection([]);

        let handle = AsyncOutputHandle::new(output, index);

        let flush_handle = handle.clone();
        self.output_flushes
            .push(Box::new(move || flush_handle.cease()));

        (handle, stream)
    }

    /// Creates an operator implementation from the supplied logic constructor. It returns a
    /// shutdown button that, when pressed, causes the logic future to be dropped and the input
    /// handles to be drained. The button can be converted into a token by using
    /// [`Button::press_on_drop`].
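    ///
    /// A sketch of the constructor's shape, assuming two outputs were declared. The constructor
    /// receives one capability per declared output, in declaration order ([`Self::build_fallible`]
    /// relies on this by popping the capability of the last-declared output); the names are
    /// illustrative:
    ///
    /// ```ignore
    /// let button = builder.build(move |capabilities| async move {
    ///     // One capability per output, in the order the outputs were declared.
    ///     let [cap_a, cap_b]: [Capability<_>; 2] = capabilities.try_into().unwrap();
    ///     // ... drive inputs and produce output at `cap_a` / `cap_b` ...
    /// });
    /// ```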
    pub fn build<B, L>(self, constructor: B) -> Button
    where
        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
        L: Future + 'static,
    {
        let operator_waker = self.operator_waker;
        let mut input_frontiers = self.input_frontiers;
        let mut input_queues = self.input_queues;
        let mut output_flushes = self.output_flushes;
        let mut shutdown_handle = self.shutdown_handle;
        self.builder.build_reschedule(move |caps| {
            let mut logic_fut = Some(Box::pin(constructor(caps)));
            move |new_frontiers| {
                operator_waker.active.store(true, Ordering::SeqCst);
                for (i, queue) in input_queues.iter_mut().enumerate() {
                    // First, discover if there are any frontier notifications
                    let cur = &mut input_frontiers[i];
                    let new = new_frontiers[i].frontier();
                    if PartialOrder::less_than(&cur.borrow(), &new) {
                        queue.notify_progress(new.to_owned());
                        *cur = new.to_owned();
                    }
                    // Then accept all input into local queues. This step registers the received
                    // messages with progress tracking.
                    queue.accept_input();
                }
                operator_waker.active.store(false, Ordering::SeqCst);

                // If our worker pressed the button we stop scheduling the logic future and/or
                // draining the input handles to stop producing data and frontier updates
                // downstream.
                if shutdown_handle.local_pressed() {
                    // When all workers press their buttons we drop the logic future and start
                    // draining the input handles.
                    if shutdown_handle.all_pressed() {
                        logic_fut = None;
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    } else {
                        true
                    }
                } else {
                    // Schedule the logic future if any of the wakers above marked the task as ready
                    if let Some(fut) = logic_fut.as_mut() {
                        if operator_waker.task_ready.load(Ordering::SeqCst) {
                            let waker = futures_util::task::waker_ref(&operator_waker);
                            let mut cx = Context::from_waker(&waker);
                            operator_waker.task_ready.store(false, Ordering::SeqCst);
                            if Pin::new(fut).poll(&mut cx).is_ready() {
                                // We're done with logic so deallocate the task
                                logic_fut = None;
                            }
                            // Flush all the outputs before exiting
                            for flush in output_flushes.iter_mut() {
                                (flush)();
                            }
                        }
                    }

                    // The timely operator needs to be kept alive if the task is pending
                    if logic_fut.is_some() {
                        true
                    } else {
                        // Otherwise we should keep draining all inputs
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    }
                }
            }
        });

        self.shutdown_button
    }

    /// Creates a fallible operator implementation from the supplied logic constructor. If the
    /// `Future` resolves to an error it will be emitted in the returned error stream and then the
    /// operator will wait indefinitely until the shutdown button is pressed.
    ///
    /// # Capability handling
    ///
    /// Unlike [`OperatorBuilder::build`], this method does not give owned capabilities to the
    /// constructor. All initial capabilities are wrapped in a `CapabilitySet` and a mutable
    /// reference to them is given instead. This is done to avoid storing owned capabilities in the
    /// state of the logic future which would make using the `?` operator unsafe, since the
    /// frontiers would incorrectly advance, potentially causing incorrect actions downstream.
    ///
    /// ```ignore
    /// builder.build_fallible(|caps| Box::pin(async move {
    ///     // Assert that we have the number of capabilities we expect
    ///     // `cap_set` will be a `&mut CapabilitySet<T>`:
    ///     let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
    ///
    ///     // Using cap_set to send data:
    ///     output.give(&cap_set[0], 42);
    ///
    ///     // Using cap_set to downgrade it:
    ///     cap_set.downgrade([]);
    ///
    ///     // Explicitly dropping the capability:
    ///     // Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
    ///     *cap_set = CapabilitySet::new();
    ///
    ///     // !! BIG WARNING !!:
    ///     // It is tempting to `take` the capability out of the set for convenience. This will
    ///     // move the capability into the future state, tying its lifetime to it, which will get
    ///     // dropped when an error is hit, causing incorrect progress statements.
    ///     let cap = cap_set.delayed(&Timestamp::minimum());
    ///     *cap_set = CapabilitySet::new(); // DO NOT DO THIS
    /// }));
    /// ```
    pub fn build_fallible<E: 'static, F>(
        mut self,
        constructor: F,
    ) -> (Button, StreamCore<G, Vec<Rc<E>>>)
    where
        F: for<'a> FnOnce(
                &'a mut [CapabilitySet<G::Timestamp>],
            ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
            + 'static,
    {
        // Create a new completely disconnected output
        let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
        let button = self.build(|mut caps| async move {
            let error_cap = caps.pop().unwrap();
            let mut caps = caps
                .into_iter()
                .map(CapabilitySet::from_elem)
                .collect::<Vec<_>>();
            if let Err(err) = constructor(&mut *caps).await {
                error_output.give(&error_cap, Rc::new(err));
                drop(error_cap);
                // IMPORTANT: wedge this operator until the button is pressed. Returning would drop
                // the capabilities and could produce incorrect progress statements.
                std::future::pending().await
            }
        });
        (button, error_stream)
    }

    /// Creates operator info for the operator.
    pub fn operator_info(&self) -> OperatorInfo {
        self.builder.operator_info()
    }

    /// Returns the activator for the operator.
    pub fn activator(&self) -> &Activator {
        &self.activator
    }
}

/// Creates a new coordinated button for the worker configuration described by `scope`.
pub fn button<G: Scope>(scope: &mut G, addr: Rc<[usize]>) -> (ButtonHandle, Button) {
    let index = scope.new_identifier();
    let (pushers, puller) = scope.allocate(index, addr);

    let local_pressed = Rc::new(Cell::new(false));

    let handle = ButtonHandle {
        buttons_remaining: scope.peers(),
        local_pressed: Rc::clone(&local_pressed),
        puller,
    };

    let token = Button {
        pushers,
        local_pressed,
    };

    (handle, token)
}

/// A button that can be used to coordinate an action after all workers have pressed it.
pub struct ButtonHandle {
    /// The number of buttons still unpressed among workers.
    buttons_remaining: usize,
    /// A flag indicating whether this worker has pressed its button.
    local_pressed: Rc<Cell<bool>>,
    puller: Box<dyn Pull<Bincode<bool>>>,
}

impl ButtonHandle {
    /// Returns whether this worker has pressed its button.
    pub fn local_pressed(&self) -> bool {
        self.local_pressed.get()
    }

    /// Returns whether all workers have pressed their buttons.
    pub fn all_pressed(&mut self) -> bool {
        while self.puller.recv().is_some() {
            self.buttons_remaining -= 1;
        }
        self.buttons_remaining == 0
    }
}

/// The pressing end of a coordinated button. Pressing it notifies the [`ButtonHandle`]s of all
/// workers.
pub struct Button {
    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
    local_pressed: Rc<Cell<bool>>,
}

impl Button {
    /// Presses the button. It is safe to call this function multiple times.
    pub fn press(&mut self) {
        for mut pusher in self.pushers.drain(..) {
            pusher.send(Bincode::from(true));
            pusher.done();
        }
        self.local_pressed.set(true);
    }

    /// Converts this button into a deadman's switch that will automatically press the button when
    /// dropped.
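    ///
    /// A sketch of typical use, assuming the button came from [`OperatorBuilder::build`]:
    ///
    /// ```ignore
    /// let button = builder.build(|caps| async move { /* ... */ });
    /// // Keep the token for as long as the operator should keep running. Dropping it presses
    /// // this worker's button; once all workers have pressed, the operator drops its logic
    /// // future and drains its inputs.
    /// let _token: PressOnDropButton = button.press_on_drop();
    /// ```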
    pub fn press_on_drop(self) -> PressOnDropButton {
        PressOnDropButton(self)
    }
}

/// A token that automatically presses its button when dropped.
pub struct PressOnDropButton(Button);

impl Drop for PressOnDropButton {
    fn drop(&mut self) {
        self.0.press();
    }
}

#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, ToStream};

    use super::*;

    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
            let mut input_handle = op.new_input_for(&input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    if index != 0 {
                        return;
                    }
                    // Worker 0 downgrades to 1 and keeps the capability around forever
                    cap.downgrade(&1);
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(&output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // We should never observe a frontier greater than [1]
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            // Run the dataflow until only worker 0's capability holds the frontier at [1]
            for _ in 0..100 {
                worker.step();
            }
            // Then drop the tokens of worker 0
            if index == 0 {
                drop(tokens)
            }
            // And step the dataflow some more to ensure consumers don't observe frontiers advancing.
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}