1use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use differential_dataflow::containers::{Columnation, TimelyStack};
28use futures_util::Stream;
29use futures_util::task::ArcWake;
30use timely::communication::{Pull, Push};
31use timely::container::{CapacityContainerBuilder, PushInto};
32use timely::dataflow::channels::Message;
33use timely::dataflow::channels::pact::ParallelizationContract;
34use timely::dataflow::channels::pushers::Output;
35use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
36use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
37use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
38use timely::dataflow::{Scope, StreamCore};
39use timely::progress::{Antichain, Timestamp};
40use timely::scheduling::{Activator, SyncActivator};
41use timely::{Bincode, Container, ContainerBuilder, PartialOrder};
42
43use crate::containers::stack::AccountedStackBuilder;
44
/// Builder for a timely operator whose logic is an `async` block.
///
/// Inputs and outputs are declared with the `new_input_*` / `new_output` methods. After that,
/// `build` or `build_fallible` installs the operator and returns its shutdown [`Button`].
///
/// NOTE: fields are dropped in declaration order; keep that in mind before reordering them.
pub struct OperatorBuilder<G: Scope> {
    /// The underlying timely operator builder.
    builder: OperatorBuilderRc<G>,
    /// Activator for this operator's address, exposed via `activator()`.
    activator: Activator,
    /// Waker handed to the async logic. Waking it marks the task ready and, when the operator
    /// is not currently forwarding input, activates the operator.
    operator_waker: Arc<TimelyWaker>,
    /// The most recent frontier forwarded for each input, in input order.
    input_frontiers: Vec<Antichain<G::Timestamp>>,
    /// One queue per input; each moves data and frontier updates to its async input handle.
    input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
    /// One closure per output; each flushes buffered data and releases the held capability.
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// Local end of the shutdown button, checked on every operator invocation.
    shutdown_handle: ButtonHandle,
    /// Shutdown button handed back to the caller of `build`.
    shutdown_button: Button,
}
64
/// Operator-side view of one async input.
///
/// It is type-erased over the input's container, connection and puller types so that all
/// inputs can be stored together in a `Vec<Box<dyn InputQueue<_>>>`.
trait InputQueue<T: Timestamp> {
    /// Moves all data currently available on the timely input into the async input's queue.
    fn accept_input(&mut self);

    /// Discards all queued events and all data still pending on the timely input.
    fn drain_input(&mut self);

