1use std::cell::{Cell, RefCell};
19use std::collections::VecDeque;
20use std::future::Future;
21use std::pin::Pin;
22use std::rc::Rc;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::task::{Context, Poll, Waker, ready};
26
27use columnation::Columnation;
28use futures_util::Stream;
29use futures_util::task::ArcWake;
30use timely::communication::{Pull, Push};
31use timely::container::{CapacityContainerBuilder, PushInto};
32use timely::dataflow::channels::Message;
33use timely::dataflow::channels::pact::ParallelizationContract;
34use timely::dataflow::channels::pushers::Output;
35use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
36use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo};
37use timely::dataflow::operators::{Capability, CapabilitySet, InputCapability};
38use timely::dataflow::{Scope, Stream as TimelyStream, StreamVec};
39use timely::progress::{Antichain, Timestamp};
40use timely::scheduling::{Activator, SyncActivator};
41use timely::{Bincode, Container, ContainerBuilder, PartialOrder};
42
43use crate::columnation::ColumnationStack;
44use crate::containers::stack::AccountedStackBuilder;
45
/// Builder for a timely operator whose logic is an async future.
///
/// Inputs registered here are exposed to the future as [`AsyncInputHandle`]s and outputs as
/// [`AsyncOutputHandle`]s; `build` turns the future into the operator's schedule logic.
pub struct OperatorBuilder<'scope, T: Timestamp> {
    /// The underlying timely operator builder.
    builder: OperatorBuilderRc<'scope, T>,
    /// Activator for this operator, exposed through `activator()`.
    activator: Activator,
    /// Waker handed to the logic future; it records readiness and activates the operator.
    operator_waker: Arc<TimelyWaker>,
    /// Last frontier forwarded to each input's queue, indexed by input position.
    input_frontiers: Vec<Antichain<T>>,
    /// Per-input queues that move timely input into the async input handles.
    input_queues: Vec<Box<dyn InputQueue<T>>>,
    /// One closure per output that flushes it and releases its held capability.
    output_flushes: Vec<Box<dyn FnMut()>>,
    /// Handle the running operator uses to observe shutdown button presses.
    shutdown_handle: ButtonHandle,
    /// Button returned from `build` so callers can request shutdown.
    shutdown_button: Button,
}
65
/// Type-erased interface through which the operator drives one input's event queue.
trait InputQueue<T: Timestamp> {
    /// Moves all data currently available on the timely input into the event queue.
    fn accept_input(&mut self);

    /// Discards queued events and any data still pending on the timely input.
    fn drain_input(&mut self);

    /// Records that the input frontier advanced to `upper`.
    fn notify_progress(&mut self, upper: Antichain<T>);
}
78
79impl<T, D, C, P> InputQueue<T> for InputHandleQueue<T, D, C, P>
80where
81 T: Timestamp,
82 D: Container,
83 C: InputConnection<T> + 'static,
84 P: Pull<Message<T, D>> + 'static,
85{
86 fn accept_input(&mut self) {
87 let mut queue = self.queue.borrow_mut();
88 let mut new_data = false;
89 self.handle.for_each(|cap, data| {
90 new_data = true;
91 let cap = self.connection.accept(cap);
92 queue.push_back(Event::Data(cap, std::mem::take(data)));
93 });
94 if new_data {
95 if let Some(waker) = self.waker.take() {
96 waker.wake();
97 }
98 }
99 }
100
101 fn drain_input(&mut self) {
102 self.queue.borrow_mut().clear();
103 self.handle.for_each(|_, _| {});
104 }
105
106 fn notify_progress(&mut self, upper: Antichain<T>) {
107 let mut queue = self.queue.borrow_mut();
108 match queue.back_mut() {
112 Some(&mut Event::Progress(ref mut prev_upper)) => *prev_upper = upper,
113 _ => queue.push_back(Event::Progress(upper)),
114 }
115 if let Some(waker) = self.waker.take() {
116 waker.wake();
117 }
118 }
119}
120
/// Operator-side half of an async input.
///
/// It owns the timely input handle and feeds events into the queue shared with the matching
/// [`AsyncInputHandle`].
struct InputHandleQueue<
    T: Timestamp,
    D: Container,
    C: InputConnection<T>,
    P: Pull<Message<T, D>> + 'static,
