pub struct FrontierNotificator<T: Timestamp> { /* private fields */ }
Expand description
Tracks requests for notification and delivers available notifications.
FrontierNotificator
is meant to manage the delivery of requested notifications in the
presence of inputs that may have outstanding messages to deliver.
The notificator inspects the frontiers, as presented from the outside, for each input.
Requested notifications can be served only once there are no frontier elements less-or-equal
to them, and there are no other pending notification requests strictly less than them. The
comparison with other pending requests is strict because each request is less-or-equal to itself.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::execute(timely::Config::thread(), |worker| {
let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
let (in1_handle, in1) = scope.new_input();
let (in2_handle, in2) = scope.new_input();
in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
let mut notificator = FrontierNotificator::default();
let mut stash = HashMap::new();
move |input1, input2, output| {
while let Some((time, data)) = input1.next() {
stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
notificator.notify_at(time.retain());
}
while let Some((time, data)) = input2.next() {
stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
notificator.notify_at(time.retain());
}
notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
if let Some(mut vec) = stash.remove(time.time()) {
output.session(&time).give_iterator(vec.drain(..));
}
});
}
})
.container::<Vec<_>>()
.inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
(in1_handle, in2_handle)
});
for i in 1..10 {
in1.send(i - 1);
in1.advance_to(i);
in2.send(i - 1);
in2.advance_to(i);
}
in1.close();
in2.close();
}).unwrap();
Implementations§
source§impl<T: Timestamp> FrontierNotificator<T>
impl<T: Timestamp> FrontierNotificator<T>
sourcepub fn from<I: IntoIterator<Item = Capability<T>>>(iter: I) -> Self
pub fn from<I: IntoIterator<Item = Capability<T>>>(iter: I) -> Self
Allocates a new FrontierNotificator
with initial capabilities.
sourcepub fn notify_at(&mut self, cap: Capability<T>)
pub fn notify_at(&mut self, cap: Capability<T>)
Requests a notification at the time associated with capability cap
. Takes ownership of
the capability.
In order to request a notification at a future timestamp, obtain a capability for the new timestamp first, as shown in the example.
§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::example(|scope| {
(0..10).to_stream(scope)
.unary_frontier(Pipeline, "example", |_, _| {
let mut notificator = FrontierNotificator::default();
move |input, output| {
input.for_each(|cap, data| {
output.session(&cap).give_container(data);
let time = cap.time().clone() + 1;
notificator.notify_at(cap.delayed(&time));
});
notificator.for_each(&[input.frontier()], |cap, _| {
println!("done with time: {:?}", cap.time());
});
}
});
});
sourcepub fn notify_at_frontiered<'a>(
&mut self,
cap: Capability<T>,
frontiers: &'a [&'a MutableAntichain<T>],
)
pub fn notify_at_frontiered<'a>( &mut self, cap: Capability<T>, frontiers: &'a [&'a MutableAntichain<T>], )
Requests a notification at the time associated with capability cap
.
The method takes a list of frontiers from which it determines if the capability is immediately available.
When used with the same frontier as make_available
, this method can ensure that notifications are
non-decreasing. Simply using notify_at
will only insert new notifications into the list of pending
notifications, which are only re-examined with calls to make_available
.
sourcepub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>])
pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>])
Enables pending notifications not in advance of any element of frontiers
.
sourcepub fn next_count<'a>(
&mut self,
frontiers: &'a [&'a MutableAntichain<T>],
) -> Option<(Capability<T>, u64)>
pub fn next_count<'a>( &mut self, frontiers: &'a [&'a MutableAntichain<T>], ) -> Option<(Capability<T>, u64)>
Returns the next available capability with respect to the supplied frontiers, if one exists, and the count of how many instances are found.
In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
use for_each
, or (ii) call make_available
first.
sourcepub fn next<'a>(
&mut self,
frontiers: &'a [&'a MutableAntichain<T>],
) -> Option<Capability<T>>
pub fn next<'a>( &mut self, frontiers: &'a [&'a MutableAntichain<T>], ) -> Option<Capability<T>>
Returns the next available capability with respect to the supplied frontiers, if one exists.
In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
use for_each
, or (ii) call make_available
first.
sourcepub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(
&mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logic: F,
)
pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>( &mut self, frontiers: &'a [&'a MutableAntichain<T>], logic: F, )
Repeatedly calls logic
till exhaustion of the notifications made available by inspecting
the frontiers.
logic
receives a capability for t
, the timestamp being notified.
sourcepub fn monotonic<'a>(
&'a mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logging: &'a Option<Logger>,
) -> Notificator<'a, T> ⓘ
pub fn monotonic<'a>( &'a mut self, frontiers: &'a [&'a MutableAntichain<T>], logging: &'a Option<Logger>, ) -> Notificator<'a, T> ⓘ
Creates a notificator session in which delivered notifications will be non-decreasing.
This implementation can be emulated with judicious use of make_available
and notify_at_frontiered
,
in the event that Notificator
provides too restrictive an interface.
sourcepub fn pending(&self) -> Iter<'_, (Capability<T>, u64)>
pub fn pending(&self) -> Iter<'_, (Capability<T>, u64)>
Iterates over pending capabilities and their count. The count represents how often a capability has been requested.
To make sure all pending capabilities are above the frontier, use for_each
or exhaust
next
to consume all available capabilities.
§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::example(|scope| {
(0..10).to_stream(scope)
.unary_frontier(Pipeline, "example", |_, _| {
let mut notificator = FrontierNotificator::default();
move |input, output| {
input.for_each(|cap, data| {
output.session(&cap).give_container(data);
let time = cap.time().clone() + 1;
notificator.notify_at(cap.delayed(&time));
assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
});
notificator.for_each(&[input.frontier()], |cap, _| {
println!("done with time: {:?}", cap.time());
});
}
});
});