1use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use differential_dataflow::containers::{Columnation, TimelyStack};
28use futures_util::Stream;
29use futures_util::task::ArcWake;
30use timely::communication::{Pull, Push};
31use timely::container::{CapacityContainerBuilder, PushInto};
32use timely::dataflow::channels::Message;
33use timely::dataflow::channels::pact::ParallelizationContract;
34use timely::dataflow::channels::pushers::Output;
35use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
36use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
37use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
38use timely::dataflow::{Scope, Stream as TimelyStream, StreamVec};
39use timely::progress::{Antichain, Timestamp};
40use timely::scheduling::{Activator, SyncActivator};
41use timely::{Bincode, Container, ContainerBuilder, PartialOrder};
42
43use crate::containers::stack::AccountedStackBuilder;
44
/// Builder for a timely operator whose logic is an async future.
///
/// Inputs and outputs are declared with the `new_*` methods, and the operator is finalized with
/// [`OperatorBuilder::build`] or [`OperatorBuilder::build_fallible`].
pub struct OperatorBuilder<G: Scope> {
    /// The underlying timely operator builder.
    builder: OperatorBuilderRc<G>,
    /// Activator for this operator, exposed via [`OperatorBuilder::activator`].
    activator: Activator,
    /// Waker handed to the logic future; waking it reschedules this timely operator.
    operator_waker: Arc<TimelyWaker>,
    /// Last frontier forwarded to each input's queue, indexed by input.
    input_frontiers: Vec<Antichain<G::Timestamp>>,
    /// Type-erased queues, one per declared input, in declaration order.
    input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
    /// Type-erased closures, one per output, that flush the output and release its capability.
    /// They are invoked after every poll of the logic future.
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// Operator-side half of the shutdown button; reports local and global presses.
    shutdown_handle: ButtonHandle,
    /// User-facing shutdown button, returned from `build`.
    shutdown_button: Button,
}
64
/// Type-erased interface the operator uses to move data and progress from a timely input into
/// the queue read by the matching [`AsyncInputHandle`].
trait InputQueue<T: Timestamp> {
    /// Pulls every available message from the timely input and enqueues it as [`Event::Data`].
    fn accept_input(&mut self);

    /// Discards all queued events and all messages still pending in the timely input.
    fn drain_input(&mut self);

