Skip to main content

mz_timely_util/
builder_async.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types to build async operators with general shapes.
17
18use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use differential_dataflow::containers::{Columnation, TimelyStack};
28use futures_util::Stream;
29use futures_util::task::ArcWake;
30use timely::communication::{Pull, Push};
31use timely::container::{CapacityContainerBuilder, PushInto};
32use timely::dataflow::channels::Message;
33use timely::dataflow::channels::pact::ParallelizationContract;
34use timely::dataflow::channels::pushers::Output;
35use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
36use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
37use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
38use timely::dataflow::{Scope, Stream as TimelyStream, StreamVec};
39use timely::progress::{Antichain, Timestamp};
40use timely::scheduling::{Activator, SyncActivator};
41use timely::{Bincode, Container, ContainerBuilder, PartialOrder};
42
43use crate::containers::stack::AccountedStackBuilder;
44
/// Builds async operators with generic shape.
pub struct OperatorBuilder<G: Scope> {
    /// The underlying synchronous operator builder that this async wrapper drives.
    builder: OperatorBuilderRc<G>,
    /// The activator for this operator
    activator: Activator,
    /// The waker set up to activate this timely operator when woken
    operator_waker: Arc<TimelyWaker>,
    /// The currently known upper frontier of each of the input handles.
    input_frontiers: Vec<Antichain<G::Timestamp>>,
    /// Input queues for each of the declared inputs of the operator.
    input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
    /// Holds type erased closures that flush an output handle when called. These handles will be
    /// automatically drained when the operator is scheduled after the logic future has been polled
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// A handle to check whether all workers have pressed the shutdown button.
    shutdown_handle: ButtonHandle,
    /// A button to coordinate shutdown of this operator among workers.
    shutdown_button: Button,
}
64
/// A helper trait abstracting over an input handle. It facilitates keeping around type erased
/// handles for each of the operator inputs.
trait InputQueue<T: Timestamp> {
    /// Accepts all available input into local queues, registering the received messages with
    /// timely progress tracking.
    fn accept_input(&mut self);

    /// Drains all available input and empties the local queue, discarding the data.
    fn drain_input(&mut self);