    /// Informs the async input that its frontier advanced to `upper`.
    fn notify_progress(&mut self, upper: Antichain<T>);
}
77
78impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
79where
80 T: Timestamp,
81 D: Container,
82 C: InputConnection<T> + 'static,
83 P: Pull<Message<T, D>> + 'static,
84{
85 fn accept_input(&mut self) {
86 let mut queue = self.queue.borrow_mut();
87 let mut new_data = false;
88 while let Some((cap, data)) = self.handle.next() {
89 new_data = true;
90 let cap = self.connection.accept(cap);
91 queue.push_back(Event::Data(cap, std::mem::take(data)));
92 }
93 if new_data {
94 if let Some(waker) = self.waker.take() {
95 waker.wake();
96 }
97 }
98 }
99
100 fn drain_input(&mut self) {
101 self.queue.borrow_mut().clear();
102 self.handle.for_each(|_, _| {});
103 }
104
105 fn notify_progress(&mut self, upper: Antichain<T>) {
106 let mut queue = self.queue.borrow_mut();
107 match queue.back_mut() {
111 Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
112 _ => queue.push_back(Event::Progress(upper)),
113 }
114 if let Some(waker) = self.waker.take() {
115 waker.wake();
116 }
117 }
118}
119
/// Operator-side half of an async input.
///
/// It owns the timely input handle and forwards its contents into the queue it shares with
/// the corresponding [`AsyncInputHandle`].
struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    /// Events shared with the `AsyncInputHandle`.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Waker registered by the `AsyncInputHandle` while it waits for events. It is taken and
    /// woken when new events are enqueued.
    waker: Rc<Cell<Option<Waker>>>,
    /// Converts input capabilities into the capability type handed to the async logic.
    connection: C,
    /// The timely input handle.
    handle: InputHandleCore<T, D, P>,
}
131
/// Waker used to poll the async operator logic; waking it schedules the timely operator.
struct TimelyWaker {
    /// Activator for this operator. The `Sync` variant is used because `ArcWake`
    /// implementors must be thread-safe.
    activator: SyncActivator,
    /// Set while the operator forwards input to the async handles (see `build`). Wakes that
    /// happen in that window do not activate the operator again.
    active: AtomicBool,
    /// Set by wakes and cleared right before the logic future is polled.
    task_ready: AtomicBool,
}
138
139impl ArcWake for TimelyWaker {
140 fn wake_by_ref(arc_self: &Arc<Self>) {
141 arc_self.task_ready.store(true, Ordering::SeqCst);
142 if !arc_self.active.load(Ordering::SeqCst) {
144 let _ = arc_self.activator.activate();
149 }
150 }
151}
152
/// Handle through which the async logic receives the events of one operator input.
///
/// Events can be taken with `next_sync`, or by using the handle as a [`Stream`]. The stream
/// ends after it yields a `Progress` event carrying the empty frontier.
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    /// Events shared with the operator-side `InputHandleQueue`.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Slot where `poll_ready` registers the waker to notify when events arrive.
    waker: Rc<Cell<Option<Waker>>>,
    /// Whether a `Progress` event with an empty frontier has been popped.
    done: bool,
}
160
161impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
162 pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
163 let mut queue = self.queue.borrow_mut();
164 match queue.pop_front()? {
165 Event::Data(cap, data) => Some(Event::Data(cap, data)),
166 Event::Progress(frontier) => {
167 self.done = frontier.is_empty();
168 Some(Event::Progress(frontier))
169 }
170 }
171 }
172
173 pub async fn ready(&self) {
176 std::future::poll_fn(|cx| self.poll_ready(cx)).await
177 }
178
179 fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
180 if self.queue.borrow().is_empty() {
181 self.waker.set(Some(cx.waker().clone()));
182 Poll::Pending
183 } else {
184 Poll::Ready(())
185 }
186 }
187}
188
189impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
190 type Item = Event<T, C::Capability, D>;
191
192 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
193 if self.done {
194 return Poll::Ready(None);
195 }
196 ready!(self.poll_ready(cx));
197 Poll::Ready(self.next_sync())
198 }
199
200 fn size_hint(&self) -> (usize, Option<usize>) {
201 (self.queue.borrow().len(), None)
202 }
203}
204
/// An event received on an async input.
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A batch of data, together with the capability produced for it by the input's
    /// [`InputConnection`].
    Data(C, D),
    /// The input frontier advanced to the given antichain.
    Progress(Antichain<T>),
}
213
/// Shared state behind an [`AsyncOutputHandle`].
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
    /// The timely output.
    output: Output<T, CB::Container>,
    /// Capability at which records passed to `give` are emitted. It is replaced when a record
    /// arrives at a different time, and cleared by `cease`.
    capability: Option<Capability<T>>,
    /// Builder that accumulates records into containers.
    builder: CB,
}
223
224impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
225 fn flush(&mut self) {
227 while let Some(container) = self.builder.finish() {
228 self.output
229 .give(self.capability.as_ref().expect("must exist"), container);
230 }
231 }
232
233 fn cease(&mut self) {
235 self.flush();
236 let _ = self.output.activate();
237 self.capability = None;
238 }
239
240 fn give<D>(&mut self, cap: &Capability<T>, data: D)
243 where
244 CB: PushInto<D>,
245 {
246 if let Some(capability) = &self.capability
247 && cap.time() != capability.time()
248 {
249 self.flush();
250 self.capability = None;
251 }
252 if self.capability.is_none() {
253 self.capability = Some(cap.clone());
254 }
255
256 self.builder.push_into(data);
257 while let Some(container) = self.builder.extract() {
258 self.output
259 .give(self.capability.as_ref().expect("must exist"), container);
260 }
261 }
262}
263
/// Handle through which the async logic sends data to one operator output.
///
/// Clones share the same underlying output and buffered state.
pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
    /// Output state shared by all clones of this handle.
    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
    /// Index of this output within the operator, assigned by `OperatorBuilder::new_output`.
    index: usize,
}
268
269impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
270where
271 T: Timestamp,
272 C: Container + Clone + 'static,
273{
274 #[inline]
275 pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
276 let mut inner = self.inner.borrow_mut();
277 inner.flush();
278 inner.output.give(cap, container);
279 }
280}
281
282impl<T, CB> AsyncOutputHandle<T, CB>
283where
284 T: Timestamp,
285 CB: ContainerBuilder,
286{
287 fn new(output: Output<T, CB::Container>, index: usize) -> Self {
288 let inner = AsyncOutputHandleInner {
289 output,
290 capability: None,
291 builder: CB::default(),
292 };
293 Self {
294 inner: Rc::new(RefCell::new(inner)),
295 index,
296 }
297 }
298
299 fn cease(&self) {
300 self.inner.borrow_mut().cease();
301 }
302}
303
304impl<T, CB> AsyncOutputHandle<T, CB>
305where
306 T: Timestamp,
307 CB: ContainerBuilder,
308{
309 pub fn give<D>(&self, cap: &Capability<T>, data: D)
310 where
311 CB: PushInto<D>,
312 {
313 self.inner.borrow_mut().give(cap, data);
314 }
315}
316
317impl<T, D> AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<D>>>>
318where
319 D: timely::Data + Columnation,
320 T: Timestamp,
321{
322 pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
323
324 pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
327 where
328 TimelyStack<D>: PushInto<D2>,
329 {
330 let should_yield = {
331 let mut handle = self.inner.borrow_mut();
332 handle.give(cap, data);
333 let should_yield = handle.builder.bytes.get() > Self::MAX_OUTSTANDING_BYTES;
334 if should_yield {
335 handle.builder.bytes.set(0);
336 }
337 should_yield
338 };
339 if should_yield {
340 tokio::task::yield_now().await;
341 }
342 }
343}
344
345impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
346 fn clone(&self) -> Self {
347 Self {
348 inner: Rc::clone(&self.inner),
349 index: self.index,
350 }
351 }
352}
353
/// Describes how an async input is connected to the operator's outputs.
pub trait InputConnection<T: Timestamp> {
    /// The capability type delivered to the async logic with each data batch.
    type Capability;

