1use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use futures_util::Stream;
28use futures_util::task::ArcWake;
29use timely::communication::{Pull, Push};
30use timely::container::{CapacityContainerBuilder, PushInto};
31use timely::dataflow::channels::Message;
32use timely::dataflow::channels::pact::ParallelizationContract;
33use timely::dataflow::channels::pushers::Output;
34use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
35use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
36use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
37use timely::dataflow::{Scope, Stream as TimelyStream, StreamVec};
38use timely::progress::{Antichain, Timestamp};
39use timely::scheduling::{Activator, SyncActivator};
40use timely::{Bincode, Container, ContainerBuilder, PartialOrder};
41
42use crate::containers::stack::FueledBuilder;
43
/// Builder for an async operator layered on top of timely's `OperatorBuilderRc`.
///
/// Inputs and outputs registered on this builder are recorded here and moved into the scheduling
/// closure installed by `build`.
pub struct OperatorBuilder<'scope, T: Timestamp> {
    /// The wrapped timely operator builder.
    builder: OperatorBuilderRc<'scope, T>,
    /// Activator for this operator, exposed through `activator()`.
    activator: Activator,
    /// Waker used as the task context when the logic future is polled in `build`.
    operator_waker: Arc<TimelyWaker>,
    /// The last frontier recorded for each input, in input creation order.
    input_frontiers: Vec<Antichain<T>>,
    /// One type-erased input queue per input, in input creation order.
    input_queues: Vec<Box<dyn InputQueue<T>>>,
    /// One closure per output; each is invoked after the logic future has been polled.
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// Observing half of the shutdown button, checked on every scheduling of the operator.
    shutdown_handle: ButtonHandle,
    /// Pressing half of the shutdown button, returned to the caller by `build`.
    shutdown_button: Button,
}
63
/// Object-safe view of an operator input, allowing inputs with different container, connection
/// and puller types to be stored together as `Box<dyn InputQueue<T>>`.
trait InputQueue<T: Timestamp> {
    /// Moves all data currently available on the input into the local event queue.
    fn accept_input(&mut self);

    /// Discards everything in the local event queue and everything available on the input.
    fn drain_input(&mut self);