> {
    /// Events shared with the `AsyncInputHandle`.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Waker registered by the `AsyncInputHandle` while it waits for events.
    waker: Rc<Cell<Option<Waker>>>,
    /// Translates incoming input capabilities into the capability type given to the future.
    connection: C,
    /// The underlying timely input handle.
    handle: InputHandleCore<T, D, P>,
}
132
/// Waker for the operator's logic future that schedules the timely operator when woken.
struct TimelyWaker {
    /// Thread-safe activator for the operator.
    activator: SyncActivator,
    /// Set while the operator is ingesting input; wakes during that window skip activation.
    active: AtomicBool,
    /// Set when the future has been woken and should be polled on the next schedule.
    task_ready: AtomicBool,
}
139
140impl ArcWake for TimelyWaker {
141 fn wake_by_ref(arc_self: &Arc<Self>) {
142 arc_self.task_ready.store(true, Ordering::SeqCst);
143 if !arc_self.active.load(Ordering::SeqCst) {
145 let _ = arc_self.activator.activate();
150 }
151 }
152}
153
/// Future-side half of an async input.
///
/// It yields [`Event`]s from the queue shared with the operator.
pub struct AsyncInputHandle<T: Timestamp, D: Container, C: InputConnection<T>> {
    /// Events produced by the operator side.
    queue: Rc<RefCell<VecDeque<Event<T, C::Capability, D>>>>,
    /// Slot holding the waker of the task currently waiting on `queue`.
    waker: Rc<Cell<Option<Waker>>>,
    /// Set once a progress event with an empty frontier has been observed.
    done: bool,
}
161
162impl<T: Timestamp, D: Container, C: InputConnection<T>> AsyncInputHandle<T, D, C> {
163 pub fn next_sync(&mut self) -> Option<Event<T, C::Capability, D>> {
164 let mut queue = self.queue.borrow_mut();
165 match queue.pop_front()? {
166 Event::Data(cap, data) => Some(Event::Data(cap, data)),
167 Event::Progress(frontier) => {
168 self.done = frontier.is_empty();
169 Some(Event::Progress(frontier))
170 }
171 }
172 }
173
174 pub async fn ready(&self) {
177 std::future::poll_fn(|cx| self.poll_ready(cx)).await
178 }
179
180 fn poll_ready(&self, cx: &Context<'_>) -> Poll<()> {
181 if self.queue.borrow().is_empty() {
182 self.waker.set(Some(cx.waker().clone()));
183 Poll::Pending
184 } else {
185 Poll::Ready(())
186 }
187 }
188}
189
190impl<T: Timestamp, D: Container, C: InputConnection<T>> Stream for AsyncInputHandle<T, D, C> {
191 type Item = Event<T, C::Capability, D>;
192
193 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
194 if self.done {
195 return Poll::Ready(None);
196 }
197 ready!(self.poll_ready(cx));
198 Poll::Ready(self.next_sync())
199 }
200
201 fn size_hint(&self) -> (usize, Option<usize>) {
202 (self.queue.borrow().len(), None)
203 }
204}
205
/// An event observed on an async input.
#[derive(Debug)]
pub enum Event<T: Timestamp, C, D> {
    /// A batch of data, with the capability produced by the input's [`InputConnection`].
    Data(C, D),
    /// The input frontier advanced to the given antichain.
    Progress(Antichain<T>),
}
214
/// Shared state behind an [`AsyncOutputHandle`].
struct AsyncOutputHandleInner<T: Timestamp, CB: ContainerBuilder> {
    /// The timely output that containers are sent to.
    output: Output<T, CB::Container>,
    /// Capability used to send containers produced by `builder`; `None` after the output ceases.
    capability: Option<Capability<T>>,
    /// Builder that accumulates pushed data into containers.
    builder: CB,
}
224
225impl<T: Timestamp, CB: ContainerBuilder> AsyncOutputHandleInner<T, CB> {
226 fn flush(&mut self) {
228 while let Some(container) = self.builder.finish() {
229 self.output
230 .give(self.capability.as_ref().expect("must exist"), container);
231 }
232 }
233
234 fn cease(&mut self) {
236 self.flush();
237 let _ = self.output.activate();
238 self.capability = None;
239 }
240
241 fn give<D>(&mut self, cap: &Capability<T>, data: D)
244 where
245 CB: PushInto<D>,
246 {
247 if let Some(capability) = &self.capability
248 && cap.time() != capability.time()
249 {
250 self.flush();
251 self.capability = None;
252 }
253 if self.capability.is_none() {
254 self.capability = Some(cap.clone());
255 }
256
257 self.builder.push_into(data);
258 while let Some(container) = self.builder.extract() {
259 self.output
260 .give(self.capability.as_ref().expect("must exist"), container);
261 }
262 }
263}
264
/// Cloneable handle through which async logic sends data to one operator output.
pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
    /// State shared by all clones of this handle.
    inner: Rc<RefCell<AsyncOutputHandleInner<T, CB>>>,
    /// Index of the operator output this handle writes to.
    index: usize,
}
269
270impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
271where
272 T: Timestamp,
273 C: Container + Clone + 'static,
274{
275 #[inline]
276 pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
277 let mut inner = self.inner.borrow_mut();
278 inner.flush();
279 inner.output.give(cap, container);
280 }
281}
282
283impl<T, CB> AsyncOutputHandle<T, CB>
284where
285 T: Timestamp,
286 CB: ContainerBuilder,
287{
288 fn new(output: Output<T, CB::Container>, index: usize) -> Self {
289 let inner = AsyncOutputHandleInner {
290 output,
291 capability: None,
292 builder: CB::default(),
293 };
294 Self {
295 inner: Rc::new(RefCell::new(inner)),
296 index,
297 }
298 }
299
300 fn cease(&self) {
301 self.inner.borrow_mut().cease();
302 }
303}
304
305impl<T, CB> AsyncOutputHandle<T, CB>
306where
307 T: Timestamp,
308 CB: ContainerBuilder,
309{
310 pub fn give<D>(&self, cap: &Capability<T>, data: D)
311 where
312 CB: PushInto<D>,
313 {
314 self.inner.borrow_mut().give(cap, data);
315 }
316}
317
318impl<T, D>
319 AsyncOutputHandle<T, AccountedStackBuilder<CapacityContainerBuilder<ColumnationStack<D>>>>
320where
321 D: Clone + 'static + Columnation,
322 T: Timestamp,
323{
324 pub const MAX_OUTSTANDING_BYTES: usize = 128 * 1024 * 1024;
325
326 pub async fn give_fueled<D2>(&self, cap: &Capability<T>, data: D2)
329 where
330 ColumnationStack<D>: PushInto<D2>,
331 {
332 let should_yield = {
333 let mut handle = self.inner.borrow_mut();
334 handle.give(cap, data);
335 let should_yield = handle.builder.bytes.get() > Self::MAX_OUTSTANDING_BYTES;
336 if should_yield {
337 handle.builder.bytes.set(0);
338 }
339 should_yield
340 };
341 if should_yield {
342 tokio::task::yield_now().await;
343 }
344 }
345}
346
347impl<T: Timestamp, CB: ContainerBuilder> Clone for AsyncOutputHandle<T, CB> {
348 fn clone(&self) -> Self {
349 Self {
350 inner: Rc::clone(&self.inner),
351 index: self.index,
352 }
353 }
354}
355
/// Describes how an async input is connected to the operator's outputs.
pub trait InputConnection<T: Timestamp> {
    /// The capability type handed to the logic alongside each data batch.
    type Capability;

