1use std::cell::RefCell;
17use std::convert::Infallible;
18use std::rc::Rc;
19
20use timely::container::CapacityContainerBuilder;
21use timely::dataflow::operators::{CapabilitySet, InspectCore};
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::progress::Timestamp;
24use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
25use timely::scheduling::Activator;
26use timely::{Container, Data, PartialOrder};
27use tokio::sync::Notify;
28
29use crate::builder_async::OperatorBuilder as AsyncOperatorBuilder;
30
31pub trait ProbeNotify<G: Scope> {
33 fn probe_notify_with(&self, handles: Vec<Handle<G::Timestamp>>) -> Self;
35}
36
37impl<G, C> ProbeNotify<G> for StreamCore<G, C>
38where
39 G: Scope,
40 C: Container + Data,
41{
42 fn probe_notify_with(&self, mut handles: Vec<Handle<G::Timestamp>>) -> Self {
43 if handles.is_empty() {
44 return self.clone();
45 }
46 for handle in &mut handles {
49 handle.update_frontier(&[Timestamp::minimum()]);
50 }
51
52 self.inspect_container(move |update| {
55 if let Err(frontier) = update {
56 for handle in &mut handles {
57 handle.update_frontier(frontier);
58 }
59 }
60 })
61 }
62}
63
64#[derive(Debug)]
65pub struct Handle<T: Timestamp> {
66 frontier: Rc<RefCell<MutableAntichain<T>>>,
68 handle_frontier: Antichain<T>,
70 notify: Rc<Notify>,
71 activators: Rc<RefCell<Vec<Activator>>>,
73}
74
75impl<T: Timestamp> Default for Handle<T> {
76 fn default() -> Self {
77 Handle {
81 frontier: Rc::new(RefCell::new(MutableAntichain::new())),
82 handle_frontier: Antichain::new(),
83 notify: Rc::new(Notify::new()),
84 activators: Rc::default(),
85 }
86 }
87}
88
89impl<T: Timestamp> Handle<T> {
90 pub async fn progressed(&self) {
92 self.notify.notified().await
93 }
94
95 #[inline]
97 pub fn less_than(&self, time: &T) -> bool {
98 self.frontier.borrow().less_than(time)
99 }
100 #[inline]
102 pub fn less_equal(&self, time: &T) -> bool {
103 self.frontier.borrow().less_equal(time)
104 }
105 #[inline]
107 pub fn done(&self) -> bool {
108 self.frontier.borrow().is_empty()
109 }
110
111 #[inline]
125 pub fn with_frontier<R, F: FnMut(AntichainRef<T>) -> R>(&self, mut function: F) -> R {
126 function(self.frontier.borrow().frontier())
127 }
128
129 #[inline]
130 fn update_frontier(&mut self, new_frontier: &[T]) {
131 let mut frontier = self.frontier.borrow_mut();
132 let changes = frontier.update_iter(
133 self.handle_frontier
134 .iter()
135 .map(|t| (t.clone(), -1))
136 .chain(new_frontier.iter().map(|t| (t.clone(), 1))),
137 );
138 self.handle_frontier.clear();
139 self.handle_frontier.extend(new_frontier.iter().cloned());
140 if changes.count() > 0 {
141 self.notify.notify_waiters();
142 for activator in self.activators.borrow().iter() {
143 activator.activate();
144 }
145 }
146 }
147
148 pub fn activate(&self, activator: Activator) {
150 self.activators.borrow_mut().push(activator);
151 }
152}
153
154impl<T: Timestamp> Drop for Handle<T> {
155 fn drop(&mut self) {
156 self.frontier
158 .borrow_mut()
159 .update_iter(self.handle_frontier.iter().map(|t| (t.clone(), -1)));
160 }
161}
162
163impl<T: Timestamp> Clone for Handle<T> {
164 fn clone(&self) -> Self {
165 Handle {
166 frontier: Rc::clone(&self.frontier),
167 handle_frontier: Antichain::new(),
168 notify: Rc::clone(&self.notify),
169 activators: Rc::clone(&self.activators),
170 }
171 }
172}
173
174pub fn source<G, T>(scope: G, name: String, handle: Handle<T>) -> Stream<G, Infallible>
179where
180 G: Scope<Timestamp = T>,
181 T: Timestamp,
182{
183 let mut builder = AsyncOperatorBuilder::new(name, scope);
184 let (_output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
185
186 builder.build(move |capabilities| async move {
187 let mut cap_set = CapabilitySet::from(capabilities);
188 let mut frontier = Antichain::from_elem(T::minimum());
189
190 let mut downgrade_capability = |f: AntichainRef<T>| {
191 if PartialOrder::less_than(&frontier.borrow(), &f) {
192 frontier = f.to_owned();
193 cap_set.downgrade(&f);
194 }
195 !frontier.is_empty()
196 };
197
198 while handle.with_frontier(&mut downgrade_capability) {
199 handle.progressed().await;
200 }
201 });
202
203 output_stream
204}