    /// Registers a frontier notification to be delivered to the async handle.
    fn notify_progress(&mut self, upper: Antichain<T>);
}
77
impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
where
    T: Timestamp,
    D: Container,
    C: InputConnection<T> + 'static,
    P: Pull<Message<T, D>> + 'static,
{
    fn accept_input(&mut self) {
        let mut queue = self.queue.borrow_mut();
        let mut new_data = false;
        self.handle.for_each(|cap, data| {
            new_data = true;
            // Convert the raw input capability into whatever capability type this input's
            // connection behavior produces (owned capabilities or a bare timestamp).
            let cap = self.connection.accept(cap);
            queue.push_back(Event::Data(cap, std::mem::take(data)));
        });
        // Only wake the logic future if something actually arrived.
        if new_data {
            if let Some(waker) = self.waker.take() {
                waker.wake();
            }
        }
    }

    fn drain_input(&mut self) {
        self.queue.borrow_mut().clear();
        // Receive and discard any pending messages from the underlying handle.
        self.handle.for_each(|_, _| {});
    }

    fn notify_progress(&mut self, upper: Antichain<T>) {
        let mut queue = self.queue.borrow_mut();
        // It's beneficial to consolidate two consecutive progress statements into one if the
        // operator hasn't seen the previous progress yet. This also avoids accumulation of
        // progress statements in the queue if the operator only conditionally checks this input.
        match queue.back_mut() {
            Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
            _ => queue.push_back(Event::Progress(upper)),
        }
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}
119
/// Operator-side state for one input: the underlying timely input handle plus the event
/// queue and waker that are shared with the user-facing [`AsyncInputHandle`].
struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    /// Queue of data/progress events, shared with the corresponding `AsyncInputHandle`.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Waker parked by the logic future while it waits for events on this input.
    waker: Rc<Cell<Option<Waker>>>,
    /// The connection behavior, used to convert accepted input capabilities.
    connection: C,
    /// The underlying synchronous timely input handle.
    handle: InputHandleCore<T, D, P>,
}
131
/// An async Waker that activates a specific operator when woken and marks the task as ready
struct TimelyWaker {
    /// Activator that schedules the operator; usable from other threads.
    activator: SyncActivator,
    /// Whether the operator is currently being scheduled. Used to suppress re-activation.
    active: AtomicBool,
    /// Whether the logic future should be polled on the next schedule.
    task_ready: AtomicBool,
}
138
impl ArcWake for TimelyWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Mark the task ready before activating, so the scheduled operator observes it.
        arc_self.task_ready.store(true, Ordering::SeqCst);
        // Only activate the timely operator if it's not already active to avoid an infinite loop
        if !arc_self.active.load(Ordering::SeqCst) {
            // We don't have any guarantees about how long the Waker will be held for and so we
            // must be prepared for the receiving end to have hung up when we finally do get woken.
            // This can happen if by the time the waker is called the receiving timely worker has
            // been shutdown. For this reason we ignore the activation error.
            let _ = arc_self.activator.activate();
        }
    }
}
152
/// Async handle to an operator's input stream
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    /// Event queue shared with the operator's internal `InputHandleQueue`.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Slot where `poll_ready` parks a waker until new events arrive.
    waker: Rc<Cell<Option<Waker>>>,
    /// Whether this handle has finished producing data
    done: bool,
}
160
161impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
162    pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
163        let mut queue = self.queue.borrow_mut();
164        match queue.pop_front()? {
165            Event::Data(cap, data) => Some(Event::Data(cap, data)),
166            Event::Progress(frontier) => {
167                self.done = frontier.is_empty();
168                Some(Event::Progress(frontier))
169            }
170        }
171    }
172
173    /// Waits for the handle to have data. After this function returns it is guaranteed that at
174    /// least one call to `next_sync` will be `Some(_)`.
175    pub async fn ready(&self) {
176        std::future::poll_fn(|cx| self.poll_ready(cx)).await
177    }
178
179    fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
180        if self.queue.borrow().is_empty() {
181            self.waker.set(Some(cx.waker().clone()));
182            Poll::Pending
183        } else {
184            Poll::Ready(())
185        }
186    }
187}
188
impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
    type Item = Event<T, C::Capability, D>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Once the empty frontier has been observed the stream is terminated.
        if self.done {
            return Poll::Ready(None);
        }
        // Wait until at least one event is queued, then pop it.
        ready!(self.poll_ready(cx));
        Poll::Ready(self.next_sync())
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // At least the currently queued events; no meaningful upper bound.
        (self.queue.borrow().len(), None)
    }
}
204
/// An event of an input stream
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A data event, carrying the capability produced by the input connection and a container.
    Data(C, D),
    /// A progress event, carrying the input's new upper frontier.
    Progress(Antichain<T>),
}
213
/// Shared part of an async output handle
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
    /// Handle to write to the output stream.
    output: Output<T, CB::Container>,
    /// Current capability held by this output handle. `None` until data is first given,
    /// and cleared again by `cease` or when the given time changes.
    capability: Option<Capability<T>>,
    /// Container builder to accumulate data before sending at `capability`.
    builder: CB,
}
223
impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
    /// Write all pending data to the output stream.
    fn flush(&mut self) {
        // `finish` drains everything buffered, including partially filled containers.
        while let Some(container) = self.builder.finish() {
            self.output
                .give(self.capability.as_ref().expect("must exist"), container);
        }
    }

    /// Cease this output handle, flushing all pending data and releasing its capability.
    fn cease(&mut self) {
        self.flush();
        // Push the flushed data through the underlying output as well.
        let _ = self.output.activate();
        self.capability = None;
    }

    /// Provides data at the time specified by the capability. Flushes automatically when the
    /// capability time changes.
    fn give<D>(&mut self, cap: &Capability<T>, data: D)
    where
        CB: PushInto<D>,
    {
        // A change of time requires flushing data buffered at the previous time first.
        if let Some(capability) = &self.capability
            && cap.time() != capability.time()
        {
            self.flush();
            self.capability = None;
        }
        if self.capability.is_none() {
            self.capability = Some(cap.clone());
        }

        self.builder.push_into(data);
        // Unlike `finish` above, `extract` only yields completed containers, leaving
        // partial ones buffered for subsequent calls.
        while let Some(container) = self.builder.extract() {
            self.output
                .give(self.capability.as_ref().expect("must exist"), container);
        }
    }
}
263
/// Async handle to one of an operator's outputs. Clonable; clones share the same buffer.
pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
    /// Shared inner state; also held by the flush closure registered with the builder.
    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
    /// The index of this output among the operator's outputs.
    index: usize,
}
268
impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
where
    T: Timestamp,
    C: Container + Clone + 'static,
{
    /// Gives an entire container to the output at the time of `cap`, bypassing the container
    /// builder. Previously buffered data is flushed first so records stay in give order.
    #[inline]
    pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
        let mut inner = self.inner.borrow_mut();
        inner.flush();
        inner.output.give(cap, container);
    }
}
281
282impl<T, CB> AsyncOutputHandle<T, CB>
283where
284    T: Timestamp,
285    CB: ContainerBuilder,
286{
287    fn new(output: Output<T, CB::Container>, index: usize) -> Self {
288        let inner = AsyncOutputHandleInner {
289            output,
290            capability: None,
291            builder: CB::default(),
292        };
293        Self {
294            inner: Rc::new(RefCell::new(inner)),
295            index,
296        }
297    }
298
299    fn cease(&self) {
300        self.inner.borrow_mut().cease();
301    }
302}
303
impl<T, CB> AsyncOutputHandle<T, CB>
where
    T: Timestamp,
    CB: ContainerBuilder,
{
    /// Provides data at the time specified by the capability. Flushes automatically when the
    /// capability time changes.
    pub fn give<D>(&self, cap: &Capability<T>, data: D)
    where
        CB: PushInto<D>,
    {
        self.inner.borrow_mut().give(cap, data);
    }
}
316
impl<T, D> AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<D>>>>
where
    D: Clone + 'static + Columnation,
    T: Timestamp,
{
    /// The number of bytes a caller may produce through [`Self::give_fueled`] before being asked
    /// to yield back to the timely runtime.
    pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;

    /// Provides one record at the time specified by the capability. This method will automatically
    /// yield back to timely after [Self::MAX_OUTSTANDING_BYTES] have been produced.
    pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
    where
        TimelyStack<D>: PushInto<D2>,
    {
        // Decide whether to yield inside this scope, but only await after the RefCell
        // borrow has been released.
        let should_yield = {
            let mut handle = self.inner.borrow_mut();
            handle.give(cap, data);
            let should_yield = handle.builder.bytes.get() > Self::MAX_OUTSTANDING_BYTES;
            if should_yield {
                // Reset the byte accounting so the next batch starts with fresh fuel.
                handle.builder.bytes.set(0);
            }
            should_yield
        };
        if should_yield {
            tokio::task::yield_now().await;
        }
    }
}
344
impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
    // Clones share the same underlying output, capability, and builder state.
    fn clone(&self) -> Self {
        Self {
            inner: Rc::clone(&self.inner),
            index: self.index,
        }
    }
}
353
/// A trait describing the connection behavior between an input of an operator and zero or more of
/// its outputs. Implemented by [`Disconnected`], [`ConnectedToOne`], and [`ConnectedToMany`].
pub trait InputConnection<T: Timestamp> {
    /// The capability type associated with this connection behavior.
    type Capability;