    /// Returns one path-summary antichain per output, `outputs` entries in total.
    ///
    /// The implementations in this module use an empty antichain for unconnected outputs.
    fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;

    /// Converts the timely input capability of an incoming batch into `Self::Capability`.
    fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
}
368
369pub struct Disconnected;
371
372impl<T: Timestamp> InputConnection<T> for Disconnected {
373 type Capability = T;
374
375 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
376 vec![Antichain::new(); outputs]
377 }
378
379 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
380 input_cap.time().clone()
381 }
382}
383
384pub struct ConnectedToOne(usize);
386
387impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
388 type Capability = Capability<T>;
389
390 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
391 let mut summary = vec![Antichain::new(); outputs];
392 summary[self.0] = Antichain::from_elem(T::Summary::default());
393 summary
394 }
395
396 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
397 input_cap.retain(self.0)
398 }
399}
400
401pub struct ConnectedToMany<const N: usize>([usize; N]);
403
404impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
405 type Capability = [Capability<T>; N];
406
407 fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
408 let mut summary = vec![Antichain::new(); outputs];
409 for output in self.0 {
410 summary[output] = Antichain::from_elem(T::Summary::default());
411 }
412 summary
413 }
414
415 fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
416 self.0.map(|output| input_cap.retain(output))
417 }
418}
419
/// Something that identifies an operator output by its index.
pub trait OutputIndex {
    /// Returns the index of the output.
    fn index(&self) -> usize;
}
427
428impl<T: Timestamp, CB: ContainerBuilder> OutputIndex for AsyncOutputHandle<T, CB> {
429 fn index(&self) -> usize {
430 self.index
431 }
432}
433
impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
    /// Creates a builder for an async operator named `name` in `scope`.
    ///
    /// Also creates the operator's activators, its logic-future waker and its shutdown button.
    pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
        let builder = OperatorBuilderRc::new(name, scope);
        let info = builder.operator_info();
        let activator = scope.activator_for(Rc::clone(&info.address));
        let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
        let operator_waker = TimelyWaker {
            activator: sync_activator,
            active: AtomicBool::new(false),
            // Starts as ready so the logic future is polled on the first schedule.
            task_ready: AtomicBool::new(true),
        };
        let (shutdown_handle, shutdown_button) = button(scope, info.address);

        OperatorBuilder {
            builder,
            activator,
            operator_waker: Arc::new(operator_waker),
            input_frontiers: Default::default(),
            input_queues: Default::default(),
            output_flushes: Default::default(),
            shutdown_handle,
            shutdown_button,
        }
    }

    /// Adds an input connected to the single output identified by `output`.
    ///
    /// # Panics
    ///
    /// Panics if `output`'s index is not an output already created on this builder.
    pub fn new_input_for<D, P>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
        output: &dyn OutputIndex,
    ) -> AsyncInputHandle<T, D, ConnectedToOne>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
    {
        let index = output.index();
        assert!(index < self.builder.shape().outputs());
        self.new_input_connection(stream, pact, ConnectedToOne(index))
    }

    /// Adds an input connected to each of the `N` given outputs.
    ///
    /// # Panics
    ///
    /// Panics if any output index is not an output already created on this builder.
    pub fn new_input_for_many<const N: usize, D, P>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
        outputs: [&dyn OutputIndex; N],
    ) -> AsyncInputHandle<T, D, ConnectedToMany<N>>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
    {
        let indices = outputs.map(|output| output.index());
        for index in indices {
            assert!(index < self.builder.shape().outputs());
        }
        self.new_input_connection(stream, pact, ConnectedToMany(indices))
    }

    /// Adds an input that is not connected to any output.
    pub fn new_disconnected_input<D, P>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
    ) -> AsyncInputHandle<T, D, Disconnected>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
    {
        self.new_input_connection(stream, pact, Disconnected)
    }

    /// Adds an input whose output connections are described by `connection`.
    ///
    /// Returns the future-side handle. The operator-side queue sharing the same event queue
    /// and waker slot is stored in `self.input_queues`.
    pub fn new_input_connection<D, P, C>(
        &mut self,
        stream: TimelyStream<'scope, T, D>,
        pact: P,
        connection: C,
    ) -> AsyncInputHandle<T, D, C>
    where
        D: Container + Clone + 'static,
        P: ParallelizationContract<T, D>,
        C: InputConnection<T> + 'static,
    {
        // The frontier forwarded to this input so far; starts at the minimum timestamp.
        self.input_frontiers
            .push(Antichain::from_elem(T::minimum()));

        let outputs = self.builder.shape().outputs();
        let handle = self.builder.new_input_connection(
            stream,
            pact,
            connection.describe(outputs).into_iter().enumerate(),
        );

        let waker = Default::default();
        let queue = Default::default();
        let input_queue = InputHandleQueue {
            queue: Rc::clone(&queue),
            waker: Rc::clone(&waker),
            connection,
            handle,
        };
        self.input_queues.push(Box::new(input_queue));

        AsyncInputHandle {
            queue,
            waker,
            done: false,
        }
    }

    /// Adds a new output and returns its async handle and the resulting stream.
    ///
    /// The output lists no connections itself (`new_output_connection([])`); inputs created
    /// afterwards declare their connections to it. A clone of the handle is registered in
    /// `output_flushes` so the output is ceased after every poll of the logic future.
    pub fn new_output<CB: ContainerBuilder>(
        &mut self,
    ) -> (
        AsyncOutputHandle<T, CB>,
        TimelyStream<'scope, T, CB::Container>,
    ) {
        // The number of existing outputs is the index of the output created next.
        let index = self.builder.shape().outputs();

        let (output, stream) = self.builder.new_output_connection([]);

        let handle = AsyncOutputHandle::new(output, index);

        let flush_handle = handle.clone();
        self.output_flushes
            .push(Box::new(move || flush_handle.cease()));

        (handle, stream)
    }

    /// Finalizes the operator, driving the future returned by `constructor` from timely.
    ///
    /// `constructor` receives the initial output capabilities provided by timely. On every
    /// schedule the operator does three things in order:
    /// 1. Forwards frontier advances and newly arrived data to the input queues.
    /// 2. Polls the future if its waker fired.
    /// 3. Ceases every output (flushing data and releasing the held capability).
    ///
    /// Returns the shutdown [`Button`]. After it has been pressed on every worker, the future
    /// is dropped and all inputs are drained.
    pub fn build<B, L>(self, constructor: B) -> Button
    where
        B: FnOnce(Vec<Capability<T>>) -> L,
        L: Future + 'static,
    {
        let operator_waker = self.operator_waker;
        let mut input_frontiers = self.input_frontiers;
        let mut input_queues = self.input_queues;
        let mut output_flushes = self.output_flushes;
        let mut shutdown_handle = self.shutdown_handle;
        self.builder.build_reschedule(move |caps| {
            let mut logic_fut = Some(Box::pin(constructor(caps)));
            // The returned bool goes back to `build_reschedule`; `true` asks timely to keep
            // the operator alive and scheduled.
            move |new_frontiers| {
                // Wakes triggered while ingesting input only need to set `task_ready`; the
                // future is checked later in this same schedule, so activation is skipped.
                operator_waker.active.store(true, Ordering::SeqCst);
                for (i, queue) in input_queues.iter_mut().enumerate() {
                    let cur = &mut input_frontiers[i];
                    let new = new_frontiers[i].frontier();
                    // Only forward a progress event when the frontier strictly advanced.
                    if PartialOrder::less_than(&cur.borrow(), &new) {
                        queue.notify_progress(new.to_owned());
                        *cur = new.to_owned();
                    }
                    queue.accept_input();
                }
                operator_waker.active.store(false, Ordering::SeqCst);

                if shutdown_handle.local_pressed() {
                    // Pressed on this worker: stop polling the future. It is only dropped
                    // (releasing whatever capabilities it holds) once every worker pressed;
                    // until then the operator stays alive.
                    if shutdown_handle.all_pressed() {
                        logic_fut = None;
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    } else {
                        true
                    }
                } else {
                    if let Some(fut) = logic_fut.as_mut() {
                        // Poll only if the waker fired since the last poll.
                        if operator_waker.task_ready.load(Ordering::SeqCst) {
                            let waker = futures_util::task::waker_ref(&operator_waker);
                            let mut cx = Context::from_waker(&waker);
                            // Cleared before polling so a wake during the poll re-marks it.
                            operator_waker.task_ready.store(false, Ordering::SeqCst);
                            if Pin::new(fut).poll(&mut cx).is_ready() {
                                // The logic finished; release the future.
                                logic_fut = None;
                            }
                            // Flush buffered output and drop held output capabilities.
                            for flush in output_flushes.iter_mut() {
                                (flush)();
                            }
                        }
                    }

                    // Stay alive while the future is pending; otherwise discard any further
                    // input and let the operator finish.
                    if logic_fut.is_some() {
                        true
                    } else {
                        for queue in input_queues.iter_mut() {
                            queue.drain_input();
                        }
                        false
                    }
                }
            }
        });

        self.shutdown_button
    }

    /// Builds an operator whose logic can fail with an error of type `E`.
    ///
    /// An extra output is appended that carries errors as `Rc<E>`. `constructor` receives a
    /// `CapabilitySet` per user-created output. If the future it returns resolves to `Err`,
    /// the error is emitted on the error output, the error capability is dropped, and the
    /// logic then never completes. As a result the remaining capability sets stay held until
    /// the operator is shut down through the returned button.
    pub fn build_fallible<E: 'static, F>(
        mut self,
        constructor: F,
    ) -> (Button, StreamVec<'scope, T, Rc<E>>)
    where
        F: for<'a> FnOnce(
                &'a mut [CapabilitySet<T>],
            ) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
            + 'static,
    {
        let (error_output, error_stream) = self.new_output::<CapacityContainerBuilder<_>>();
        let button = self.build(|mut caps| async move {
            // The error output was created last, so its capability is the last one.
            let error_cap = caps.pop().unwrap();
            let mut caps = caps
                .into_iter()
                .map(CapabilitySet::from_elem)
                .collect::<Vec<_>>();
            if let Err(err) = constructor(&mut *caps).await {
                error_output.give(&error_cap, Rc::new(err));
                drop(error_cap);
                // Wedge the operator: never complete, keeping `caps` alive.
                std::future::pending().await
            }
        });
        (button, error_stream)
    }

    /// Returns the timely operator info of the operator being built.
    pub fn operator_info(&self) -> OperatorInfo {
        self.builder.operator_info()
    }

    /// Returns the activator for this operator.
    pub fn activator(&self) -> &Activator {
        &self.activator
    }
}
723
724pub fn button<'scope, T: Timestamp>(
726 scope: Scope<'scope, T>,
727 addr: Rc<[usize]>,
728) -> (ButtonHandle, Button) {
729 let index = scope.worker().new_identifier();
730 let (pushers, puller) = scope.worker().allocate(index, addr);
731
732 let local_pressed = Rc::new(Cell::new(false));
733
734 let handle = ButtonHandle {
735 buttons_remaining: scope.peers(),
736 local_pressed: Rc::clone(&local_pressed),
737 puller,
738 };
739
740 let token = Button {
741 pushers,
742 local_pressed,
743 };
744
745 (handle, token)
746}
747
/// Operator-side view of a shutdown button.
pub struct ButtonHandle {
    /// Number of press notifications still expected; initialized to the number of peers.
    buttons_remaining: usize,
    /// Flag shared with the local [`Button`], set when it is pressed.
    local_pressed: Rc<Cell<bool>>,
    /// Receives press notifications sent by the buttons.
    puller: Box<dyn Pull<Bincode<bool>>>,
}
756
757impl ButtonHandle {
758 pub fn local_pressed(&self) -> bool {
760 self.local_pressed.get()
761 }
762
763 pub fn all_pressed(&mut self) -> bool {
765 while self.puller.recv().is_some() {
766 self.buttons_remaining -= 1;
767 }
768 self.buttons_remaining == 0
769 }
770}
771
/// A shutdown button.
///
/// Pressing it sends a notification on every pusher and sets the local pressed flag.
pub struct Button {
    /// Pushers from the worker's channel allocation (presumably one per peer); drained on the
    /// first press.
    pushers: Vec<Box<dyn Push<Bincode<bool>>>>,
    /// Flag shared with the local [`ButtonHandle`].
    local_pressed: Rc<Cell<bool>>,
}
776
777impl Button {
778 pub fn press(&mut self) {
780 for mut pusher in self.pushers.drain(..) {
781 pusher.send(Bincode::from(true));
782 pusher.done();
783 }
784 self.local_pressed.set(true);
785 }
786
787 pub fn press_on_drop(self) -> PressOnDropButton {
790 PressOnDropButton(self)
791 }
792}
793
794pub struct PressOnDropButton(Button);
795
796impl Drop for PressOnDropButton {
797 fn drop(&mut self) {
798 self.0.press();
799 }
800}
801
#[cfg(test)]
mod test {
    use futures_util::StreamExt;
    use timely::WorkerConfig;
    use timely::dataflow::channels::pact::Pipeline;
    use timely::dataflow::operators::Capture;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::vec::ToStream;

