Skip to main content

timely/dataflow/operators/generic/
notificator.rs

1use crate::progress::frontier::{AntichainRef, MutableAntichain};
2use crate::progress::Timestamp;
3use crate::dataflow::operators::Capability;
4
5/// Tracks requests for notification and delivers available notifications.
6///
7/// A `Notificator` represents a dynamic set of notifications and a fixed notification frontier.
8/// One can interact with one by requesting notification with `notify_at`, and retrieving notifications
9/// with `for_each` and `next`. The next notification to be delivered will be the available notification
10/// with the least timestamp, with the implication that the notifications will be non-decreasing as long
11/// as you do not request notifications at times prior to those that have already been delivered.
12///
13/// Notification requests persist across uses of `Notificator`, and it may help to think of `Notificator`
14/// as a notification *session*. However, idiomatically it seems you mostly want to restrict your usage
15/// to such sessions, which is why this is the main notificator type.
16#[derive(Debug)]
17pub struct Notificator<'a, T: Timestamp> {
18    frontiers: &'a [&'a MutableAntichain<T>],
19    inner: &'a mut FrontierNotificator<T>,
20}
21
22impl<'a, T: Timestamp> Notificator<'a, T> {
23    /// Allocates a new `Notificator`.
24    ///
25    /// This is more commonly accomplished using `input.monotonic(frontiers)`.
26    pub fn new(
27        frontiers: &'a [&'a MutableAntichain<T>],
28        inner: &'a mut FrontierNotificator<T>,
29    ) -> Self {
30
31        inner.make_available(frontiers);
32
33        Notificator {
34            frontiers,
35            inner,
36        }
37    }
38
39    /// Reveals the elements in the frontier of the indicated input.
40    pub fn frontier(&self, input: usize) -> AntichainRef<'_, T> {
41        self.frontiers[input].frontier()
42    }
43
44    /// Requests a notification at the time associated with capability `cap`.
45    ///
46    /// In order to request a notification at future timestamp, obtain a capability for the new
47    /// timestamp first, as show in the example.
48    ///
49    /// # Examples
50    /// ```
51    /// use timely::dataflow::operators::ToStream;
52    /// use timely::dataflow::operators::generic::Operator;
53    /// use timely::dataflow::channels::pact::Pipeline;
54    ///
55    /// timely::example(|scope| {
56    ///     (0..10).to_stream(scope)
57    ///            .container::<Vec<_>>()
58    ///            .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| {
59    ///                input.for_each_time(|cap, data| {
60    ///                    output.session(&cap).give_containers(data);
61    ///                    let time = cap.time().clone() + 1;
62    ///                    notificator.notify_at(cap.delayed(&time, output.output_index()));
63    ///                });
64    ///                notificator.for_each(|cap, count, _| {
65    ///                    println!("done with time: {:?}, requested {} times", cap.time(), count);
66    ///                    assert!(*cap.time() == 0 && count == 2 || count == 1);
67    ///                });
68    ///            });
69    /// });
70    /// ```
71    #[inline]
72    pub fn notify_at(&mut self, cap: Capability<T>) {
73        self.inner.notify_at_frontiered(cap, self.frontiers);
74    }
75
76    /// Repeatedly calls `logic` until exhaustion of the available notifications.
77    ///
78    /// `logic` receives a capability for `t`, the timestamp being notified and a `count`
79    /// representing how many capabilities were requested for that specific timestamp.
80    #[inline]
81    pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
82        while let Some((cap, count)) = self.next() {
83            logic(cap, count, self);
84        }
85    }
86}
87
88impl<T: Timestamp> Iterator for Notificator<'_, T> {
89    type Item = (Capability<T>, u64);
90
91    /// Retrieve the next available notification.
92    ///
93    /// Returns `None` if no notification is available. Returns `Some(cap, count)` otherwise:
94    /// `cap` is a capability for `t`, the timestamp being notified and, `count` represents
95    /// how many notifications (out of those requested) are being delivered for that specific
96    /// timestamp.
97    #[inline]
98    fn next(&mut self) -> Option<(Capability<T>, u64)> {
99        self.inner.next_count(self.frontiers)
100    }
101}
102
#[test]
fn notificator_delivers_notifications_in_topo_order() {
    use std::rc::Rc;
    use std::cell::RefCell;
    use crate::progress::ChangeBatch;
    use crate::progress::frontier::MutableAntichain;
    use crate::order::Product;
    use crate::dataflow::operators::capability::Capability;

    // One input frontier starting at (0,0), and a root capability at (0,0) from which every
    // requested capability below is derived.
    let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0));
    let root_capability = Capability::new(Product::new(0, 0), Rc::new(RefCell::new(ChangeBatch::new())));

    // Requested times: deliberately out of order and containing duplicates.
    let times = [
        Product::new(3, 5),
        Product::new(5, 4),
        Product::new(1, 2),
        Product::new(1, 1),
        Product::new(1, 1),
        Product::new(5, 4),
        Product::new(6, 0),
        Product::new(6, 2),
        Product::new(5, 8),
    ];

    // Create a raw notificator with pending notifications at the times above.
    let requests = times.iter().map(|time| root_capability.delayed(time));
    let mut frontier_notificator = FrontierNotificator::from(requests);

    // The frontier is initially (0,0), and so we should deliver no notifications.
    assert!(frontier_notificator.monotonic(&[&frontier]).next().is_none());

    // Advance the frontier to [(5,7), (6,1)], opening up some notifications.
    frontier.update_iter(vec![(Product::new(0,0),-1), (Product::new(5,7), 1), (Product::new(6,1), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers);

        // The available notifications must arrive in exactly this order, duplicates merged.
        let expected = [
            Product::new(1, 1),
            Product::new(1, 2),
            Product::new(3, 5),
            Product::new(5, 4),
            Product::new(6, 0),
        ];
        for time in expected.iter() {
            assert_eq!(notificator.next().unwrap().0.time(), time);
        }
        assert_eq!(notificator.next(), None);
    }

    // Advance the frontier to [(6,10)], opening up all remaining notifications.
    frontier.update_iter(vec![(Product::new(5,7), -1), (Product::new(6,1), -1), (Product::new(6,10), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers);

        // The first available notification should be (5,8). Note: it sorts before (6,0) in the
        // total order, but not in the partial order. We do not promise to respect the total order.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 8));

        // Add a new notification in the middle of the notification session.
        notificator.notify_at(root_capability.delayed(&Product::new(5,9)));

        // We expect to see (5,9), then (6,2), then nothing further.
        let expected = [Product::new(5, 9), Product::new(6, 2)];
        for time in expected.iter() {
            assert_eq!(notificator.next().unwrap().0.time(), time);
        }
        assert_eq!(notificator.next(), None);
    }
}
171
172/// Tracks requests for notification and delivers available notifications.
173///
174/// `FrontierNotificator` is meant to manage the delivery of requested notifications in the
175/// presence of inputs that may have outstanding messages to deliver.
176/// The notificator inspects the frontiers, as presented from the outside, for each input.
177/// Requested notifications can be served only once there are no frontier elements less-or-equal
178/// to them, and there are no other pending notification requests less than them. Each will be
179/// less-or-equal to itself, so we want to dodge that corner case.
180///
181/// # Examples
182/// ```
183/// use std::collections::HashMap;
184/// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
185/// use timely::dataflow::operators::generic::operator::Operator;
186/// use timely::dataflow::channels::pact::Pipeline;
187///
188/// timely::execute(timely::Config::thread(), |worker| {
189///     let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
190///         let (in1_handle, in1) = scope.new_input::<Vec<_>>();
191///         let (in2_handle, in2) = scope.new_input::<Vec<_>>();
192///         in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
193///             let mut notificator = FrontierNotificator::default();
194///             let mut stash = HashMap::new();
195///             move |(input1, frontier1), (input2, frontier2), output| {
196///                 input1.for_each_time(|time, data| {
197///                     stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
198///                     notificator.notify_at(time.retain(output.output_index()));
199///                 });
200///                 input2.for_each_time(|time, data| {
201///                     stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
202///                     notificator.notify_at(time.retain(output.output_index()));
203///                 });
204///                 notificator.for_each(&[frontier1, frontier2], |time, _| {
205///                     if let Some(mut vec) = stash.remove(time.time()) {
206///                         output.session(&time).give_iterator(vec.drain(..));
207///                     }
208///                 });
209///             }
210///         })
211///         .container::<Vec<_>>()
212///         .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
213///
214///         (in1_handle, in2_handle)
215///     });
216///
217///     for i in 1..10 {
218///         in1.send(i - 1);
219///         in1.advance_to(i);
220///         in2.send(i - 1);
221///         in2.advance_to(i);
222///     }
223///     in1.close();
224///     in2.close();
225/// }).unwrap();
226/// ```
227#[derive(Debug)]
228pub struct FrontierNotificator<T: Timestamp> {
229    pending: Vec<(Capability<T>, u64)>,
230    available: ::std::collections::BinaryHeap<OrderReversed<T>>,
231}
232
233impl<T: Timestamp> Default for FrontierNotificator<T> {
234    fn default() -> Self {
235        FrontierNotificator {
236            pending: Vec::new(),
237            available: ::std::collections::BinaryHeap::new(),
238        }
239    }
240}
241
242impl<T: Timestamp> FrontierNotificator<T> {
243    /// Allocates a new `FrontierNotificator` with initial capabilities.
244    pub fn from<I: IntoIterator<Item=Capability<T>>>(iter: I) -> Self {
245        FrontierNotificator {
246            pending: iter.into_iter().map(|x| (x,1)).collect(),
247            available: ::std::collections::BinaryHeap::new(),
248        }
249    }
250
251    /// Requests a notification at the time associated with capability `cap`. Takes ownership of
252    /// the capability.
253    ///
254    /// In order to request a notification at future timestamp, obtain a capability for the new
255    /// timestamp first, as shown in the example.
256    ///
257    /// # Examples
258    /// ```
259    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
260    /// use timely::dataflow::operators::generic::operator::Operator;
261    /// use timely::dataflow::channels::pact::Pipeline;
262    ///
263    /// timely::example(|scope| {
264    ///     (0..10).to_stream(scope)
265    ///            .container::<Vec<_>>()
266    ///            .unary_frontier(Pipeline, "example", |_, _| {
267    ///                let mut notificator = FrontierNotificator::default();
268    ///                move |(input, frontier), output| {
269    ///                    input.for_each_time(|cap, data| {
270    ///                        output.session(&cap).give_containers(data);
271    ///                        let time = cap.time().clone() + 1;
272    ///                        notificator.notify_at(cap.delayed(&time, output.output_index()));
273    ///                    });
274    ///                    notificator.for_each(&[frontier], |cap, _| {
275    ///                        println!("done with time: {:?}", cap.time());
276    ///                    });
277    ///                }
278    ///            });
279    /// });
280    /// ```
281    #[inline]
282    pub fn notify_at(&mut self, cap: Capability<T>) {
283        self.pending.push((cap,1));
284    }
285
286    /// Requests a notification at the time associated with capability `cap`.
287    ///
288    /// The method takes list of frontiers from which it determines if the capability is immediately available.
289    /// When used with the same frontier as `make_available`, this method can ensure that notifications are
290    /// non-decreasing. Simply using `notify_at` will only insert new notifications into the list of pending
291    /// notifications, which are only re-examine with calls to `make_available`.
292    #[inline]
293    pub fn notify_at_frontiered<'a>(&mut self, cap: Capability<T>, frontiers: &'a [&'a MutableAntichain<T>]) {
294        if frontiers.iter().all(|f| !f.less_equal(cap.time())) {
295            self.available.push(OrderReversed::new(cap, 1));
296        }
297        else {
298            self.pending.push((cap,1));
299        }
300    }
301
302    /// Enables pending notifications not in advance of any element of `frontiers`.
303    pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) {
304
305        // By invariant, nothing in self.available is greater_equal anything in self.pending.
306        // It should be safe to append any ordered subset of self.pending to self.available,
307        // in that the sequence of capabilities in self.available will remain non-decreasing.
308
309        if !self.pending.is_empty() {
310
311            self.pending.sort_unstable_by(|x,y| x.0.time().cmp(y.0.time()));
312            for i in 0 .. self.pending.len() - 1 {
313                if self.pending[i].0.time() == self.pending[i+1].0.time() {
314                    self.pending[i+1].1 += self.pending[i].1;
315                    self.pending[i].1 = 0;
316                }
317            }
318            self.pending.retain(|x| x.1 > 0);
319
320            for i in 0 .. self.pending.len() {
321                if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) {
322                    // TODO : This clones a capability, whereas we could move it instead.
323                    self.available.push(OrderReversed::new(self.pending[i].0.clone(), self.pending[i].1));
324                    self.pending[i].1 = 0;
325                }
326            }
327            self.pending.retain(|x| x.1 > 0);
328        }
329    }
330
331    /// Returns the next available capability with respect to the supplied frontiers, if one exists,
332    /// and the count of how many instances are found.
333    ///
334    /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
335    /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
336    /// use `for_each`, or (ii) call `make_available` first.
337    #[inline]
338    pub fn next_count<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<(Capability<T>, u64)> {
339        if self.available.is_empty() {
340            self.make_available(frontiers);
341        }
342        self.available.pop().map(|front| {
343            let mut count = front.value;
344            while self.available.peek() == Some(&front) {
345                count += self.available.pop().unwrap().value;
346            }
347            (front.element, count)
348        })
349    }
350
351    /// Returns the next available capability with respect to the supplied frontiers, if one exists.
352    ///
353    /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
354    /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
355    /// use `for_each`, or (ii) call `make_available` first.
356    #[inline]
357    pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
358        self.next_count(frontiers).map(|(cap, _)| cap)
359    }
360
361    /// Repeatedly calls `logic` till exhaustion of the notifications made available by inspecting
362    /// the frontiers.
363    ///
364    /// `logic` receives a capability for `t`, the timestamp being notified.
365    #[inline]
366    pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(&mut self, frontiers: &'a [&'a MutableAntichain<T>], mut logic: F) {
367        self.make_available(frontiers);
368        while let Some(cap) = self.next(frontiers) {
369            logic(cap, self);
370        }
371    }
372
373    /// Creates a notificator session in which delivered notification will be non-decreasing.
374    ///
375    /// This implementation can be emulated with judicious use of `make_available` and `notify_at_frontiered`,
376    /// in the event that `Notificator` provides too restrictive an interface.
377    #[inline]
378    pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Notificator<'a, T> {
379        Notificator::new(frontiers, self)
380    }
381
382    /// Iterates over pending capabilities and their count. The count represents how often a
383    /// capability has been requested.
384    ///
385    /// To make sure all pending capabilities are above the frontier, use `for_each` or exhaust
386    /// `next` to consume all available capabilities.
387    ///
388    /// # Examples
389    /// ```
390    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
391    /// use timely::dataflow::operators::generic::operator::Operator;
392    /// use timely::dataflow::channels::pact::Pipeline;
393    ///
394    /// timely::example(|scope| {
395    ///     (0..10).to_stream(scope)
396    ///            .container::<Vec<_>>()
397    ///            .unary_frontier(Pipeline, "example", |_, _| {
398    ///                let mut notificator = FrontierNotificator::default();
399    ///                move |(input, frontier), output| {
400    ///                    input.for_each_time(|cap, data| {
401    ///                        output.session(&cap).give_containers(data);
402    ///                        let time = cap.time().clone() + 1;
403    ///                        notificator.notify_at(cap.delayed(&time, output.output_index()));
404    ///                        assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
405    ///                    });
406    ///                    notificator.for_each(&[frontier], |cap, _| {
407    ///                        println!("done with time: {:?}", cap.time());
408    ///                    });
409    ///                }
410    ///            });
411    /// });
412    /// ```
413    pub fn pending(&self) -> ::std::slice::Iter<'_, (Capability<T>, u64)> {
414        self.pending.iter()
415    }
416}
417
418#[derive(Debug, PartialEq, Eq)]
419struct OrderReversed<T: Timestamp> {
420    element: Capability<T>,
421    value: u64,
422}
423
424impl<T: Timestamp> OrderReversed<T> {
425    fn new(element: Capability<T>, value: u64) -> Self { OrderReversed { element, value} }
426}
427
428impl<T: Timestamp> PartialOrd for OrderReversed<T> {
429    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
430        Some(self.cmp(other))
431    }
432}
433impl<T: Timestamp> Ord for OrderReversed<T> {
434    fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
435        other.element.time().cmp(self.element.time())
436    }
437}