    /// Records that the input frontier advanced to `upper`.
    fn notify_progress(&mut self, upper: Antichain<T>);
}
77
78impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
79where
80 T: Timestamp,
81 D: Container,
82 C: InputConnection<T> + 'static,
83 P: Pull<Message<T, D>> + 'static,
84{
85 fn accept_input(&mut self) {
86 let mut queue = self.queue.borrow_mut();
87 let mut new_data = false;
88 self.handle.for_each(|cap, data| {
89 new_data = true;
90 let cap = self.connection.accept(cap);
91 queue.push_back(Event::Data(cap, std::mem::take(data)));
92 });
93 if new_data {
94 if let Some(waker) = self.waker.take() {
95 waker.wake();
96 }
97 }
98 }
99
100 fn drain_input(&mut self) {
101 self.queue.borrow_mut().clear();
102 self.handle.for_each(|_, _| {});
103 }
104
105 fn notify_progress(&mut self, upper: Antichain<T>) {
106 let mut queue = self.queue.borrow_mut();
107 match queue.back_mut() {
111 Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
112 _ => queue.push_back(Event::Progress(upper)),
113 }
114 if let Some(waker) = self.waker.take() {
115 waker.wake();
116 }
117 }
118}
119
/// Operator-side state for one input.
///
/// Pairs the timely input handle with the queue and waker shared with an [`AsyncInputHandle`].
struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    /// Events shared with the corresponding `AsyncInputHandle`.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Waker stored by the `AsyncInputHandle` while its queue was empty. It is taken and woken
    /// when events are pushed.
    waker: Rc<Cell<Option<Waker>>>,
    /// Converts timely input capabilities into the connection-specific capability type.
    connection: C,
    /// The underlying timely input handle.
    handle: InputHandleCore<T, D, P>,
}
131
/// Waker (via [`ArcWake`]) that schedules the async operator's timely operator when the logic
/// future is woken.
struct TimelyWaker {
    /// Activator for the timely operator. It is a `SyncActivator` because `ArcWake` requires the
    /// waker to be `Send + Sync`.
    activator: SyncActivator,
    /// `true` while the operator is forwarding input events to its queues. Wakes during that
    /// window only set `task_ready` and skip the activation.
    active: AtomicBool,
    /// Set on every wake. The operator polls the logic future only while this is `true`.
    task_ready: AtomicBool,
}
138
139impl ArcWake for TimelyWaker {
140 fn wake_by_ref(arc_self: &Arc<Self>) {
141 arc_self.task_ready.store(true, Ordering::SeqCst);
142 if !arc_self.active.load(Ordering::SeqCst) {
144 let _ = arc_self.activator.activate();
149 }
150 }
151}
152
/// Async handle to one operator input, consumed as a [`Stream`] of [`Event`]s or via
/// [`AsyncInputHandle::next_sync`].
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    /// Events pushed by the operator and consumed by this handle.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Waker stored while the queue is empty. The operator takes and wakes it when it pushes
    /// events.
    waker: Rc<Cell<Option<Waker>>>,
    /// Set once a progress event with an empty frontier has been consumed. After that the stream
    /// yields `None`.
    done: bool,
}
160
161impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
162 pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
163 let mut queue = self.queue.borrow_mut();
164 match queue.pop_front()? {
165 Event::Data(cap, data) => Some(Event::Data(cap, data)),
166 Event::Progress(frontier) => {
167 self.done = frontier.is_empty();
168 Some(Event::Progress(frontier))
169 }
170 }
171 }
172
173 pub async fn ready(&self) {
176 std::future::poll_fn(|cx| self.poll_ready(cx)).await
177 }
178
179 fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
180 if self.queue.borrow().is_empty() {
181 self.waker.set(Some(cx.waker().clone()));
182 Poll::Pending
183 } else {
184 Poll::Ready(())
185 }
186 }
187}
188
189impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
190 type Item = Event<T, C::Capability, D>;
191
192 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
193 if self.done {
194 return Poll::Ready(None);
195 }
196 ready!(self.poll_ready(cx));
197 Poll::Ready(self.next_sync())
198 }
199
200 fn size_hint(&self) -> (usize, Option<usize>) {
201 (self.queue.borrow().len(), None)
202 }
203}
204
/// An event delivered by an [`AsyncInputHandle`].
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A container of data, paired with the capability produced for it by the input's
    /// [`InputConnection::accept`].
    Data(C, D),
    /// The input frontier advanced to this antichain. An empty antichain marks the end of the
    /// input, and the handle's stream ends after yielding it.
    Progress(Antichain<T>),
}
213
/// Shared state behind every clone of an [`AsyncOutputHandle`].
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
    /// The timely output that finished containers are sent to.
    output: Output<T, CB::Container>,
    /// Capability at which data buffered in `builder` will be sent. `None` until data is given,
    /// and reset on a time change or on `cease`.
    capability: Option<Capability<T>>,
    /// Builder accumulating data given to this output.
    builder: CB,
}
223
224impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
225 fn flush(&mut self) {
227 while let Some(container) = self.builder.finish() {
228 self.output
229 .give(self.capability.as_ref().expect("must exist"), container);
230 }
231 }
232
233 fn cease(&mut self) {
235 self.flush();
236 let _ = self.output.activate();
237 self.capability = None;
238 }
239
240 fn give<D>(&mut self, cap: &Capability<T>, data: D)
243 where
244 CB: PushInto<D>,
245 {
246 if let Some(capability) = &self.capability
247 && cap.time() != capability.time()
248 {
249 self.flush();
250 self.capability = None;
251 }
252 if self.capability.is_none() {
253 self.capability = Some(cap.clone());
254 }
255
256 self.builder.push_into(data);
257 while let Some(container) = self.builder.extract() {
258 self.output
259 .give(self.capability.as_ref().expect("must exist"), container);
260 }
261 }
262}
263
/// Handle for giving data to one operator output.
///
/// Clones share the same underlying state through an `Rc`.
pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
    /// State shared by all clones of this handle.
    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
    /// Index of this output among the operator's outputs.
    index: usize,
}
268
269impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
270where
271 T: Timestamp,
272 C: Container + Clone + 'static,
273{
274 #[inline]
275 pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
276 let mut inner = self.inner.borrow_mut();
277 inner.flush();
278 inner.output.give(cap, container);
279 }
280}
281
282impl<T, CB> AsyncOutputHandle<T, CB>
283where
284 T: Timestamp,
285 CB: ContainerBuilder,
286{
287 fn new(output: Output<T, CB::Container>, index: usize) -> Self {
288 let inner = AsyncOutputHandleInner {
289 output,
290 capability: None,
291 builder: CB::default(),
292 };
293 Self {
294 inner: Rc::new(RefCell::new(inner)),
295 index,
296 }
297 }
298
299 fn cease(&self) {
300 self.inner.borrow_mut().cease();
301 }
302}
303
304impl<T, CB> AsyncOutputHandle<T, CB>
305where
306 T: Timestamp,
307 CB: ContainerBuilder,
308{
309 pub fn give<D>(&self, cap: &Capability<T>, data: D)
310 where
311 CB: PushInto<D>,
312 {
313 self.inner.borrow_mut().give(cap, data);
314 }
315}
316
317impl<T, D> AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<D>>>>
318where
319 D: Clone + 'static + Columnation,
320 T: Timestamp,
321{
322 pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
323
324 pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
327 where
328 TimelyStack<D>: PushInto<D2>,
329 {
330 let should_yield = {
331 let mut handle = self.inner.borrow_mut();
332 handle.give(cap, data);
333 let should_yield = handle.builder.bytes.get() > Self::MAX_OUTSTANDING_BYTES;
334 if should_yield {
335 handle.builder.bytes.set(0);
336 }
337 should_yield
338 };
339 if should_yield {
340 tokio::task::yield_now().await;
341 }
342 }
343}
344
345impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
346 fn clone(&self) -> Self {
347 Self {
348 inner: Rc::clone(&self.inner),
349 index: self.index,
350 }
351 }
352}
353
/// Describes how an operator input is connected to the operator's outputs.
pub trait InputConnection<T: Timestamp> {
    /// The capability type delivered with each [`Event::Data`] from this input.
    type Capability;