    /// Generates a summary description of the connection behavior given the number of outputs.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Accepts an input capability, converting it into [`Self::Capability`].
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}
366
367/// A marker type representing a disconnected input.
368pub struct Disconnected;
369
370impl<T: Timestamp> InputConnection<T> for Disconnected {
371    type Capability = T;
372
373    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
374        vec![Antichain::new(); outputs]
375    }
376
377    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
378        input_cap.time().clone()
379    }
380}
381
382/// A marker type representing an input connected to exactly one output.
383pub struct ConnectedToOne(usize);
384
385impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
386    type Capability = Capability<T>;
387
388    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
389        let mut summary = vec![Antichain::new(); outputs];
390        summary[self.0] = Antichain::from_elem(T::Summary::default());
391        summary
392    }
393
394    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
395        input_cap.retain(self.0)
396    }
397}
398
399/// A marker type representing an input connected to many outputs.
400pub struct ConnectedToMany<const N: usize>([usize; N]);
401
402impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
403    type Capability = [Capability<T>; N];
404
405    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
406        let mut summary = vec![Antichain::new(); outputs];
407        for output in self.0 {
408            summary[output] = Antichain::from_elem(T::Summary::default());
409        }
410        summary
411    }
412
413    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
414        self.0.map(|output| input_cap.retain(output))
415    }
416}
417
/// A helper trait abstracting over an output handle. It facilitates passing type erased
/// output handles during operator construction
/// (e.g. to [`OperatorBuilder::new_input_for`]).
/// It is not meant to be implemented by users.
pub trait OutputIndex {
    /// The output index of this handle.
    fn index(&self) -> usize;
}
425
impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
    /// Returns the operator-local index of this output.
    fn index(&self) -> usize {
        self.index
    }
}
431
impl<G: Scope> OperatorBuilder<G> {
    /// Allocates a new generic async operator builder from its containing scope.
    pub fn new(name: String, mut scope: G) -> Self {
        let builder = OperatorBuilderRc::new(name, scope.clone());
        let info = builder.operator_info();
        let activator = scope.activator_for(Rc::clone(&info.address));
        let sync_activator = scope.sync_activator_for(info.address.to_vec());
        let operator_waker = TimelyWaker {
            activator: sync_activator,
            active: AtomicBool::new(false),
            // Start ready so the logic future is polled on the first schedule.
            task_ready: AtomicBool::new(true),
        };
        let (shutdown_handle, shutdown_button) = button(&mut scope, info.address);

        OperatorBuilder {
            builder,
            activator,
            operator_waker: Arc::new(operator_waker),
            input_frontiers: Default::default(),
            input_queues: Default::default(),
            output_flushes: Default::default(),
            shutdown_handle,
            shutdown_button,
        }
    }