    /// Returns one path-summary antichain per output, for an operator with `outputs` outputs.
    /// An empty antichain marks the input as not connected to that output.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Converts the timely input capability into the capability handed to the async logic.
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}
366
/// Connection for an input that is connected to no output. Each batch's capability is reduced
/// to its timestamp.
pub struct Disconnected;
369
370impl<T: Timestamp> InputConnection<T> for Disconnected {
371 type Capability = T;
372
373 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
374 vec![Antichain::new(); outputs]
375 }
376
377 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
378 input_cap.time().clone()
379 }
380}
381
/// Connection for an input that is connected to the single output with the given index.
pub struct ConnectedToOne(usize);
384
385impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
386 type Capability = Capability<T>;
387
388 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
389 let mut summary = vec![Antichain::new(); outputs];
390 summary[self.0] = Antichain::from_elem(T::Summary::default());
391 summary
392 }
393
394 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
395 input_cap.retain_for_output(self.0)
396 }
397}
398
/// Connection for an input that is connected to each of the `N` outputs with the given indices.
pub struct ConnectedToMany<const N: usize>([usize; N]);
401
402impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
403 type Capability = [Capability<T>; N];
404
405 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
406 let mut summary = vec![Antichain::new(); outputs];
407 for output in self.0 {
408 summary[output] = Antichain::from_elem(T::Summary::default());
409 }
410 summary
411 }
412
413 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
414 self.0
415 .map(|output| input_cap.delayed_for_output(input_cap.time(), output))
416 }
417}
418
/// Types that identify one output of an async operator by its index.
pub trait OutputIndex {
    /// Returns the index of the output within its operator.
    fn index(&self) -> usize;
}
426
427impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
428 fn index(&self) -> usize {
429 self.index
430 }
431}
432
impl<G: Scope> OperatorBuilder<G> {
    /// Creates a builder for an async operator named `name` in `scope`.
    ///
    /// This allocates the operator's activators and its shutdown button. The task starts
    /// marked as ready, so the logic future is polled on the first invocation.
    pub fn new(name: String, mut scope: G) -> Self {
        let builder = OperatorBuilderRc::new(name, scope.clone());
        let info = builder.operator_info();
        let activator = scope.activator_for(Rc::clone(&info.address));
        let sync_activator = scope.sync_activator_for(info.address.to_vec());
        let operator_waker = TimelyWaker {
            activator: sync_activator,
            active: AtomicBool::new(false),
            task_ready: AtomicBool::new(true),
        };
        let (shutdown_handle, shutdown_button) = button(&mut scope, info.address);

        OperatorBuilder {
            builder,
            activator,
            operator_waker: Arc::new(operator_waker),
            input_frontiers: Default::default(),
            input_queues: Default::default(),
            output_flushes: Default::default(),
            shutdown_handle,
            shutdown_button,
        }
    }