    /// Returns one path summary per output, for `outputs` outputs. An empty antichain means the
    /// input is not connected to that output.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Converts the timely capability of an incoming batch into [`Self::Capability`].
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}
366
367pub struct Disconnected;
369
370impl<T: Timestamp> InputConnection<T> for Disconnected {
371 type Capability = T;
372
373 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
374 vec![Antichain::new(); outputs]
375 }
376
377 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
378 input_cap.time().clone()
379 }
380}
381
382pub struct ConnectedToOne(usize);
384
385impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
386 type Capability = Capability<T>;
387
388 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
389 let mut summary = vec![Antichain::new(); outputs];
390 summary[self.0] = Antichain::from_elem(T::Summary::default());
391 summary
392 }
393
394 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
395 input_cap.retain(self.0)
396 }
397}
398
399pub struct ConnectedToMany<const N: usize>([usize; N]);
401
402impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
403 type Capability = [Capability<T>; N];
404
405 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
406 let mut summary = vec![Antichain::new(); outputs];
407 for output in self.0 {
408 summary[output] = Antichain::from_elem(T::Summary::default());
409 }
410 summary
411 }
412
413 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
414 self.0.map(|output| input_cap.retain(output))
415 }
416}
417
/// Identifies one output of an async operator.
///
/// Used when declaring which outputs an input is connected to.
pub trait OutputIndex {
    /// Returns the index of the output within its operator.
    fn index(&self) -> usize;
}

impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
    fn index(&self) -> usize {
        self.index
    }
}
431
impl<G: Scope> OperatorBuilder<G> {
    /// Creates a builder for an operator named `name` in `scope`.
    ///
    /// Sets up the operator's activators, its waker, and its shutdown button. `task_ready`
    /// starts as `true`, so the logic future is polled the first time the operator is scheduled.
    pub fn new(name: String, mut scope: G) -> Self {
        let builder = OperatorBuilderRc::new(name, scope.clone());
        let info = builder.operator_info();
        let activator = scope.activator_for(Rc::clone(&info.address));
        let sync_activator = scope.sync_activator_for(info.address.to_vec());
        let operator_waker = TimelyWaker {
            activator: sync_activator,
            active: AtomicBool::new(false),
            task_ready: AtomicBool::new(true),
        };
        let (shutdown_handle, shutdown_button) = button(&mut scope, info.address);

        OperatorBuilder {
            builder,
            activator,
            operator_waker: Arc::new(operator_waker),
            input_frontiers: Default::default(),
            input_queues: Default::default(),
            output_flushes: Default::default(),
            shutdown_handle,
            shutdown_button,
        }
    }

    /// Adds an input connected only to `output` and returns its async handle.
    ///
    /// # Panics
    ///
    /// Panics if `output` is not an output already declared on this builder.
    pub fn new_input_for<D, P>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
        output: &dyn OutputIndex,
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let index = output.index();
        assert!(index < self.builder.shape().outputs());
        self.new_input_connection(stream, pact, ConnectedToOne(index))
    }

    /// Adds an input connected to each of `outputs` and returns its async handle.
    ///
    /// # Panics
    ///
    /// Panics if any of `outputs` is not an output already declared on this builder.
    pub fn new_input_for_many<const N: usize, D, P>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
        outputs: [&dyn OutputIndex; N],
    ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        let indices = outputs.map(|output| output.index());
        for index in indices {
            assert!(index < self.builder.shape().outputs());
        }
        self.new_input_connection(stream, pact, ConnectedToMany(indices))
    }

    /// Adds an input connected to no output and returns its async handle.
    pub fn new_disconnected_input<D, P>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
    ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
    {
        self.new_input_connection(stream, pact, Disconnected)
    }

    /// Adds an input whose connectivity to outputs is given by `connection`, and returns its
    /// async handle.
    pub fn new_input_connection<D, P, C>(
        &mut self,
        stream: TimelyStream<G, D>,
        pact: P,
        connection: C,
    ) -> AsyncInputHandle<G::Timestamp, D, C>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<G::Timestamp, D>,
        C: InputConnection<G::Timestamp> + 'static,
    {
        // The last forwarded frontier starts at the minimum timestamp, so the first strictly
        // greater frontier is reported to the handle as progress.
        self.input_frontiers
            .push(Antichain::from_elem(G::Timestamp::minimum()));

        // Connectivity is described against the outputs declared so far.
        let outputs = self.builder.shape().outputs();
        let handle = self.builder.new_input_connection(
            stream,
            pact,
            connection.describe(outputs).into_iter().enumerate(),
        );

        // The queue and waker are shared between the operator side (`InputHandleQueue`) and
        // the async side (`AsyncInputHandle`).
        let waker = Default::default();
        let queue = Default::default();
        let input_queue = InputHandleQueue {
            queue: Rc::clone(&queue),
            waker: Rc::clone(&waker),
            connection,
            handle,
        };
        self.input_queues.push(Box::new(input_queue));

        AsyncInputHandle {
            queue,
            waker,
            done: false,
        }
    }

    /// Adds an output and returns its handle and the stream it produces.
    ///
    /// The output is created with no input connections. Inputs declare their connections to it
    /// when they are added. A clone of the handle is registered so that it is flushed, and its
    /// capability released, after every poll of the logic future.
    pub fn new_output<CB: ContainerBuilder>(
        &mut self,
    ) -> (
        AsyncOutputHandle<G::Timestamp, CB>,
        TimelyStream<G, CB::Container>,
    ) {
        let index = self.builder.shape().outputs();

        let (output, stream) = self.builder.new_output_connection([]);

        let handle = AsyncOutputHandle::new(output, index);

        let flush_handle = handle.clone();
        self.output_flushes
            .push(Box::new(move || flush_handle.cease()));

        (handle, stream)
    }

    /// Finalizes the operator with the logic future returned by `constructor`.
    ///
    /// `constructor` receives one initial capability per output. Returns the shutdown
    /// [`Button`].
    ///
    /// On every scheduling the operator:
    /// 1. forwards frontier changes and new data from each timely input to its queue;
    /// 2. if this worker pressed the button, stops polling the logic future. Once all workers
    ///    have pressed, it drops the future and drains the inputs;
    /// 3. otherwise, polls the future if it was woken, and then flushes all outputs.
    ///
    /// The closure returns `true` while the logic future is still held, and `false` once it
    /// has been dropped.
    pub fn build<B, L>(self, constructor: B) -> Button
    where
        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
        L: Future + 'static,
    {
        let operator_waker = self.operator_waker;
        let mut input_frontiers = self.input_frontiers;
        let mut input_queues = self.input_queues;
        let mut output_flushes = self.output_flushes;
        let mut shutdown_handle = self.shutdown_handle;
        self.builder.build_reschedule(move |caps| {
            let mut logic_fut = Some(Box::pin(constructor(caps)));
            move |new_frontiers| {
                // While `active` is set, wakes triggered by the queues below only set
                // `task_ready`; the future is polled later in this same invocation.
                operator_waker.active.store(true, Ordering::SeqCst);
                for (i, queue) in input_queues.iter_mut().enumerate() {
                    let cur = &mut input_frontiers[i];
                    // Report progress only when the input frontier strictly advanced.
                    let new = new_frontiers[i].frontier();
                    if PartialOrder::less_than(&cur.borrow(), &new) {
                        queue.notify_progress(new.to_owned());
                        *cur = new.to_owned();
                    }
                    queue.accept_input();
                }
                operator_waker.active.store(false, Ordering::SeqCst);

                if shutdown_handle.local_pressed() {
                    // This worker pressed the button: the logic future is no longer polled.
                    // Only once every worker has pressed is the future dropped and the inputs
                    // drained.
                    if shutdown_handle.all_pressed() {
                        logic_fut = None;
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    } else {
                        true
                    }
                } else {
                    if let Some(fut) = logic_fut.as_mut() {
                        // Poll only if some wake happened since the last poll.
                        if operator_waker.task_ready.load(Ordering::SeqCst) {
                            let waker = futures_util::task::waker_ref(&operator_waker);
                            let mut cx = Context::from_waker(&waker);
                            // Clear before polling so wakes that happen during the poll are
                            // not lost.
                            operator_waker.task_ready.store(false, Ordering::SeqCst);
                            if Pin::new(fut).poll(&mut cx).is_ready() {
                                // The logic finished; release the future.
                                logic_fut = None;
                            }
                            // Flush every output and release its capability after each poll.
                            for flush in output_flushes.iter_mut() {
                                (flush)();
                            }
                        }
                    }

                    if logic_fut.is_some() {
                        // Keep the operator alive while the logic future is pending.
                        true
                    } else {
                        // The logic is done; discard any remaining and future input.
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    }
                }
            }
        });

        self.shutdown_button
    }

    /// Like [`OperatorBuilder::build`], but for fallible logic. Errors are reported on an
    /// extra output stream that is returned alongside the shutdown button.
    ///
    /// An additional output, connected to no input, carries errors. Its capability is the last
    /// one passed to the builder and is split off; the rest are wrapped in `CapabilitySet`s for
    /// `constructor`. If the constructor's future fails, the error is emitted once and the error
    /// capability is dropped. The future then never completes, so the remaining capabilities
    /// stay held until the operator is shut down via the button.
    pub fn build_fallible<E: 'static, F>(mut self, constructor: F) -> (Button, StreamVec<G, Rc<E>>)
    where
        F: for<'a> FnOnce(
            &'a mut [CapabilitySet<G::Timestamp>],
        ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
        + 'static,
    {
        let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
        let button = self.build(|mut caps| async move {
            let error_cap = caps.pop().unwrap();
            let mut caps = caps
                .into_iter()
                .map(CapabilitySet::from_elem)
                .collect::<Vec<_>>();
            if let Err(err) = constructor(&mut *caps).await {
                error_output.give(&error_cap, Rc::new(err));
                drop(error_cap);
                // Do not return: completing would drop `caps`. Stay pending until the operator
                // is shut down.
                std::future::pending().await
            }
        });
        (button, error_stream)
    }

    /// Returns the underlying timely operator's info.
    pub fn operator_info(&self) -> OperatorInfo {
        self.builder.operator_info()
    }

    /// Returns the activator for this operator.
    pub fn activator(&self) -> &Activator {
        &self.activator
    }
}
718
719pub fn button<G: Scope>(scope: &mut G, addr: Rc<[usize]>) -> (ButtonHandle, Button) {
721 let index = scope.new_identifier();
722 let (pushers, puller) = scope.allocate(index, addr);
723
724 let local_pressed = Rc::new(Cell::new(false));
725
726 let handle = ButtonHandle {
727 buttons_remaining: scope.peers(),
728 local_pressed: Rc::clone(&local_pressed),
729 puller,
730 };
731
732 let token = Button {
733 pushers,
734 local_pressed,
735 };
736
737 (handle, token)
738}
739
/// Operator-side half of a shutdown button.
///
/// Reports whether this worker, and all workers, have pressed.
pub struct ButtonHandle {
    /// Number of press messages not yet received. Initialized to `scope.peers()` in `button`.
    buttons_remaining: usize,
    /// Shared with the local [`Button`]; set when the local worker presses.
    local_pressed: Rc<Cell<bool>>,
    /// Receives the press messages sent by [`Button::press`].
    puller: Box<dyn Pull<Bincode<bool>>>,
}
748
749impl ButtonHandle {
750 pub fn local_pressed(&self) -> bool {
752 self.local_pressed.get()
753 }
754
755 pub fn all_pressed(&mut self) -> bool {
757 while self.puller.recv().is_some() {
758 self.buttons_remaining -= 1;
759 }
760 self.buttons_remaining == 0
761 }
762}
763
/// User-facing shutdown button for an async operator.
pub struct Button {
    /// Pushers from the channel allocated in `button` (presumably one per worker). They are
    /// consumed by the first press.
    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
    /// Shared with the local [`ButtonHandle`].
    local_pressed: Rc<Cell<bool>>,
}
768
769impl Button {
770 pub fn press(&mut self) {
772 for mut pusher in self.pushers.drain(..) {
773 pusher.send(Bincode::from(true));
774 pusher.done();
775 }
776 self.local_pressed.set(true);
777 }
778
779 pub fn press_on_drop(self) -> PressOnDropButton {
782 PressOnDropButton(self)
783 }
784}
785
/// A [`Button`] that is pressed when this wrapper is dropped.
pub struct PressOnDropButton(Button);

impl Drop for PressOnDropButton {
    fn drop(&mut self) {
        self.0.press();
    }
}
793
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::Capture;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::vec::ToStream;

    use super::*;

    /// An async passthrough operator forwards every input record at its input capability,
    /// even when the logic yields between records.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
            let mut input_handle = op.new_input_for(input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Regression test, presumably for GitHub issue #18837.
    ///
    /// With two workers, only worker 0 drops its shutdown buttons. Because not every worker
    /// has pressed, worker 0's producer must keep holding its capability at time 1. The
    /// consumer therefore must never observe a frontier without an element `<= 1`.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    // Only worker 0 keeps a capability; the others finish and drop theirs.
                    if index != 0 {
                        return;
                    }
                    cap.downgrade(&1);
                    // Hold the capability at time 1 for as long as the future is alive.
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // Fails if the frontier becomes empty or moves past 1.
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            for _ in 0..100 {
                worker.step();
            }
            // Press the buttons on worker 0 only.
            if index == 0 {
                drop(tokens)
            }
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}