    /// Records that the input frontier is now `upper`.
    fn notify_progress(&mut self, upper: Antichain<T>);
}
76
77impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
78where
79 T: Timestamp,
80 D: Container,
81 C: InputConnection<T> + 'static,
82 P: Pull<Message<T, D>> + 'static,
83{
84 fn accept_input(&mut self) {
85 let mut queue = self.queue.borrow_mut();
86 let mut new_data = false;
87 self.handle.for_each(|cap, data| {
88 new_data = true;
89 let cap = self.connection.accept(cap);
90 queue.push_back(Event::Data(cap, std::mem::take(data)));
91 });
92 if new_data {
93 if let Some(waker) = self.waker.take() {
94 waker.wake();
95 }
96 }
97 }
98
99 fn drain_input(&mut self) {
100 self.queue.borrow_mut().clear();
101 self.handle.for_each(|_, _| {});
102 }
103
104 fn notify_progress(&mut self, upper: Antichain<T>) {
105 let mut queue = self.queue.borrow_mut();
106 match queue.back_mut() {
110 Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
111 _ => queue.push_back(Event::Progress(upper)),
112 }
113 if let Some(waker) = self.waker.take() {
114 waker.wake();
115 }
116 }
117}
118
/// Operator-side state of one input: the timely handle plus the queue and waker slot that are
/// shared (via `Rc`) with the `AsyncInputHandle` created alongside it.
struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    /// Events accepted from the input but not yet consumed by the async side.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Slot holding the waker to wake when new events are queued.
    waker: Rc<Cell<Option<Waker>>>,
    /// Converts each incoming `InputCapability` into the capability type handed out with data.
    connection: C,
    /// The underlying timely input handle.
    handle: InputHandleCore<T, D, P>,
}
130
131struct TimelyWaker {
133 activator: SyncActivator,
134 active: AtomicBool,
135 task_ready: AtomicBool,
136}
137
138impl ArcWake for TimelyWaker {
139 fn wake_by_ref(arc_self: &Arc<Self>) {
140 arc_self.task_ready.store(true, Ordering::SeqCst);
141 if !arc_self.active.load(Ordering::SeqCst) {
143 let _ = arc_self.activator.activate();
148 }
149 }
150}
151
/// Async-side handle of an operator input. Yields the `Event`s that the operator queued for
/// this input, either synchronously via `next_sync` or as a `Stream`.
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    /// Events queued by the operator and not yet consumed.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Slot in which this handle leaves its waker when it finds the queue empty.
    waker: Rc<Cell<Option<Waker>>>,
    /// Whether the most recently consumed progress event carried an empty frontier; once set,
    /// the `Stream` implementation yields `None`.
    done: bool,
}
159
160impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
161 pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
162 let mut queue = self.queue.borrow_mut();
163 match queue.pop_front()? {
164 Event::Data(cap, data) => Some(Event::Data(cap, data)),
165 Event::Progress(frontier) => {
166 self.done = frontier.is_empty();
167 Some(Event::Progress(frontier))
168 }
169 }
170 }
171
172 pub async fn ready(&self) {
175 std::future::poll_fn(|cx| self.poll_ready(cx)).await
176 }
177
178 fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
179 if self.queue.borrow().is_empty() {
180 self.waker.set(Some(cx.waker().clone()));
181 Poll::Pending
182 } else {
183 Poll::Ready(())
184 }
185 }
186}
187
188impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
189 type Item = Event<T, C::Capability, D>;
190
191 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192 if self.done {
193 return Poll::Ready(None);
194 }
195 ready!(self.poll_ready(cx));
196 Poll::Ready(self.next_sync())
197 }
198
199 fn size_hint(&self) -> (usize, Option<usize>) {
200 (self.queue.borrow().len(), None)
201 }
202}
203
/// An event observed on an operator input.
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A container of data `D` together with the capability-like value `C` produced for it by
    /// the input's `InputConnection`.
    Data(C, D),
    /// The input frontier changed to the contained antichain.
    Progress(Antichain<T>),
}
212
/// Shared state behind an `AsyncOutputHandle`.
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
    /// The timely output that finished containers are given to.
    output: Output<T, CB::Container>,
    /// Clone of the capability used by the most recent `give`, i.e. the capability under which
    /// the data currently held in `builder` will be emitted. `None` when nothing is buffered
    /// since the last `cease`.
    capability: Option<Capability<T>>,
    /// Container builder buffering data between `give` calls.
    builder: CB,
}
222
223impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
224 fn flush(&mut self) {
226 while let Some(container) = self.builder.finish() {
227 self.output
228 .give(self.capability.as_ref().expect("must exist"), container);
229 }
230 }
231
232 fn cease(&mut self) {
234 self.flush();
235 let _ = self.output.activate();
236 self.capability = None;
237 }
238
239 fn give<D>(&mut self, cap: &Capability<T>, data: D)
242 where
243 CB: PushInto<D>,
244 {
245 if let Some(capability) = &self.capability
246 && cap.time() != capability.time()
247 {
248 self.flush();
249 self.capability = None;
250 }
251 if self.capability.is_none() {
252 self.capability = Some(cap.clone());
253 }
254
255 self.builder.push_into(data);
256 while let Some(container) = self.builder.extract() {
257 self.output
258 .give(self.capability.as_ref().expect("must exist"), container);
259 }
260 }
261}
262
/// Cloneable handle to an operator output. All clones share the same inner state.
pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
    /// Output, container builder and stored capability, shared between clones.
    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
    /// Position of this output among the operator's outputs, as reported by `OutputIndex`.
    index: usize,
}
267
268impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
269where
270 T: Timestamp,
271 C: Container + Clone + 'static,
272{
273 #[inline]
274 pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
275 let mut inner = self.inner.borrow_mut();
276 inner.flush();
277 inner.output.give(cap, container);
278 }
279}
280
281impl<T, CB> AsyncOutputHandle<T, CB>
282where
283 T: Timestamp,
284 CB: ContainerBuilder,
285{
286 fn new(output: Output<T, CB::Container>, index: usize) -> Self {
287 let inner = AsyncOutputHandleInner {
288 output,
289 capability: None,
290 builder: CB::default(),
291 };
292 Self {
293 inner: Rc::new(RefCell::new(inner)),
294 index,
295 }
296 }
297
298 fn cease(&self) {
299 self.inner.borrow_mut().cease();
300 }
301}
302
303impl<T, CB> AsyncOutputHandle<T, CB>
304where
305 T: Timestamp,
306 CB: ContainerBuilder,
307{
308 pub fn give<D>(&self, cap: &Capability<T>, data: D)
309 where
310 CB: PushInto<D>,
311 {
312 self.inner.borrow_mut().give(cap, data);
313 }
314}
315
316impl<T, D> AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<D>>>>
317where
318 D: Clone + 'static,
319 T: Timestamp,
320{
321 pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
322
323 pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2, size_bytes: usize)
328 where
329 FueledBuilder<CapacityContainerBuilder<Vec<D>>>: PushInto<D2>,
330 {
331 let should_yield = {
332 let mut handle = self.inner.borrow_mut();
333 handle.give(cap, data);
334 let bytes = &handle.builder.bytes;
335 bytes.set(bytes.get() + size_bytes);
336 let should_yield = bytes.get() > Self::MAX_OUTSTANDING_BYTES;
337 if should_yield {
338 bytes.set(0);
339 }
340 should_yield
341 };
342 if should_yield {
343 tokio::task::yield_now().await;
344 }
345 }
346}
347
348impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
349 fn clone(&self) -> Self {
350 Self {
351 inner: Rc::clone(&self.inner),
352 index: self.index,
353 }
354 }
355}
356
/// Describes how an operator input relates to the operator's outputs, and what capability-like
/// value is handed out alongside data received on that input.
pub trait InputConnection<T: Timestamp> {
    /// The value delivered with each `Event::Data` for inputs using this connection.
    type Capability;