    /// Adds a new input that is connected to the specified output, returning the async input handle to use.
    pub fn new_input_for<D, P>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
        output: &dyn OutputIndex,
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let index = output.index();
        // The output must have been declared on this builder before the input.
        assert!(index < self.builder.shape().outputs());
        self.new_input_connection(stream, pact, ConnectedToOne(index))
    }

    /// Adds a new input that is connected to the specified outputs, returning the async input handle to use.
    pub fn new_input_for_many<const N: usize, D, P>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
        outputs: [&dyn OutputIndex; N],
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let indices = outputs.map(|output| output.index());
        // All outputs must have been declared on this builder before the input.
        for index in indices {
            assert!(index < self.builder.shape().outputs());
        }
        self.new_input_connection(stream, pact, ConnectedToMany(indices))
    }

    /// Adds a new input that is not connected to any output, returning the async input handle to use.
    pub fn new_disconnected_input<D, P>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
    ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        self.new_input_connection(stream, pact, Disconnected)
    }

    /// Adds a new input with connection information, returning the async input handle to use.
    pub fn new_input_connection<D, P, C>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
        connection: C,
    ) -> AsyncInputHandle<G::Timestamp, D, C>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
        C: InputConnection<G::Timestamp> + 'static,
    {
        // Until told otherwise, the input's frontier starts at the minimum timestamp.
        self.input_frontiers
            .push(Antichain::from_elem(G::Timestamp::minimum()));

        let outputs = self.builder.shape().outputs();
        let handle = self.builder.new_input_connection(
            stream,
            pact,
            connection.describe(outputs).into_iter().enumerate(),
        );

        // The queue and waker are shared between the operator-side `InputHandleQueue`
        // and the user-facing `AsyncInputHandle`.
        let waker = Default::default();
        let queue = Default::default();
        let input_queue = InputHandleQueue {
            queue: Rc::clone(&queue),
            waker: Rc::clone(&waker),
            connection,
            handle,
        };
        self.input_queues.push(Box::new(input_queue));

        AsyncInputHandle {
            queue,
            waker,
            done: false,
        }
    }

    /// Adds a new output, returning the output handle and stream.
    pub fn new_output<CB: ContainerBuilder>(
        &mut self,
    ) -> (
        AsyncOutputHandle<G::Timestamp, CB>,
        TimelyStream<G, CB::Container>,
    ) {
        let index = self.builder.shape().outputs();

        // Connection summaries are declared from the input side, so none are given here.
        let (output, stream) = self.builder.new_output_connection([]);

        let handle = AsyncOutputHandle::new(output, index);

        // Register a flush closure so this output is ceased every time the operator is
        // scheduled after polling the logic future.
        let flush_handle = handle.clone();
        self.output_flushes
            .push(Box::new(move || flush_handle.cease()));

        (handle, stream)
    }

    /// Creates an operator implementation from supplied logic constructor. It returns a shutdown
    /// button that, when pressed, causes the logic future to be dropped and input handles to
    /// be drained. The button can be converted into a token by using
    /// [`Button::press_on_drop`]
    pub fn build<B, L>(self, constructor: B) -> Button
    where
        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
        L: Future + 'static,
    {
        let operator_waker = self.operator_waker;
        let mut input_frontiers = self.input_frontiers;
        let mut input_queues = self.input_queues;
        let mut output_flushes = self.output_flushes;
        let mut shutdown_handle = self.shutdown_handle;
        self.builder.build_reschedule(move |caps| {
            let mut logic_fut = Some(Box::pin(constructor(caps)));
            // The closure's boolean return reports whether the operator is incomplete
            // and must remain scheduled.
            move |new_frontiers| {
                // Mark the operator active so wakers don't redundantly re-activate it.
                operator_waker.active.store(true, Ordering::SeqCst);
                for (i, queue) in input_queues.iter_mut().enumerate() {
                    // First, discover if there are any frontier notifications
                    let cur = &mut input_frontiers[i];
                    let new = new_frontiers[i].frontier();
                    if PartialOrder::less_than(&cur.borrow(), &new) {
                        queue.notify_progress(new.to_owned());
                        *cur = new.to_owned();
                    }
                    // Then accept all input into local queues. This step registers the received
                    // messages with progress tracking.
                    queue.accept_input();
                }
                operator_waker.active.store(false, Ordering::SeqCst);

                // If our worker pressed the button we stop scheduling the logic future and/or
                // draining the input handles to stop producing data and frontier updates
                // downstream.
                if shutdown_handle.local_pressed() {
                    // When all workers press their buttons we drop the logic future and start
                    // draining the input handles.
                    if shutdown_handle.all_pressed() {
                        logic_fut = None;
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    } else {
                        true
                    }
                } else {
                    // Schedule the logic future if any of the wakers above marked the task as ready
                    if let Some(fut) = logic_fut.as_mut() {
                        if operator_waker.task_ready.load(Ordering::SeqCst) {
                            let waker = futures_util::task::waker_ref(&operator_waker);
                            let mut cx = Context::from_waker(&waker);
                            operator_waker.task_ready.store(false, Ordering::SeqCst);
                            if Pin::new(fut).poll(&mut cx).is_ready() {
                                // We're done with logic so deallocate the task
                                logic_fut = None;
                            }
                            // Flush all the outputs before exiting
                            for flush in output_flushes.iter_mut() {
                                (flush)();
                            }
                        }
                    }

                    // The timely operator needs to be kept alive if the task is pending
                    if logic_fut.is_some() {
                        true
                    } else {
                        // Otherwise we should keep draining all inputs
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    }
                }
            }
        });

        self.shutdown_button
    }

    /// Creates a fallible operator implementation from supplied logic constructor. If the `Future`
    /// resolves to an error it will be emitted in the returned error stream and then the operator
    /// will wait indefinitely until the shutdown button is pressed.
    ///
    /// # Capability handling
    ///
    /// Unlike [`OperatorBuilder::build`], this method does not give owned capabilities to the
    /// constructor. All initial capabilities are wrapped in a `CapabilitySet` and a mutable
    /// reference to them is given instead. This is done to avoid storing owned capabilities in the
    /// state of the logic future which would make using the `?` operator unsafe, since the
    /// frontiers would incorrectly advance, potentially causing incorrect actions downstream.
    ///
    /// ```ignore
    /// builder.build_fallible(|caps| Box::pin(async move {
    ///     // Assert that we have the number of capabilities we expect
    ///     // `cap_set` will be a `&mut CapabilitySet<T>`:
    ///     let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
    ///
    ///     // Using cap to send data:
    ///     output.give(&cap_set[0], 42);
    ///
    ///     // Using cap_set to downgrade it:
    ///     cap_set.downgrade([]);
    ///
    ///     // Explicitly dropping the capability:
    ///     // Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
    ///     *cap_set = CapabilitySet::new();
    ///
    ///     // !! BIG WARNING !!:
    ///     // It is tempting to `take` the capability out of the set for convenience. This will
    ///     // move the capability into the future state, tying its lifetime to it, which will get
    ///     // dropped when an error is hit, causing incorrect progress statements.
    ///     let cap = cap_set.delayed(&Timestamp::minimum());
    ///     *cap_set = CapabilitySet::new(); // DO NOT DO THIS
    /// }));
    /// ```
    pub fn build_fallible<E: 'static, F>(mut self, constructor: F) -> (Button, StreamVec<G, Rc<E>>)
    where
        F: for<'a> FnOnce(
                &'a mut [CapabilitySet<G::Timestamp>],
            ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
            + 'static,
    {
        // Create a new completely disconnected output
        let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
        let button = self.build(|mut caps| async move {
            // The error output was declared last, so its capability is the last element.
            let error_cap = caps.pop().unwrap();
            let mut caps = caps
                .into_iter()
                .map(CapabilitySet::from_elem)
                .collect::<Vec<_>>();
            if let Err(err) = constructor(&mut *caps).await {
                error_output.give(&error_cap, Rc::new(err));
                drop(error_cap);
                // IMPORTANT: wedge this operator until the button is pressed. Returning would drop
                // the capabilities and could produce incorrect progress statements.
                std::future::pending().await
            }
        });
        (button, error_stream)
    }

    /// Creates operator info for the operator.
    pub fn operator_info(&self) -> OperatorInfo {
        self.builder.operator_info()
    }

    /// Returns the activator for the operator.
    pub fn activator(&self) -> &Activator {
        &self.activator
    }
}
718
719/// Creates a new coordinated button the worker configuration described by `scope`.
720pub fn button<G: Scope>(scope: &mut G, addr: Rc<[usize]>) -> (ButtonHandle, Button) {
721    let index = scope.new_identifier();
722    let (pushers, puller) = scope.allocate(index, addr);
723
724    let local_pressed = Rc::new(Cell::new(false));
725
726    let handle = ButtonHandle {
727        buttons_remaining: scope.peers(),
728        local_pressed: Rc::clone(&local_pressed),
729        puller,
730    };
731
732    let token = Button {
733        pushers,
734        local_pressed,
735    };
736
737    (handle, token)
738}
739
/// A button that can be used to coordinate an action after all workers have pressed it.
pub struct ButtonHandle {
    /// The number of buttons still unpressed among workers.
    buttons_remaining: usize,
    /// A flag indicating whether this worker has pressed its button.
    local_pressed: Rc<Cell<bool>>,
    /// Receives the press notifications sent by every worker's [`Button`].
    puller: Box<dyn Pull<Bincode<bool>>>,
}
748
impl ButtonHandle {
    /// Returns whether this worker has pressed its button.
    pub fn local_pressed(&self) -> bool {
        self.local_pressed.get()
    }

