1use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use differential_dataflow::containers::{Columnation, TimelyStack};
28use futures_util::Stream;
29use futures_util::task::ArcWake;
30use timely::communication::{Pull, Push};
31use timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
32use timely::dataflow::channels::Message;
33use timely::dataflow::channels::pact::ParallelizationContract;
34use timely::dataflow::channels::pushers::Tee;
35use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
36use timely::dataflow::operators::generic::{
37 InputHandleCore, OperatorInfo, OutputHandleCore, OutputWrapper,
38};
39use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
40use timely::dataflow::{Scope, StreamCore};
41use timely::progress::{Antichain, Timestamp};
42use timely::scheduling::{Activator, SyncActivator};
43use timely::{Bincode, Container, PartialOrder};
44
45use crate::containers::stack::AccountedStackBuilder;
46
47pub struct OperatorBuilder<G: Scope> {
49 builder: OperatorBuilderRc<G>,
50 activator: Activator,
52 operator_waker: Arc<TimelyWaker>,
54 input_frontiers: Vec<Antichain<G::Timestamp>>,
56 input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
58 output_flushes: Vec<Box<dyn FnMut()>>,
61 shutdown_handle: ButtonHandle,
63 shutdown_button: Button,
65}
66
67trait InputQueue<T: Timestamp> {
70 fn accept_input(&mut self);
72
73 fn drain_input(&mut self);
75
76 fn notify_progress(&mut self, upper: Antichain<T>);
78}
79
80impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
81where
82 T: Timestamp,
83 D: Container,
84 C: InputConnection<T> + 'static,
85 P: Pull<Message<T, D>> + 'static,
86{
87 fn accept_input(&mut self) {
88 let mut queue = self.queue.borrow_mut();
89 let mut new_data = false;
90 while let Some((cap, data)) = self.handle.next() {
91 new_data = true;
92 let cap = self.connection.accept(cap);
93 queue.push_back(Event::Data(cap, std::mem::take(data)));
94 }
95 if new_data {
96 if let Some(waker) = self.waker.take() {
97 waker.wake();
98 }
99 }
100 }
101
102 fn drain_input(&mut self) {
103 self.queue.borrow_mut().clear();
104 self.handle.for_each(|_, _| {});
105 }
106
107 fn notify_progress(&mut self, upper: Antichain<T>) {
108 let mut queue = self.queue.borrow_mut();
109 match queue.back_mut() {
113 Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
114 _ => queue.push_back(Event::Progress(upper)),
115 }
116 if let Some(waker) = self.waker.take() {
117 waker.wake();
118 }
119 }
120}
121
122struct InputHandleQueue<
123 T: Timestamp,
124 D: Container,
125 C: InputConnection<T>,
126 P: Pull<Message<T, D>> + 'static,
127> {
128 queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
129 waker: Rc<Cell<Option<Waker>>>,
130 connection: C,
131 handle: InputHandleCore<T, D, P>,
132}
133
134struct TimelyWaker {
136 activator: SyncActivator,
137 active: AtomicBool,
138 task_ready: AtomicBool,
139}
140
141impl ArcWake for TimelyWaker {
142 fn wake_by_ref(arc_self: &Arc<Self>) {
143 arc_self.task_ready.store(true, Ordering::SeqCst);
144 if !arc_self.active.load(Ordering::SeqCst) {
146 let _ = arc_self.activator.activate();
151 }
152 }
153}
154
155pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
157 queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
158 waker: Rc<Cell<Option<Waker>>>,
159 done: bool,
161}
162
163impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
164 pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
165 let mut queue = self.queue.borrow_mut();
166 match queue.pop_front()? {
167 Event::Data(cap, data) => Some(Event::Data(cap, data)),
168 Event::Progress(frontier) => {
169 self.done = frontier.is_empty();
170 Some(Event::Progress(frontier))
171 }
172 }
173 }
174
175 pub async fn ready(&self) {
178 std::future::poll_fn(|cx| self.poll_ready(cx)).await
179 }
180
181 fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
182 if self.queue.borrow().is_empty() {
183 self.waker.set(Some(cx.waker().clone()));
184 Poll::Pending
185 } else {
186 Poll::Ready(())
187 }
188 }
189}
190
191impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
192 type Item = Event<T, C::Capability, D>;
193
194 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195 if self.done {
196 return Poll::Ready(None);
197 }
198 ready!(self.poll_ready(cx));
199 Poll::Ready(self.next_sync())
200 }
201
202 fn size_hint(&self) -> (usize, Option<usize>) {
203 (self.queue.borrow().len(), None)
204 }
205}
206
207#[derive(Debug)]
209pub enum Event<T: Timestamp, C, D> {
210 Data(C, D),
212 Progress(Antichain<T>),
214}
215
216pub struct AsyncOutputHandle<
217 T: Timestamp,
218 CB: ContainerBuilder,
219 P: Push<Message<T, CB::Container>> + 'static,
220> {
221 handle: Rc<RefCell<OutputHandleCore<'static, T, CB, P>>>,
224 wrapper: Rc<Pin<Box<OutputWrapper<T, CB, P>>>>,
225 index: usize,
226}
227
228impl<T, C, P> AsyncOutputHandle<T, CapacityContainerBuilder<C>, P>
229where
230 T: Timestamp,
231 C: Container + Clone + 'static,
232 P: Push<Message<T, C>> + 'static,
233{
234 #[inline]
235 pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
236 let mut handle = self.handle.borrow_mut();
237 handle.session_with_builder(cap).give_container(container);
238 }
239}
240
241impl<T, CB, P> AsyncOutputHandle<T, CB, P>
242where
243 T: Timestamp,
244 CB: ContainerBuilder,
245 P: Push<Message<T, CB::Container>> + 'static,
246{
247 fn new(wrapper: OutputWrapper<T, CB, P>, index: usize) -> Self {
248 let mut wrapper = Rc::new(Box::pin(wrapper));
249 let handle = unsafe {
257 let handle = Rc::get_mut(&mut wrapper)
258 .unwrap()
259 .as_mut()
260 .get_unchecked_mut()
261 .activate();
262 std::mem::transmute::<OutputHandleCore<'_, T, CB, P>, OutputHandleCore<'static, T, CB, P>>(
263 handle,
264 )
265 };
266 Self {
267 wrapper,
268 handle: Rc::new(RefCell::new(handle)),
269 index,
270 }
271 }
272
273 fn cease(&self) {
274 self.handle.borrow_mut().cease()
275 }
276}
277
278impl<T, C, P> AsyncOutputHandle<T, CapacityContainerBuilder<C>, P>
279where
280 T: Timestamp,
281 C: Container + Clone + 'static,
282 P: Push<Message<T, C>> + 'static,
283{
284 pub fn give<D>(&self, cap: &Capability<T>, data: D)
285 where
286 CapacityContainerBuilder<C>: PushInto<D>,
287 {
288 let mut handle = self.handle.borrow_mut();
289 handle.session_with_builder(cap).give(data);
290 }
291}
292
293impl<T, D, P>
294 AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<D>>>, P>
295where
296 D: timely::Data + Columnation,
297 T: Timestamp,
298 P: Push<Message<T, TimelyStack<D>>>,
299{
300 pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
301
302 pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
305 where
306 TimelyStack<D>: PushInto<D2>,
307 {
308 let should_yield = {
309 let mut handle = self.handle.borrow_mut();
310 let mut session = handle.session_with_builder(cap);
311 session.push_into(data);
312 let should_yield = session.builder().bytes.get() > Self::MAX_OUTSTANDING_BYTES;
313 if should_yield {
314 session.builder().bytes.set(0);
315 }
316 should_yield
317 };
318 if should_yield {
319 tokio::task::yield_now().await;
320 }
321 }
322}
323
324impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>> + 'static> Clone
325 for AsyncOutputHandle<T, CB, P>
326{
327 fn clone(&self) -> Self {
328 Self {
329 handle: Rc::clone(&self.handle),
330 wrapper: Rc::clone(&self.wrapper),
331 index: self.index,
332 }
333 }
334}
335
336pub trait InputConnection<T: Timestamp> {
339 type Capability;
341
342 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;
344
345 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
347}
348
349pub struct Disconnected;
351
352impl<T: Timestamp> InputConnection<T> for Disconnected {
353 type Capability = T;
354
355 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
356 vec![Antichain::new(); outputs]
357 }
358
359 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
360 input_cap.time().clone()
361 }
362}
363
364pub struct ConnectedToOne(usize);
366
367impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
368 type Capability = Capability<T>;
369
370 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
371 let mut summary = vec![Antichain::new(); outputs];
372 summary[self.0] = Antichain::from_elem(T::Summary::default());
373 summary
374 }
375
376 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
377 input_cap.retain_for_output(self.0)
378 }
379}
380
381pub struct ConnectedToMany<const N: usize>([usize; N]);
383
384impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
385 type Capability = [Capability<T>; N];
386
387 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
388 let mut summary = vec![Antichain::new(); outputs];
389 for output in self.0 {
390 summary[output] = Antichain::from_elem(T::Summary::default());
391 }
392 summary
393 }
394
395 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
396 self.0
397 .map(|output| input_cap.delayed_for_output(input_cap.time(), output))
398 }
399}
400
401pub trait OutputIndex {
405 fn index(&self) -> usize;
407}
408
409impl<T: Timestamp, CB: ContainerBuilder> OutputIndex
410 for AsyncOutputHandle<T, CB, Tee<T, CB::Container>>
411{
412 fn index(&self) -> usize {
413 self.index
414 }
415}
416
417impl<G: Scope> OperatorBuilder<G> {
418 pub fn new(name: String, mut scope: G) -> Self {
420 let builder = OperatorBuilderRc::new(name, scope.clone());
421 let info = builder.operator_info();
422 let activator = scope.activator_for(Rc::clone(&info.address));
423 let sync_activator = scope.sync_activator_for(info.address.to_vec());
424 let operator_waker = TimelyWaker {
425 activator: sync_activator,
426 active: AtomicBool::new(false),
427 task_ready: AtomicBool::new(true),
428 };
429 let (shutdown_handle, shutdown_button) = button(&mut scope, info.address);
430
431 OperatorBuilder {
432 builder,
433 activator,
434 operator_waker: Arc::new(operator_waker),
435 input_frontiers: Default::default(),
436 input_queues: Default::default(),
437 output_flushes: Default::default(),
438 shutdown_handle,
439 shutdown_button,
440 }
441 }
442
443 pub fn new_input_for<D, P>(
445 &mut self,
446 stream: &StreamCore<G, D>,
447 pact: P,
448 output: &dyn OutputIndex,
449 ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
450 where
451 D: Container + 'static,
452 P: ParallelizationContract<G::Timestamp, D>,
453 {
454 let index = output.index();
455 assert!(index < self.builder.shape().outputs());
456 self.new_input_connection(stream, pact, ConnectedToOne(index))
457 }
458
459 pub fn new_input_for_many<const N: usize, D, P>(
461 &mut self,
462 stream: &StreamCore<G, D>,
463 pact: P,
464 outputs: [&dyn OutputIndex; N],
465 ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
466 where
467 D: Container + 'static,
468 P: ParallelizationContract<G::Timestamp, D>,
469 {
470 let indices = outputs.map(|output| output.index());
471 for index in indices {
472 assert!(index < self.builder.shape().outputs());
473 }
474 self.new_input_connection(stream, pact, ConnectedToMany(indices))
475 }
476
477 pub fn new_disconnected_input<D, P>(
479 &mut self,
480 stream: &StreamCore<G, D>,
481 pact: P,
482 ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
483 where
484 D: Container + 'static,
485 P: ParallelizationContract<G::Timestamp, D>,
486 {
487 self.new_input_connection(stream, pact, Disconnected)
488 }
489
490 pub fn new_input_connection<D, P, C>(
492 &mut self,
493 stream: &StreamCore<G, D>,
494 pact: P,
495 connection: C,
496 ) -> AsyncInputHandle<G::Timestamp, D, C>
497 where
498 D: Container + 'static,
499 P: ParallelizationContract<G::Timestamp, D>,
500 C: InputConnection<G::Timestamp> + 'static,
501 {
502 self.input_frontiers
503 .push(Antichain::from_elem(G::Timestamp::minimum()));
504
505 let outputs = self.builder.shape().outputs();
506 let handle = self.builder.new_input_connection(
507 stream,
508 pact,
509 connection.describe(outputs).into_iter().enumerate(),
510 );
511
512 let waker = Default::default();
513 let queue = Default::default();
514 let input_queue = InputHandleQueue {
515 queue: Rc::clone(&queue),
516 waker: Rc::clone(&waker),
517 connection,
518 handle,
519 };
520 self.input_queues.push(Box::new(input_queue));
521
522 AsyncInputHandle {
523 queue,
524 waker,
525 done: false,
526 }
527 }
528
529 pub fn new_output<CB: ContainerBuilder>(
531 &mut self,
532 ) -> (
533 AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
534 StreamCore<G, CB::Container>,
535 ) {
536 let index = self.builder.shape().outputs();
537
538 let (wrapper, stream) = self.builder.new_output_connection([]);
539
540 let handle = AsyncOutputHandle::new(wrapper, index);
541
542 let flush_handle = handle.clone();
543 self.output_flushes
544 .push(Box::new(move || flush_handle.cease()));
545
546 (handle, stream)
547 }
548
549 pub fn build<B, L>(self, constructor: B) -> Button
554 where
555 B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
556 L: Future + 'static,
557 {
558 let operator_waker = self.operator_waker;
559 let mut input_frontiers = self.input_frontiers;
560 let mut input_queues = self.input_queues;
561 let mut output_flushes = self.output_flushes;
562 let mut shutdown_handle = self.shutdown_handle;
563 self.builder.build_reschedule(move |caps| {
564 let mut logic_fut = Some(Box::pin(constructor(caps)));
565 move |new_frontiers| {
566 operator_waker.active.store(true, Ordering::SeqCst);
567 for (i, queue) in input_queues.iter_mut().enumerate() {
568 let cur = &mut input_frontiers[i];
570 let new = new_frontiers[i].frontier();
571 if PartialOrder::less_than(&cur.borrow(), &new) {
572 queue.notify_progress(new.to_owned());
573 *cur = new.to_owned();
574 }
575 queue.accept_input();
578 }
579 operator_waker.active.store(false, Ordering::SeqCst);
580
581 if shutdown_handle.local_pressed() {
585 if shutdown_handle.all_pressed() {
588 logic_fut = None;
589 for queue in input_queues.iter_mut() {
590 queue.drain_input();
591 }
592 false
593 } else {
594 true
595 }
596 } else {
597 if let Some(fut) = logic_fut.as_mut() {
599 if operator_waker.task_ready.load(Ordering::SeqCst) {
600 let waker = futures_util::task::waker_ref(&operator_waker);
601 let mut cx = Context::from_waker(&waker);
602 operator_waker.task_ready.store(false, Ordering::SeqCst);
603 if Pin::new(fut).poll(&mut cx).is_ready() {
604 logic_fut = None;
606 }
607 for flush in output_flushes.iter_mut() {
609 (flush)();
610 }
611 }
612 }
613
614 if logic_fut.is_some() {
616 true
617 } else {
618 for queue in input_queues.iter_mut() {
620 queue.drain_input();
621 }
622 false
623 }
624 }
625 }
626 });
627
628 self.shutdown_button
629 }
630
631 pub fn build_fallible<E: 'static, F>(
668 mut self,
669 constructor: F,
670 ) -> (Button, StreamCore<G, Vec<Rc<E>>>)
671 where
672 F: for<'a> FnOnce(
673 &'a mut [CapabilitySet<G::Timestamp>],
674 ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
675 + 'static,
676 {
677 let (error_output, error_stream) = self.new_output();
679 let button = self.build(|mut caps| async move {
680 let error_cap = caps.pop().unwrap();
681 let mut caps = caps
682 .into_iter()
683 .map(CapabilitySet::from_elem)
684 .collect::<Vec<_>>();
685 if let Err(err) = constructor(&mut *caps).await {
686 error_output.give(&error_cap, Rc::new(err));
687 drop(error_cap);
688 std::future::pending().await
691 }
692 });
693 (button, error_stream)
694 }
695
696 pub fn operator_info(&self) -> OperatorInfo {
698 self.builder.operator_info()
699 }
700
701 pub fn activator(&self) -> &Activator {
703 &self.activator
704 }
705}
706
707pub fn button<G: Scope>(scope: &mut G, addr: Rc<[usize]>) -> (ButtonHandle, Button) {
709 let index = scope.new_identifier();
710 let (pushers, puller) = scope.allocate(index, addr);
711
712 let local_pressed = Rc::new(Cell::new(false));
713
714 let handle = ButtonHandle {
715 buttons_remaining: scope.peers(),
716 local_pressed: Rc::clone(&local_pressed),
717 puller,
718 };
719
720 let token = Button {
721 pushers,
722 local_pressed,
723 };
724
725 (handle, token)
726}
727
728pub struct ButtonHandle {
730 buttons_remaining: usize,
732 local_pressed: Rc<Cell<bool>>,
734 puller: Box<dyn Pull<Bincode<bool>>>,
735}
736
737impl ButtonHandle {
738 pub fn local_pressed(&self) -> bool {
740 self.local_pressed.get()
741 }
742
743 pub fn all_pressed(&mut self) -> bool {
745 while self.puller.recv().is_some() {
746 self.buttons_remaining -= 1;
747 }
748 self.buttons_remaining == 0
749 }
750}
751
752pub struct Button {
753 pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
754 local_pressed: Rc<Cell<bool>>,
755}
756
757impl Button {
758 pub fn press(&mut self) {
760 for mut pusher in self.pushers.drain(..) {
761 pusher.send(Bincode::from(true));
762 pusher.done();
763 }
764 self.local_pressed.set(true);
765 }
766
767 pub fn press_on_drop(self) -> PressOnDropButton {
770 PressOnDropButton(self)
771 }
772}
773
774pub struct PressOnDropButton(Button);
775
776impl Drop for PressOnDropButton {
777 fn drop(&mut self) {
778 self.0.press();
779 }
780}
781
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, ToStream};

    use super::*;

    /// A pass-through async operator that yields between items must forward every
    /// input item unchanged.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output();
            let mut input_handle = op.new_input_for(&input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Progress(_frontier) => {}
                        Event::Data(cap, data) => {
                            for &item in data.iter() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Two-worker scenario in which only worker 0 drops its shutdown tokens; the
    /// consumer must not observe a frontier beyond `[1]`.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                // Producer: worker 0 downgrades to 1 and then holds its capability
                // forever; every other worker returns right away.
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    if index != 0 {
                        return;
                    }
                    cap.downgrade(&1);
                    std::future::pending().await
                });

                // Consumer: checks every frontier it is told about.
                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(&output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            // Let the dataflow make progress with all tokens alive.
            for _ in 0..100 {
                worker.step();
            }
            // Only worker 0 presses its buttons, by dropping the tokens.
            if index == 0 {
                drop(tokens)
            }
            // Keep stepping; the assertion in the consumer must keep holding.
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}