    /// Returns one path-summary antichain per output, for an operator with `outputs` outputs.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Converts the input capability of a received message into `Self::Capability`.
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}
369
370pub struct Disconnected;
372
373impl<T: Timestamp> InputConnection<T> for Disconnected {
374 type Capability = T;
375
376 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
377 vec![Antichain::new(); outputs]
378 }
379
380 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
381 input_cap.time().clone()
382 }
383}
384
385pub struct ConnectedToOne(usize);
387
388impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
389 type Capability = Capability<T>;
390
391 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
392 let mut summary = vec![Antichain::new(); outputs];
393 summary[self.0] = Antichain::from_elem(T::Summary::default());
394 summary
395 }
396
397 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
398 input_cap.retain(self.0)
399 }
400}
401
402pub struct ConnectedToMany<const N: usize>([usize; N]);
404
405impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
406 type Capability = [Capability<T>; N];
407
408 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
409 let mut summary = vec![Antichain::new(); outputs];
410 for output in self.0 {
411 summary[output] = Antichain::from_elem(T::Summary::default());
412 }
413 summary
414 }
415
416 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
417 self.0.map(|output| input_cap.retain(output))
418 }
419}
420
/// Type-erased access to the position of an output, so that output handles with different
/// container builders can be passed together (e.g. to `new_input_for_many`).
pub trait OutputIndex {
    /// Returns the index of this output among the operator's outputs.
    fn index(&self) -> usize;
}

impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
    /// Returns the index recorded when the handle was created.
    fn index(&self) -> usize {
        self.index
    }
}
434
435impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
436 pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
438 let builder = OperatorBuilderRc::new(name, scope);
439 let info = builder.operator_info();
440 let activator = scope.activator_for(Rc::clone(&info.address));
441 let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
442 let operator_waker = TimelyWaker {
443 activator: sync_activator,
444 active: AtomicBool::new(false),
445 task_ready: AtomicBool::new(true),
446 };
447 let (shutdown_handle, shutdown_button) = button(scope, info.address);
448
449 OperatorBuilder {
450 builder,
451 activator,
452 operator_waker: Arc::new(operator_waker),
453 input_frontiers: Default::default(),
454 input_queues: Default::default(),
455 output_flushes: Default::default(),
456 shutdown_handle,
457 shutdown_button,
458 }
459 }
460
461 pub fn new_input_for<D, P>(
463 &mut self,
464 stream: TimelyStream<'scope, T, D>,
465 pact: P,
466 output: &dyn OutputIndex,
467 ) -> AsyncInputHandle<T, D, ConnectedToOne>
468 where
469 D: Container + Clone + 'static,
470 P: ParallelizationContract<T, D>,
471 {
472 let index = output.index();
473 assert!(index < self.builder.shape().outputs());
474 self.new_input_connection(stream, pact, ConnectedToOne(index))
475 }
476
477 pub fn new_input_for_many<const N: usize, D, P>(
479 &mut self,
480 stream: TimelyStream<'scope, T, D>,
481 pact: P,
482 outputs: [&dyn OutputIndex; N],
483 ) -> AsyncInputHandle<T, D, ConnectedToMany<N>>
484 where
485 D: Container + Clone + 'static,
486 P: ParallelizationContract<T, D>,
487 {
488 let indices = outputs.map(|output| output.index());
489 for index in indices {
490 assert!(index < self.builder.shape().outputs());
491 }
492 self.new_input_connection(stream, pact, ConnectedToMany(indices))
493 }
494
495 pub fn new_disconnected_input<D, P>(
497 &mut self,
498 stream: TimelyStream<'scope, T, D>,
499 pact: P,
500 ) -> AsyncInputHandle<T, D, Disconnected>
501 where
502 D: Container + Clone + 'static,
503 P: ParallelizationContract<T, D>,
504 {
505 self.new_input_connection(stream, pact, Disconnected)
506 }
507
508 pub fn new_input_connection<D, P, C>(
510 &mut self,
511 stream: TimelyStream<'scope, T, D>,
512 pact: P,
513 connection: C,
514 ) -> AsyncInputHandle<T, D, C>
515 where
516 D: Container + Clone + 'static,
517 P: ParallelizationContract<T, D>,
518 C: InputConnection<T> + 'static,
519 {
520 self.input_frontiers
521 .push(Antichain::from_elem(T::minimum()));
522
523 let outputs = self.builder.shape().outputs();
524 let handle = self.builder.new_input_connection(
525 stream,
526 pact,
527 connection.describe(outputs).into_iter().enumerate(),
528 );
529
530 let waker = Default::default();
531 let queue = Default::default();
532 let input_queue = InputHandleQueue {
533 queue: Rc::clone(&queue),
534 waker: Rc::clone(&waker),
535 connection,
536 handle,
537 };
538 self.input_queues.push(Box::new(input_queue));
539
540 AsyncInputHandle {
541 queue,
542 waker,
543 done: false,
544 }
545 }
546
547 pub fn new_output<CB: ContainerBuilder>(
549 &mut self,
550 ) -> (
551 AsyncOutputHandle<T, CB>,
552 TimelyStream<'scope, T, CB::Container>,
553 ) {
554 let index = self.builder.shape().outputs();
555
556 let (output, stream) = self.builder.new_output_connection([]);
557
558 let handle = AsyncOutputHandle::new(output, index);
559
560 let flush_handle = handle.clone();
561 self.output_flushes
562 .push(Box::new(move || flush_handle.cease()));
563
564 (handle, stream)
565 }
566
567 pub fn build<B, L>(self, constructor: B) -> Button
572 where
573 B: FnOnce(Vec<Capability<T>>) -> L,
574 L: Future + 'static,
575 {
576 let operator_waker = self.operator_waker;
577 let mut input_frontiers = self.input_frontiers;
578 let mut input_queues = self.input_queues;
579 let mut output_flushes = self.output_flushes;
580 let mut shutdown_handle = self.shutdown_handle;
581 self.builder.build_reschedule(move |caps| {
582 let mut logic_fut = Some(Box::pin(constructor(caps)));
583 move |new_frontiers| {
584 operator_waker.active.store(true, Ordering::SeqCst);
585 for (i, queue) in input_queues.iter_mut().enumerate() {
586 let cur = &mut input_frontiers[i];
588 let new = new_frontiers[i].frontier();
589 if PartialOrder::less_than(&cur.borrow(), &new) {
590 queue.notify_progress(new.to_owned());
591 *cur = new.to_owned();
592 }
593 queue.accept_input();
596 }
597 operator_waker.active.store(false, Ordering::SeqCst);
598
599 if shutdown_handle.local_pressed() {
603 if shutdown_handle.all_pressed() {
606 logic_fut = None;
607 for queue in input_queues.iter_mut() {
608 queue.drain_input();
609 }
610 false
611 } else {
612 true
613 }
614 } else {
615 if let Some(fut) = logic_fut.as_mut() {
617 if operator_waker.task_ready.load(Ordering::SeqCst) {
618 let waker = futures_util::task::waker_ref(&operator_waker);
619 let mut cx = Context::from_waker(&waker);
620 operator_waker.task_ready.store(false, Ordering::SeqCst);
621 if Pin::new(fut).poll(&mut cx).is_ready() {
622 logic_fut = None;
624 }
625 for flush in output_flushes.iter_mut() {
627 (flush)();
628 }
629 }
630 }
631
632 if logic_fut.is_some() {
634 true
635 } else {
636 for queue in input_queues.iter_mut() {
638 queue.drain_input();
639 }
640 false
641 }
642 }
643 }
644 });
645
646 self.shutdown_button
647 }
648
649 pub fn build_fallible<E: 'static, F>(
686 mut self,
687 constructor: F,
688 ) -> (Button, StreamVec<'scope, T, Rc<E>>)
689 where
690 F: for<'a> FnOnce(
691 &'a mut [CapabilitySet<T>],
692 ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
693 + 'static,
694 {
695 let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
697 let button = self.build(|mut caps| async move {
698 let error_cap = caps.pop().unwrap();
699 let mut caps = caps
700 .into_iter()
701 .map(CapabilitySet::from_elem)
702 .collect::<Vec<_>>();
703 if let Err(err) = constructor(&mut *caps).await {
704 error_output.give(&error_cap, Rc::new(err));
705 drop(error_cap);
706 std::future::pending().await
709 }
710 });
711 (button, error_stream)
712 }
713
714 pub fn operator_info(&self) -> OperatorInfo {
716 self.builder.operator_info()
717 }
718
719 pub fn activator(&self) -> &Activator {
721 &self.activator
722 }
723}
724
725pub fn button<'scope, T: Timestamp>(
727 scope: Scope<'scope, T>,
728 addr: Rc<[usize]>,
729) -> (ButtonHandle, Button) {
730 let index = scope.worker().new_identifier();
731 let (pushers, puller) = scope.worker().allocate(index, addr);
732
733 let local_pressed = Rc::new(Cell::new(false));
734
735 let handle = ButtonHandle {
736 buttons_remaining: scope.peers(),
737 local_pressed: Rc::clone(&local_pressed),
738 puller,
739 };
740
741 let token = Button {
742 pushers,
743 local_pressed,
744 };
745
746 (handle, token)
747}
748
749pub struct ButtonHandle {
751 buttons_remaining: usize,
753 local_pressed: Rc<Cell<bool>>,
755 puller: Box<dyn Pull<Bincode<bool>>>,
756}
757
758impl ButtonHandle {
759 pub fn local_pressed(&self) -> bool {
761 self.local_pressed.get()
762 }
763
764 pub fn all_pressed(&mut self) -> bool {
766 while self.puller.recv().is_some() {
767 self.buttons_remaining -= 1;
768 }
769 self.buttons_remaining == 0
770 }
771}
772
773pub struct Button {
774 pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
775 local_pressed: Rc<Cell<bool>>,
776}
777
778impl Button {
779 pub fn press(&mut self) {
781 for mut pusher in self.pushers.drain(..) {
782 pusher.send(Bincode::from(true));
783 pusher.done();
784 }
785 self.local_pressed.set(true);
786 }
787
788 pub fn press_on_drop(self) -> PressOnDropButton {
791 PressOnDropButton(self)
792 }
793}
794
795pub struct PressOnDropButton(Button);
796
797impl Drop for PressOnDropButton {
798 fn drop(&mut self) {
799 self.0.press();
800 }
801}
802
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::Capture;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::vec::ToStream;

    use super::*;

    /// Builds a pass-through async operator that yields before every record it forwards and
    /// checks that all ten input records come out at time 0.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
            let mut input_handle = op.new_input_for(input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                // Yield once before reading any input and once before every record given.
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Two-worker test of button-driven shutdown; the name presumably refers to an issue
    /// number (TODO confirm).
    ///
    /// Worker 0's producer downgrades its capability to 1 and then never completes; the
    /// producer on the other worker returns immediately. The consumer asserts on every progress
    /// event it observes. Only worker 0 drops its button guards between the two stepping
    /// phases.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    // Every worker other than worker 0 finishes right away.
                    if index != 0 {
                        return;
                    }
                    cap.downgrade(&1);
                    // Hold the downgraded capability forever.
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // Every observed frontier must still admit time 1.
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            // Run the dataflow for a while with no button pressed.
            for _ in 0..100 {
                worker.step();
            }
            // Dropping the guards presses both buttons, on worker 0 only.
            if index == 0 {
                drop(tokens)
            }
            // Keep stepping with worker 0's buttons pressed.
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}