    /// Returns whether all workers have pressed their buttons.
    pub fn all_pressed(&mut self) -> bool {
        // Each worker sends at most one message when its button is pressed, so every
        // received message accounts for exactly one pressed button.
        while self.puller.recv().is_some() {
            self.buttons_remaining -= 1;
        }
        self.buttons_remaining == 0
    }
}
763
/// A pressable button whose press is broadcast to all workers' [`ButtonHandle`]s.
pub struct Button {
    /// One pusher per worker, used to broadcast a press. Emptied on the first press.
    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
    /// Flag shared with this worker's [`ButtonHandle`].
    local_pressed: Rc<Cell<bool>>,
}
768
769impl Button {
770    /// Presses the button. It is safe to call this function multiple times.
771    pub fn press(&mut self) {
772        for mut pusher in self.pushers.drain(..) {
773            pusher.send(Bincode::from(true));
774            pusher.done();
775        }
776        self.local_pressed.set(true);
777    }
778
779    /// Converts this button into a deadman's switch that will automatically press the button when
780    /// dropped.
781    pub fn press_on_drop(self) -> PressOnDropButton {
782        PressOnDropButton(self)
783    }
784}
785
786pub struct PressOnDropButton(Button);
787
impl Drop for PressOnDropButton {
    fn drop(&mut self) {
        // Pressing is idempotent, so an earlier manual press is harmless.
        self.0.press();
    }
}
793
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::Capture;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::vec::ToStream;

    use super::*;

    /// An async operator that passes its input through unchanged, yielding between records,
    /// must still produce all of its input.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
            let mut input_handle = op.new_input_for(input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                // Yield eagerly to exercise rescheduling of the logic future.
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Regression test: dropping the tokens on one worker must not cause other workers to
    /// observe the frontier advancing past capabilities that worker still held.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    if index != 0 {
                        return;
                    }
                    // Worker 0 downgrades to 1 and keeps the capability around forever
                    cap.downgrade(&1);
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // We should never observe a frontier greater than [1]
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            // Run dataflow until only worker 0 holds the frontier to [1]
            for _ in 0..100 {
                worker.step();
            }
            // Then drop the tokens of worker 0
            if index == 0 {
                drop(tokens)
            }
            // And step the dataflow some more to ensure consumers don't observe frontiers advancing.
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}