//! Operators acting on timestamps to logically delay records.
use std::collections::HashMap;
use crate::Data;
use crate::order::{PartialOrder, TotalOrder};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
/// Methods to advance the timestamps of records or batches of records.
pub trait Delay<G: Scope, D: Data> {
    /// Advances the timestamp of records using a supplied function.
    ///
    /// `func` is called with each record and the timestamp it currently
    /// carries, and returns the timestamp the record should be delayed to.
    ///
    /// # Panics
    ///
    /// The function *must* advance the timestamp; the operator tests that the
    /// new timestamp is greater than or equal to the old timestamp, and
    /// asserts (panics) if it is not.
    ///
    /// # Examples
    ///
    /// The following example takes the sequence `0..10` at time `0`
    /// and delays each element `i` to time `i`.
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .delay(|data, time| *data)
    ///            .sink(Pipeline, "example", |input| {
    ///                input.for_each(|time, data| {
    ///                    println!("data at time: {:?}", time);
    ///                });
    ///            });
    /// });
    /// ```
    fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;

    /// Advances the timestamp of records using a supplied function.
    ///
    /// This method is meant as a specialization of `delay` for when the
    /// timestamp is totally ordered. In this case, a priority queue rather
    /// than an unsorted list can manage the potentially available timestamps.
    /// Note that the implementation for `Stream` in this file currently
    /// forwards directly to `delay`.
    ///
    /// # Panics
    ///
    /// As with `delay`, the operator asserts that the new timestamp is greater
    /// than or equal to the old timestamp.
    ///
    /// # Examples
    ///
    /// The following example takes the sequence `0..10` at time `0`
    /// and delays each element `i` to time `i`.
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .delay_total(|data, time| *data)
    ///            .sink(Pipeline, "example", |input| {
    ///                input.for_each(|time, data| {
    ///                    println!("data at time: {:?}", time);
    ///                });
    ///            });
    /// });
    /// ```
    fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
    where G::Timestamp: TotalOrder;

    /// Advances the timestamp of batches of records using a supplied function.
    ///
    /// The batch version does not consult the data, and may only view the
    /// timestamp itself: `func` is called once per incoming batch with the
    /// batch's timestamp and returns the timestamp to delay the whole batch to.
    ///
    /// # Panics
    ///
    /// The operator tests that the new timestamp is greater than or equal to
    /// the old timestamp, and asserts (panics) if it is not.
    ///
    /// # Examples
    ///
    /// The following example takes the sequence `0..10` at time `0`
    /// and delays each batch (there is just one) to time `1`.
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Delay, Operator};
    /// use timely::dataflow::channels::pact::Pipeline;
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .delay_batch(|time| time + 1)
    ///            .sink(Pipeline, "example", |input| {
    ///                input.for_each(|time, data| {
    ///                    println!("data at time: {:?}", time);
    ///                });
    ///            });
    /// });
    /// ```
    fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
}
impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> {
    /// Buffers every record under the timestamp returned by `func`, and sends
    /// the buffered records when the notification for that timestamp arrives.
    fn delay<L>(&self, mut func: L) -> Self
    where
        L: FnMut(&D, &G::Timestamp) -> G::Timestamp + 'static,
    {
        // Records waiting to be released, keyed by their new timestamp.
        let mut pending = HashMap::new();
        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
            input.for_each(|cap, batch| {
                for record in batch.drain(..) {
                    let target = func(&record, &cap);
                    // The supplied function must not move a record backwards in time.
                    assert!(cap.time().less_equal(&target));
                    // The first record seen for a timestamp also requests the
                    // notification that will later release its bucket.
                    let bucket = pending.entry(target.clone()).or_insert_with(|| {
                        notificator.notify_at(cap.delayed(&target));
                        Vec::new()
                    });
                    bucket.push(record);
                }
            });

            // For each available notification, send the corresponding records.
            notificator.for_each(|cap, _, _| {
                if let Some(records) = pending.remove(&cap) {
                    output.session(&cap).give_iterator(records.into_iter());
                }
            });
        })
    }

    /// Forwards to `delay`; no separate handling of totally ordered
    /// timestamps is implemented here.
    fn delay_total<L>(&self, func: L) -> Self
    where
        L: FnMut(&D, &G::Timestamp) -> G::Timestamp + 'static,
        G::Timestamp: TotalOrder,
    {
        self.delay(func)
    }

    /// Buffers every incoming batch under the timestamp returned by `func`,
    /// and sends the buffered batches when the notification for that
    /// timestamp arrives.
    fn delay_batch<L>(&self, mut func: L) -> Self
    where
        L: FnMut(&G::Timestamp) -> G::Timestamp + 'static,
    {
        // Whole batches waiting to be released, keyed by their new timestamp.
        let mut pending = HashMap::new();
        self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
            input.for_each(|cap, batch| {
                let target = func(&cap);
                // The supplied function must not move a batch backwards in time.
                assert!(cap.time().less_equal(&target));
                // The first batch seen for a timestamp also requests the
                // notification that will later release its bucket.
                let batches = pending.entry(target.clone()).or_insert_with(|| {
                    notificator.notify_at(cap.delayed(&target));
                    Vec::new()
                });
                // Take the batch's contents, leaving a default value in its place.
                batches.push(std::mem::take(batch));
            });

            // For each available notification, send the corresponding batches.
            notificator.for_each(|cap, _, _| {
                if let Some(batches) = pending.remove(&cap) {
                    for mut batch in batches {
                        output.session(&cap).give_container(&mut batch);
                    }
                }
            });
        })
    }
}