    /// Adds an input connected only to `output`; each data batch carries a `Capability`
    /// for that output.
    ///
    /// Panics if `output` is not an output of this builder.
    pub fn new_input_for<D, P>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
        output: &dyn OutputIndex,
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let index = output.index();
        assert!(index < self.builder.shape().outputs());
        self.new_input_connection(stream, pact, ConnectedToOne(index))
    }

    /// Adds an input connected to each of `outputs`; each data batch carries one `Capability`
    /// per listed output.
    ///
    /// Panics if any of `outputs` is not an output of this builder.
    pub fn new_input_for_many<const N: usize, D, P>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
        outputs: [&dyn OutputIndex; N],
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let indices = outputs.map(|output| output.index());
        for index in indices {
            assert!(index < self.builder.shape().outputs());
        }
        self.new_input_connection(stream, pact, ConnectedToMany(indices))
    }

    /// Adds an input that is not connected to any output; each data batch carries only its
    /// timestamp.
    pub fn new_disconnected_input<D, P>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
    ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        self.new_input_connection(stream, pact, Disconnected)
    }

    /// Adds an input whose connection to the outputs is described by `connection`.
    ///
    /// The connection is described against the outputs that exist at the time of this call.
    /// The returned handle shares its event queue and waker slot with the operator side.
    pub fn new_input_connection<D, P, C>(
        &mut self,
        stream: &StreamCore<G, D>,
        pact: P,
        connection: C,
    ) -> AsyncInputHandle<G::Timestamp, D, C>
    where
        D: Container + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
        C: InputConnection<G::Timestamp> + 'static,
    {
        // Start from the minimum frontier so the first strict advance is reported.
        self.input_frontiers
            .push(Antichain::from_elem(G::Timestamp::minimum()));

        let outputs = self.builder.shape().outputs();
        let handle = self.builder.new_input_connection(
            stream,
            pact,
            connection.describe(outputs).into_iter().enumerate(),
        );

        let waker = Default::default();
        let queue = Default::default();
        let input_queue = InputHandleQueue {
            queue: Rc::clone(&queue),
            waker: Rc::clone(&waker),
            connection,
            handle,
        };
        self.input_queues.push(Box::new(input_queue));

        AsyncInputHandle {
            queue,
            waker,
            done: false,
        }
    }

    /// Adds an output and returns a handle for giving data to it, plus the resulting stream.
    ///
    /// The output is created with an empty connection list, so inputs added earlier are not
    /// connected to it. The output is flushed and its capability released after every poll
    /// of the logic future.
    pub fn new_output<CB: ContainerBuilder>(
        &mut self,
    ) -> (
        AsyncOutputHandle<G::Timestamp, CB>,
        StreamCore<G, CB::Container>,
    ) {
        let index = self.builder.shape().outputs();

        let (output, stream) = self.builder.new_output_connection([]);

        let handle = AsyncOutputHandle::new(output, index);

        let flush_handle = handle.clone();
        self.output_flushes
            .push(Box::new(move || flush_handle.cease()));

        (handle, stream)
    }

    /// Installs the operator, running the future returned by `constructor` as its logic.
    ///
    /// `constructor` receives the initial capabilities provided by timely's
    /// `build_reschedule`. Returns the shutdown [`Button`]. Once it has been pressed locally,
    /// the future is no longer polled. Once every worker has pressed it, the future is dropped
    /// and all inputs are drained.
    pub fn build<B, L>(self, constructor: B) -> Button
    where
        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
        L: Future + 'static,
    {
        let operator_waker = self.operator_waker;
        let mut input_frontiers = self.input_frontiers;
        let mut input_queues = self.input_queues;
        let mut output_flushes = self.output_flushes;
        let mut shutdown_handle = self.shutdown_handle;
        self.builder.build_reschedule(move |caps| {
            let mut logic_fut = Some(Box::pin(constructor(caps)));
            move |new_frontiers| {
                // While `active` is set, wakes caused by the input queues below only mark the
                // task ready instead of activating this operator again.
                operator_waker.active.store(true, Ordering::SeqCst);
                for (i, queue) in input_queues.iter_mut().enumerate() {
                    let cur = &mut input_frontiers[i];
                    // Only forward frontiers that strictly advanced past the last one forwarded.
                    let new = new_frontiers[i].frontier();
                    if PartialOrder::less_than(&cur.borrow(), &new) {
                        queue.notify_progress(new.to_owned());
                        *cur = new.to_owned();
                    }
                    queue.accept_input();
                }
                operator_waker.active.store(false, Ordering::SeqCst);

                // Once this worker pressed its button, the logic future is no longer polled.
                if shutdown_handle.local_pressed() {
                    // Only after every worker pressed is the future (together with any
                    // capabilities it holds) dropped and the inputs drained.
                    if shutdown_handle.all_pressed() {
                        logic_fut = None;
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    } else {
                        true
                    }
                } else {
                    // Poll the logic future only if a wake marked it ready.
                    if let Some(fut) = logic_fut.as_mut() {
                        if operator_waker.task_ready.load(Ordering::SeqCst) {
                            let waker = futures_util::task::waker_ref(&operator_waker);
                            let mut cx = Context::from_waker(&waker);
                            // Cleared before polling so a wake during the poll sets it again.
                            operator_waker.task_ready.store(false, Ordering::SeqCst);
                            if Pin::new(fut).poll(&mut cx).is_ready() {
                                // The logic finished; release the future.
                                logic_fut = None;
                            }
                            // Flush buffered output and release the output capabilities after
                            // every poll.
                            for flush in output_flushes.iter_mut() {
                                (flush)();
                            }
                        }
                    }

                    if logic_fut.is_some() {
                        true
                    } else {
                        // The logic is done: discard any further input.
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    }
                }
            }
        });

        self.shutdown_button
    }

    /// Like `build`, but the logic returns a `Result`, and errors are reported on an extra
    /// output.
    ///
    /// The extra error output is added after all user outputs. Its capability is taken as the
    /// last element of the capability vector. Each remaining capability is wrapped in a
    /// `CapabilitySet` and passed to `constructor`.
    ///
    /// If the logic fails, the error is sent on the returned stream and the error capability
    /// is dropped. The future then never completes, so the remaining capabilities stay held
    /// until the operator is shut down via its button.
    pub fn build_fallible<E: 'static, F>(
        mut self,
        constructor: F,
    ) -> (Button, StreamCore<G, Vec<Rc<E>>>)
    where
        F: for<'a> FnOnce(
                &'a mut [CapabilitySet<G::Timestamp>],
            ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
            + 'static,
    {
        let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
        let button = self.build(|mut caps| async move {
            let error_cap = caps.pop().unwrap();
            let mut caps = caps
                .into_iter()
                .map(CapabilitySet::from_elem)
                .collect::<Vec<_>>();
            if let Err(err) = constructor(&mut *caps).await {
                error_output.give(&error_cap, Rc::new(err));
                drop(error_cap);
                // Never return: returning would drop `caps` and let downstream frontiers
                // advance past data that was never produced.
                std::future::pending().await
            }
        });
        (button, error_stream)
    }

    /// Returns the timely operator info of the operator being built.
    pub fn operator_info(&self) -> OperatorInfo {
        self.builder.operator_info()
    }

    /// Returns the activator for the operator being built.
    pub fn activator(&self) -> &Activator {
        &self.activator
    }
}
722
723pub fn button<G: Scope>(scope: &mut G, addr: Rc<[usize]>) -> (ButtonHandle, Button) {
725 let index = scope.new_identifier();
726 let (pushers, puller) = scope.allocate(index, addr);
727
728 let local_pressed = Rc::new(Cell::new(false));
729
730 let handle = ButtonHandle {
731 buttons_remaining: scope.peers(),
732 local_pressed: Rc::clone(&local_pressed),
733 puller,
734 };
735
736 let token = Button {
737 pushers,
738 local_pressed,
739 };
740
741 (handle, token)
742}
743
/// Receiving end of a shutdown [`Button`].
///
/// The operator uses it to learn whether the local button, and the buttons of all peers, have
/// been pressed.
pub struct ButtonHandle {
    /// Number of press messages still expected; starts at `scope.peers()`.
    buttons_remaining: usize,
    /// Set when the local button is pressed; shared with the `Button`.
    local_pressed: Rc<Cell<bool>>,
    /// Receives the press messages sent by `Button::press`.
    puller: Box<dyn Pull<Bincode<bool>>>,
}
752
753impl ButtonHandle {
754 pub fn local_pressed(&self) -> bool {
756 self.local_pressed.get()
757 }
758
759 pub fn all_pressed(&mut self) -> bool {
761 while self.puller.recv().is_some() {
762 self.buttons_remaining -= 1;
763 }
764 self.buttons_remaining == 0
765 }
766}
767
/// Shutdown button for an async operator.
///
/// Pressing it notifies the peers' button handles and records the local press.
pub struct Button {
    /// Pushers obtained from `scope.allocate`. `press` sends one message on each and then
    /// empties the vector.
    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
    /// Local press flag shared with the `ButtonHandle`.
    local_pressed: Rc<Cell<bool>>,
}
772
773impl Button {
774 pub fn press(&mut self) {
776 for mut pusher in self.pushers.drain(..) {
777 pusher.send(Bincode::from(true));
778 pusher.done();
779 }
780 self.local_pressed.set(true);
781 }
782
783 pub fn press_on_drop(self) -> PressOnDropButton {
786 PressOnDropButton(self)
787 }
788}
789
/// Wrapper that presses the contained [`Button`] when it is dropped.
pub struct PressOnDropButton(Button);
791
792impl Drop for PressOnDropButton {
793 fn drop(&mut self) {
794 self.0.press();
795 }
796}
797
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, ToStream};

    use super::*;

    /// An async pass-through operator must forward every input record at its input time.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
            let mut input_handle = op.new_input_for(&input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                // `yield_now` makes the future return `Pending` once, so the wake/re-poll path
                // is exercised.
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        // All records were produced at time 0.
        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Regression test, presumably for GitHub issue #18837: pressing the shutdown button on
    /// only some workers must not release those workers' capabilities.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                // Worker 0 holds its capability at time 1 forever; other workers drop theirs.
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    if index != 0 {
                        return;
                    }
                    cap.downgrade(&1);
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(&output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // The frontier must never move past time 1 (nor become empty).
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            for _ in 0..100 {
                worker.step();
            }
            // Only worker 0 presses its buttons here, so not all buttons are pressed and the
            // producer on worker 0 must keep holding its capability.
            if index == 0 {
                drop(tokens)
            }
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}