timely/dataflow/operators/generic/notificator.rs
1use crate::progress::frontier::{AntichainRef, MutableAntichain};
2use crate::progress::Timestamp;
3use crate::dataflow::operators::Capability;
4
5/// Tracks requests for notification and delivers available notifications.
6///
7/// A `Notificator` represents a dynamic set of notifications and a fixed notification frontier.
8/// One can interact with one by requesting notification with `notify_at`, and retrieving notifications
9/// with `for_each` and `next`. The next notification to be delivered will be the available notification
10/// with the least timestamp, with the implication that the notifications will be non-decreasing as long
11/// as you do not request notifications at times prior to those that have already been delivered.
12///
13/// Notification requests persist across uses of `Notificator`, and it may help to think of `Notificator`
14/// as a notification *session*. However, idiomatically it seems you mostly want to restrict your usage
15/// to such sessions, which is why this is the main notificator type.
16#[derive(Debug)]
17pub struct Notificator<'a, T: Timestamp> {
18 frontiers: &'a [&'a MutableAntichain<T>],
19 inner: &'a mut FrontierNotificator<T>,
20}
21
22impl<'a, T: Timestamp> Notificator<'a, T> {
23 /// Allocates a new `Notificator`.
24 ///
25 /// This is more commonly accomplished using `input.monotonic(frontiers)`.
26 pub fn new(
27 frontiers: &'a [&'a MutableAntichain<T>],
28 inner: &'a mut FrontierNotificator<T>,
29 ) -> Self {
30
31 inner.make_available(frontiers);
32
33 Notificator {
34 frontiers,
35 inner,
36 }
37 }
38
39 /// Reveals the elements in the frontier of the indicated input.
40 pub fn frontier(&self, input: usize) -> AntichainRef<'_, T> {
41 self.frontiers[input].frontier()
42 }
43
44 /// Requests a notification at the time associated with capability `cap`.
45 ///
46 /// In order to request a notification at future timestamp, obtain a capability for the new
47 /// timestamp first, as show in the example.
48 ///
49 /// # Examples
50 /// ```
51 /// use timely::dataflow::operators::ToStream;
52 /// use timely::dataflow::operators::generic::Operator;
53 /// use timely::dataflow::channels::pact::Pipeline;
54 ///
55 /// timely::example(|scope| {
56 /// (0..10).to_stream(scope)
57 /// .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| {
58 /// input.for_each(|cap, data| {
59 /// output.session(&cap).give_container(data);
60 /// let time = cap.time().clone() + 1;
61 /// notificator.notify_at(cap.delayed(&time));
62 /// });
63 /// notificator.for_each(|cap, count, _| {
64 /// println!("done with time: {:?}, requested {} times", cap.time(), count);
65 /// assert!(*cap.time() == 0 && count == 2 || count == 1);
66 /// });
67 /// });
68 /// });
69 /// ```
70 #[inline]
71 pub fn notify_at(&mut self, cap: Capability<T>) {
72 self.inner.notify_at_frontiered(cap, self.frontiers);
73 }
74
75 /// Repeatedly calls `logic` until exhaustion of the available notifications.
76 ///
77 /// `logic` receives a capability for `t`, the timestamp being notified and a `count`
78 /// representing how many capabilities were requested for that specific timestamp.
79 #[inline]
80 pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
81 while let Some((cap, count)) = self.next() {
82 logic(cap, count, self);
83 }
84 }
85}
86
87impl<T: Timestamp> Iterator for Notificator<'_, T> {
88 type Item = (Capability<T>, u64);
89
90 /// Retrieve the next available notification.
91 ///
92 /// Returns `None` if no notification is available. Returns `Some(cap, count)` otherwise:
93 /// `cap` is a capability for `t`, the timestamp being notified and, `count` represents
94 /// how many notifications (out of those requested) are being delivered for that specific
95 /// timestamp.
96 #[inline]
97 fn next(&mut self) -> Option<(Capability<T>, u64)> {
98 self.inner.next_count(self.frontiers)
99 }
100}
101
#[test]
fn notificator_delivers_notifications_in_topo_order() {
    use std::rc::Rc;
    use std::cell::RefCell;
    use crate::progress::ChangeBatch;
    use crate::progress::frontier::MutableAntichain;
    use crate::order::Product;
    use crate::dataflow::operators::capability::Capability;

    let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0));

    let root_capability = Capability::new(Product::new(0, 0), Rc::new(RefCell::new(ChangeBatch::new())));

    // Requested notification times. They are deliberately unsorted, and (1,1) and (5,4)
    // each appear twice.
    let times = [
        Product::new(3, 5),
        Product::new(5, 4),
        Product::new(1, 2),
        Product::new(1, 1),
        Product::new(1, 1),
        Product::new(5, 4),
        Product::new(6, 0),
        Product::new(6, 2),
        Product::new(5, 8),
    ];

    // create a raw notificator with pending notifications at the times above.
    let mut frontier_notificator = FrontierNotificator::from(times.iter().map(|t| root_capability.delayed(t)));

    // the frontier is initially (0,0), which is less-or-equal to every requested time,
    // and so we should deliver no notifications.
    assert!(frontier_notificator.monotonic(&[&frontier]).next().is_none());

    // advance the frontier to [(5,7), (6,1)], opening up some notifications.
    // (6,2) and (5,8) remain blocked, by (6,1) and (5,7) respectively.
    frontier.update_iter(vec![(Product::new(0, 0), -1), (Product::new(5, 7), 1), (Product::new(6, 1), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers);

        // we should deliver the following available notifications, in this order.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1, 1));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1, 2));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(3, 5));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 4));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6, 0));
        assert_eq!(notificator.next(), None);
    }

    // advance the frontier to [(6,10)] opening up all remaining notifications.
    frontier.update_iter(vec![(Product::new(5, 7), -1), (Product::new(6, 1), -1), (Product::new(6, 10), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers);

        // the first available notification should be (5,8). Note that (5,8) comes before the
        // already-delivered (6,0) in the total order, but not in the partial order. We don't
        // promise to respect the total order.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 8));

        // add a new notification, mid notification session.
        notificator.notify_at(root_capability.delayed(&Product::new(5, 9)));

        // we expect to see (5,9) before we see (6,2) before we see None.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 9));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6, 2));
        assert_eq!(notificator.next(), None);
    }
}
170
171/// Tracks requests for notification and delivers available notifications.
172///
173/// `FrontierNotificator` is meant to manage the delivery of requested notifications in the
174/// presence of inputs that may have outstanding messages to deliver.
175/// The notificator inspects the frontiers, as presented from the outside, for each input.
176/// Requested notifications can be served only once there are no frontier elements less-or-equal
177/// to them, and there are no other pending notification requests less than them. Each will be
178/// less-or-equal to itself, so we want to dodge that corner case.
179///
180/// # Examples
181/// ```
182/// use std::collections::HashMap;
183/// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
184/// use timely::dataflow::operators::generic::operator::Operator;
185/// use timely::dataflow::channels::pact::Pipeline;
186///
187/// timely::execute(timely::Config::thread(), |worker| {
188/// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
189/// let (in1_handle, in1) = scope.new_input();
190/// let (in2_handle, in2) = scope.new_input();
191/// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
192/// let mut notificator = FrontierNotificator::default();
193/// let mut stash = HashMap::new();
194/// move |input1, input2, output| {
195/// while let Some((time, data)) = input1.next() {
196/// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
197/// notificator.notify_at(time.retain());
198/// }
199/// while let Some((time, data)) = input2.next() {
200/// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
201/// notificator.notify_at(time.retain());
202/// }
203/// notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
204/// if let Some(mut vec) = stash.remove(time.time()) {
205/// output.session(&time).give_iterator(vec.drain(..));
206/// }
207/// });
208/// }
209/// })
210/// .container::<Vec<_>>()
211/// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
212///
213/// (in1_handle, in2_handle)
214/// });
215///
216/// for i in 1..10 {
217/// in1.send(i - 1);
218/// in1.advance_to(i);
219/// in2.send(i - 1);
220/// in2.advance_to(i);
221/// }
222/// in1.close();
223/// in2.close();
224/// }).unwrap();
225/// ```
226#[derive(Debug)]
227pub struct FrontierNotificator<T: Timestamp> {
228 pending: Vec<(Capability<T>, u64)>,
229 available: ::std::collections::BinaryHeap<OrderReversed<T>>,
230}
231
232impl<T: Timestamp> Default for FrontierNotificator<T> {
233 fn default() -> Self {
234 FrontierNotificator {
235 pending: Vec::new(),
236 available: ::std::collections::BinaryHeap::new(),
237 }
238 }
239}
240
241impl<T: Timestamp> FrontierNotificator<T> {
242 /// Allocates a new `FrontierNotificator` with initial capabilities.
243 pub fn from<I: IntoIterator<Item=Capability<T>>>(iter: I) -> Self {
244 FrontierNotificator {
245 pending: iter.into_iter().map(|x| (x,1)).collect(),
246 available: ::std::collections::BinaryHeap::new(),
247 }
248 }
249
250 /// Requests a notification at the time associated with capability `cap`. Takes ownership of
251 /// the capability.
252 ///
253 /// In order to request a notification at future timestamp, obtain a capability for the new
254 /// timestamp first, as shown in the example.
255 ///
256 /// # Examples
257 /// ```
258 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
259 /// use timely::dataflow::operators::generic::operator::Operator;
260 /// use timely::dataflow::channels::pact::Pipeline;
261 ///
262 /// timely::example(|scope| {
263 /// (0..10).to_stream(scope)
264 /// .unary_frontier(Pipeline, "example", |_, _| {
265 /// let mut notificator = FrontierNotificator::default();
266 /// move |input, output| {
267 /// input.for_each(|cap, data| {
268 /// output.session(&cap).give_container(data);
269 /// let time = cap.time().clone() + 1;
270 /// notificator.notify_at(cap.delayed(&time));
271 /// });
272 /// notificator.for_each(&[input.frontier()], |cap, _| {
273 /// println!("done with time: {:?}", cap.time());
274 /// });
275 /// }
276 /// });
277 /// });
278 /// ```
279 #[inline]
280 pub fn notify_at(&mut self, cap: Capability<T>) {
281 self.pending.push((cap,1));
282 }
283
284 /// Requests a notification at the time associated with capability `cap`.
285 ///
286 /// The method takes list of frontiers from which it determines if the capability is immediately available.
287 /// When used with the same frontier as `make_available`, this method can ensure that notifications are
288 /// non-decreasing. Simply using `notify_at` will only insert new notifications into the list of pending
289 /// notifications, which are only re-examine with calls to `make_available`.
290 #[inline]
291 pub fn notify_at_frontiered<'a>(&mut self, cap: Capability<T>, frontiers: &'a [&'a MutableAntichain<T>]) {
292 if frontiers.iter().all(|f| !f.less_equal(cap.time())) {
293 self.available.push(OrderReversed::new(cap, 1));
294 }
295 else {
296 self.pending.push((cap,1));
297 }
298 }
299
300 /// Enables pending notifications not in advance of any element of `frontiers`.
301 pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) {
302
303 // By invariant, nothing in self.available is greater_equal anything in self.pending.
304 // It should be safe to append any ordered subset of self.pending to self.available,
305 // in that the sequence of capabilities in self.available will remain non-decreasing.
306
307 if !self.pending.is_empty() {
308
309 self.pending.sort_by(|x,y| x.0.time().cmp(y.0.time()));
310 for i in 0 .. self.pending.len() - 1 {
311 if self.pending[i].0.time() == self.pending[i+1].0.time() {
312 self.pending[i+1].1 += self.pending[i].1;
313 self.pending[i].1 = 0;
314 }
315 }
316 self.pending.retain(|x| x.1 > 0);
317
318 for i in 0 .. self.pending.len() {
319 if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) {
320 // TODO : This clones a capability, whereas we could move it instead.
321 self.available.push(OrderReversed::new(self.pending[i].0.clone(), self.pending[i].1));
322 self.pending[i].1 = 0;
323 }
324 }
325 self.pending.retain(|x| x.1 > 0);
326 }
327 }
328
329 /// Returns the next available capability with respect to the supplied frontiers, if one exists,
330 /// and the count of how many instances are found.
331 ///
332 /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
333 /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
334 /// use `for_each`, or (ii) call `make_available` first.
335 #[inline]
336 pub fn next_count<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<(Capability<T>, u64)> {
337 if self.available.is_empty() {
338 self.make_available(frontiers);
339 }
340 self.available.pop().map(|front| {
341 let mut count = front.value;
342 while self.available.peek() == Some(&front) {
343 count += self.available.pop().unwrap().value;
344 }
345 (front.element, count)
346 })
347 }
348
349 /// Returns the next available capability with respect to the supplied frontiers, if one exists.
350 ///
351 /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
352 /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
353 /// use `for_each`, or (ii) call `make_available` first.
354 #[inline]
355 pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
356 self.next_count(frontiers).map(|(cap, _)| cap)
357 }
358
359 /// Repeatedly calls `logic` till exhaustion of the notifications made available by inspecting
360 /// the frontiers.
361 ///
362 /// `logic` receives a capability for `t`, the timestamp being notified.
363 #[inline]
364 pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(&mut self, frontiers: &'a [&'a MutableAntichain<T>], mut logic: F) {
365 self.make_available(frontiers);
366 while let Some(cap) = self.next(frontiers) {
367 logic(cap, self);
368 }
369 }
370
371 /// Creates a notificator session in which delivered notification will be non-decreasing.
372 ///
373 /// This implementation can be emulated with judicious use of `make_available` and `notify_at_frontiered`,
374 /// in the event that `Notificator` provides too restrictive an interface.
375 #[inline]
376 pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Notificator<'a, T> {
377 Notificator::new(frontiers, self)
378 }
379
380 /// Iterates over pending capabilities and their count. The count represents how often a
381 /// capability has been requested.
382 ///
383 /// To make sure all pending capabilities are above the frontier, use `for_each` or exhaust
384 /// `next` to consume all available capabilities.
385 ///
386 /// # Examples
387 /// ```
388 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
389 /// use timely::dataflow::operators::generic::operator::Operator;
390 /// use timely::dataflow::channels::pact::Pipeline;
391 ///
392 /// timely::example(|scope| {
393 /// (0..10).to_stream(scope)
394 /// .unary_frontier(Pipeline, "example", |_, _| {
395 /// let mut notificator = FrontierNotificator::default();
396 /// move |input, output| {
397 /// input.for_each(|cap, data| {
398 /// output.session(&cap).give_container(data);
399 /// let time = cap.time().clone() + 1;
400 /// notificator.notify_at(cap.delayed(&time));
401 /// assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
402 /// });
403 /// notificator.for_each(&[input.frontier()], |cap, _| {
404 /// println!("done with time: {:?}", cap.time());
405 /// });
406 /// }
407 /// });
408 /// });
409 /// ```
410 pub fn pending(&self) -> ::std::slice::Iter<'_, (Capability<T>, u64)> {
411 self.pending.iter()
412 }
413}
414
415#[derive(Debug, PartialEq, Eq)]
416struct OrderReversed<T: Timestamp> {
417 element: Capability<T>,
418 value: u64,
419}
420
421impl<T: Timestamp> OrderReversed<T> {
422 fn new(element: Capability<T>, value: u64) -> Self { OrderReversed { element, value} }
423}
424
425impl<T: Timestamp> PartialOrd for OrderReversed<T> {
426 fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
427 Some(self.cmp(other))
428 }
429}
430impl<T: Timestamp> Ord for OrderReversed<T> {
431 fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
432 other.element.time().cmp(self.element.time())
433 }
434}