    use super::*;

    /// An async pass-through operator must forward all input data unchanged, even when its
    /// logic yields before and between individual records.
    #[mz_ore::test]
    fn async_operator() {
        let capture = timely::example(|scope| {
            let input = (0..10).to_stream(scope);

            let mut op = OperatorBuilder::new("async_passthru".to_string(), input.scope());
            let (output, output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
            let mut input_handle = op.new_input_for(input, Pipeline, &output);

            op.build(move |_capabilities| async move {
                tokio::task::yield_now().await;
                while let Some(event) = input_handle.next().await {
                    match event {
                        Event::Data(cap, data) => {
                            for item in data.iter().copied() {
                                // Yield between records to exercise re-scheduling.
                                tokio::task::yield_now().await;
                                output.give(&cap, item);
                            }
                        }
                        Event::Progress(_frontier) => {}
                    }
                }
            });

            output_stream.capture()
        });
        let extracted = capture.extract();

        assert_eq!(extracted, vec![(0, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]);
    }

    /// Two workers run a producer and a consumer. Only worker 0's producer keeps a capability
    /// (downgraded to 1); worker 1's producer returns immediately. Midway, worker 0 alone
    /// drops its shutdown tokens, pressing its buttons. The consumer asserts that every
    /// frontier it observes is still `<= 1`. In other words, one worker pressing its button
    /// must not release the producer's capability while the other worker has not pressed.
    /// Presumably a regression test for issue #18837.
    #[mz_ore::test]
    fn gh_18837() {
        let (builders, other) = timely::CommunicationConfig::Process(2).try_build().unwrap();
        timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
            let index = worker.index();
            let tokens = worker.dataflow::<u64, _, _>(move |scope| {
                let mut producer = OperatorBuilder::new("producer".to_string(), scope.clone());
                let (_output, output_stream) =
                    producer.new_output::<CapacityContainerBuilder<Vec<usize>>>();
                let producer_button = producer.build(move |mut capabilities| async move {
                    let mut cap = capabilities.pop().unwrap();
                    if index != 0 {
                        return;
                    }
                    cap.downgrade(&1);
                    // Hold the capability at 1 until the operator is shut down.
                    std::future::pending().await
                });

                let mut consumer = OperatorBuilder::new("consumer".to_string(), scope.clone());
                let mut input_handle = consumer.new_disconnected_input(output_stream, Pipeline);
                let consumer_button = consumer.build(move |_| async move {
                    while let Some(event) = input_handle.next().await {
                        if let Event::Progress(frontier) = event {
                            // The frontier must never move past 1 (nor become empty).
                            assert!(frontier.less_equal(&1));
                        }
                    }
                });

                (
                    producer_button.press_on_drop(),
                    consumer_button.press_on_drop(),
                )
            });

            for _ in 0..100 {
                worker.step();
            }
            // Only worker 0 presses its buttons here.
            if index == 0 {
                drop(tokens)
            }
            for _ in 0..100 {
                worker.step();
            }
        })
        .expect("timely panicked");
    }
}