pub struct Notificator<'a, T: Timestamp> { /* private fields */ }
Expand description
Tracks requests for notification and delivers available notifications.
A Notificator
represents a dynamic set of notifications and a fixed notification frontier.
One can interact with one by requesting notification with notify_at
, and retrieving notifications
with for_each
and next
. The next notification to be delivered will be the available notification
with the least timestamp, with the implication that the notifications will be non-decreasing as long
as you do not request notifications at times prior to those that have already been delivered.
Notification requests persist across uses of Notificator
, and it may help to think of Notificator
as a notification session. However, idiomatically it seems you mostly want to restrict your usage
to such sessions, which is why this is the main notificator type.
Implementations§
source§impl<'a, T: Timestamp> Notificator<'a, T>
impl<'a, T: Timestamp> Notificator<'a, T>
sourcepub fn new(
frontiers: &'a [&'a MutableAntichain<T>],
inner: &'a mut FrontierNotificator<T>,
logging: &'a Option<Logger>
) -> Self
pub fn new( frontiers: &'a [&'a MutableAntichain<T>], inner: &'a mut FrontierNotificator<T>, logging: &'a Option<Logger> ) -> Self
Allocates a new Notificator
.
This is more commonly accomplished using input.monotonic(frontiers)
.
sourcepub fn frontier(&self, input: usize) -> AntichainRef<'_, T>
pub fn frontier(&self, input: usize) -> AntichainRef<'_, T>
Reveals the elements in the frontier of the indicated input.
sourcepub fn notify_at(&mut self, cap: Capability<T>)
pub fn notify_at(&mut self, cap: Capability<T>)
Requests a notification at the time associated with capability cap
.
In order to request a notification at future timestamp, obtain a capability for the new timestamp first, as show in the example.
Examples
use timely::dataflow::operators::ToStream;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::example(|scope| {
(0..10).to_stream(scope)
.unary_notify(Pipeline, "example", Some(0), |input, output, notificator| {
input.for_each(|cap, data| {
output.session(&cap).give_vec(&mut data.replace(Vec::new()));
let time = cap.time().clone() + 1;
notificator.notify_at(cap.delayed(&time));
});
notificator.for_each(|cap, count, _| {
println!("done with time: {:?}, requested {} times", cap.time(), count);
assert!(*cap.time() == 0 && count == 2 || count == 1);
});
});
});
sourcepub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<'_, T>)>(
&mut self,
logic: F
)
pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<'_, T>)>( &mut self, logic: F )
Repeatedly calls logic
until exhaustion of the available notifications.
logic
receives a capability for t
, the timestamp being notified and a count
representing how many capabilities were requested for that specific timestamp.
Trait Implementations§
source§impl<'a, T: Timestamp> Iterator for Notificator<'a, T>
impl<'a, T: Timestamp> Iterator for Notificator<'a, T>
source§fn next(&mut self) -> Option<(Capability<T>, u64)>
fn next(&mut self) -> Option<(Capability<T>, u64)>
Retrieve the next available notification.
Returns None
if no notification is available. Returns Some(cap, count)
otherwise:
cap
is a capability for t
, the timestamp being notified and, count
represents
how many notifications (out of those requested) are being delivered for that specific
timestamp.
§type Item = (Capability<T>, u64)
type Item = (Capability<T>, u64)
source§fn next_chunk<const N: usize>(
&mut self
) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where
Self: Sized,
fn next_chunk<const N: usize>( &mut self ) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where Self: Sized,
iter_next_chunk
)N
values. Read more1.0.0 · source§fn size_hint(&self) -> (usize, Option<usize>)
fn size_hint(&self) -> (usize, Option<usize>)
1.0.0 · source§fn count(self) -> usizewhere
Self: Sized,
fn count(self) -> usizewhere Self: Sized,
1.0.0 · source§fn last(self) -> Option<Self::Item>where
Self: Sized,
fn last(self) -> Option<Self::Item>where Self: Sized,
source§fn advance_by(&mut self, n: usize) -> Result<(), NonZeroUsize>
fn advance_by(&mut self, n: usize) -> Result<(), NonZeroUsize>
iter_advance_by
)n
elements. Read more1.0.0 · source§fn nth(&mut self, n: usize) -> Option<Self::Item>
fn nth(&mut self, n: usize) -> Option<Self::Item>
n
th element of the iterator. Read more1.28.0 · source§fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
fn step_by(self, step: usize) -> StepBy<Self>where Self: Sized,
1.0.0 · source§fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator<Item = Self::Item>,
fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>where Self: Sized, U: IntoIterator<Item = Self::Item>,
1.0.0 · source§fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator,
fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where Self: Sized, U: IntoIterator,
source§fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>where
Self: Sized,
G: FnMut() -> Self::Item,
fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>where Self: Sized, G: FnMut() -> Self::Item,
iter_intersperse
)separator
between adjacent items of the original iterator. Read more1.0.0 · source§fn map<B, F>(self, f: F) -> Map<Self, F>where
Self: Sized,
F: FnMut(Self::Item) -> B,
fn map<B, F>(self, f: F) -> Map<Self, F>where Self: Sized, F: FnMut(Self::Item) -> B,
1.21.0 · source§fn for_each<F>(self, f: F)where
Self: Sized,
F: FnMut(Self::Item),
fn for_each<F>(self, f: F)where Self: Sized, F: FnMut(Self::Item),
1.0.0 · source§fn filter<P>(self, predicate: P) -> Filter<Self, P>where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
fn filter<P>(self, predicate: P) -> Filter<Self, P>where Self: Sized, P: FnMut(&Self::Item) -> bool,
1.0.0 · source§fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>where Self: Sized, F: FnMut(Self::Item) -> Option<B>,
1.0.0 · source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where Self: Sized,
1.0.0 · source§fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>where Self: Sized, P: FnMut(&Self::Item) -> bool,
1.0.0 · source§fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>where Self: Sized, P: FnMut(&Self::Item) -> bool,
1.57.0 · source§fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>where
Self: Sized,
P: FnMut(Self::Item) -> Option<B>,
fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>where Self: Sized, P: FnMut(Self::Item) -> Option<B>,
1.0.0 · source§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where Self: Sized,
n
elements. Read more1.0.0 · source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where Self: Sized,
n
elements, or fewer
if the underlying iterator ends sooner. Read more1.0.0 · source§fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>where
Self: Sized,
F: FnMut(&mut St, Self::Item) -> Option<B>,
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>where Self: Sized, F: FnMut(&mut St, Self::Item) -> Option<B>,
1.0.0 · source§fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>where
Self: Sized,
U: IntoIterator,
F: FnMut(Self::Item) -> U,
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>where Self: Sized, U: IntoIterator, F: FnMut(Self::Item) -> U,
1.0.0 · source§fn inspect<F>(self, f: F) -> Inspect<Self, F>where
Self: Sized,
F: FnMut(&Self::Item),
fn inspect<F>(self, f: F) -> Inspect<Self, F>where Self: Sized, F: FnMut(&Self::Item),
1.0.0 · source§fn by_ref(&mut self) -> &mut Selfwhere
Self: Sized,
fn by_ref(&mut self) -> &mut Selfwhere Self: Sized,
1.0.0 · source§fn collect<B>(self) -> Bwhere
B: FromIterator<Self::Item>,
Self: Sized,
fn collect<B>(self) -> Bwhere B: FromIterator<Self::Item>, Self: Sized,
source§fn collect_into<E>(self, collection: &mut E) -> &mut Ewhere
E: Extend<Self::Item>,
Self: Sized,
fn collect_into<E>(self, collection: &mut E) -> &mut Ewhere E: Extend<Self::Item>, Self: Sized,
iter_collect_into
)1.0.0 · source§fn partition<B, F>(self, f: F) -> (B, B)where
Self: Sized,
B: Default + Extend<Self::Item>,
F: FnMut(&Self::Item) -> bool,
fn partition<B, F>(self, f: F) -> (B, B)where Self: Sized, B: Default + Extend<Self::Item>, F: FnMut(&Self::Item) -> bool,
source§fn is_partitioned<P>(self, predicate: P) -> boolwhere
Self: Sized,
P: FnMut(Self::Item) -> bool,
fn is_partitioned<P>(self, predicate: P) -> boolwhere Self: Sized, P: FnMut(Self::Item) -> bool,
iter_is_partitioned
)true
precede all those that return false
. Read more1.27.0 · source§fn try_fold<B, F, R>(&mut self, init: B, f: F) -> Rwhere
Self: Sized,
F: FnMut(B, Self::Item) -> R,
R: Try<Output = B>,
fn try_fold<B, F, R>(&mut self, init: B, f: F) -> Rwhere Self: Sized, F: FnMut(B, Self::Item) -> R, R: Try<Output = B>,
1.27.0 · source§fn try_for_each<F, R>(&mut self, f: F) -> Rwhere
Self: Sized,
F: FnMut(Self::Item) -> R,
R: Try<Output = ()>,
fn try_for_each<F, R>(&mut self, f: F) -> Rwhere Self: Sized, F: FnMut(Self::Item) -> R, R: Try<Output = ()>,
1.0.0 · source§fn fold<B, F>(self, init: B, f: F) -> Bwhere
Self: Sized,
F: FnMut(B, Self::Item) -> B,
fn fold<B, F>(self, init: B, f: F) -> Bwhere Self: Sized, F: FnMut(B, Self::Item) -> B,
1.51.0 · source§fn reduce<F>(self, f: F) -> Option<Self::Item>where
Self: Sized,
F: FnMut(Self::Item, Self::Item) -> Self::Item,
fn reduce<F>(self, f: F) -> Option<Self::Item>where Self: Sized, F: FnMut(Self::Item, Self::Item) -> Self::Item,
source§fn try_reduce<F, R>(
&mut self,
f: F
) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryTypewhere
Self: Sized,
F: FnMut(Self::Item, Self::Item) -> R,
R: Try<Output = Self::Item>,
<R as Try>::Residual: Residual<Option<Self::Item>>,
fn try_reduce<F, R>( &mut self, f: F ) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryTypewhere Self: Sized, F: FnMut(Self::Item, Self::Item) -> R, R: Try<Output = Self::Item>, <R as Try>::Residual: Residual<Option<Self::Item>>,
iterator_try_reduce
)1.0.0 · source§fn all<F>(&mut self, f: F) -> boolwhere
Self: Sized,
F: FnMut(Self::Item) -> bool,
fn all<F>(&mut self, f: F) -> boolwhere Self: Sized, F: FnMut(Self::Item) -> bool,
1.0.0 · source§fn any<F>(&mut self, f: F) -> boolwhere
Self: Sized,
F: FnMut(Self::Item) -> bool,
fn any<F>(&mut self, f: F) -> boolwhere Self: Sized, F: FnMut(Self::Item) -> bool,
1.0.0 · source§fn find<P>(&mut self, predicate: P) -> Option<Self::Item>where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
fn find<P>(&mut self, predicate: P) -> Option<Self::Item>where Self: Sized, P: FnMut(&Self::Item) -> bool,
1.30.0 · source§fn find_map<B, F>(&mut self, f: F) -> Option<B>where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
fn find_map<B, F>(&mut self, f: F) -> Option<B>where Self: Sized, F: FnMut(Self::Item) -> Option<B>,
source§fn try_find<F, R>(
&mut self,
f: F
) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryTypewhere
Self: Sized,
F: FnMut(&Self::Item) -> R,
R: Try<Output = bool>,
<R as Try>::Residual: Residual<Option<Self::Item>>,
fn try_find<F, R>( &mut self, f: F ) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryTypewhere Self: Sized, F: FnMut(&Self::Item) -> R, R: Try<Output = bool>, <R as Try>::Residual: Residual<Option<Self::Item>>,
try_find
)1.0.0 · source§fn position<P>(&mut self, predicate: P) -> Option<usize>where
Self: Sized,
P: FnMut(Self::Item) -> bool,
fn position<P>(&mut self, predicate: P) -> Option<usize>where Self: Sized, P: FnMut(Self::Item) -> bool,
1.6.0 · source§fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>where
B: Ord,
Self: Sized,
F: FnMut(&Self::Item) -> B,
fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>where B: Ord, Self: Sized, F: FnMut(&Self::Item) -> B,
1.15.0 · source§fn max_by<F>(self, compare: F) -> Option<Self::Item>where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
fn max_by<F>(self, compare: F) -> Option<Self::Item>where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1.6.0 · source§fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>where
B: Ord,
Self: Sized,
F: FnMut(&Self::Item) -> B,
fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>where B: Ord, Self: Sized, F: FnMut(&Self::Item) -> B,
1.15.0 · source§fn min_by<F>(self, compare: F) -> Option<Self::Item>where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
fn min_by<F>(self, compare: F) -> Option<Self::Item>where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering,
1.0.0 · source§fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)where
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
Self: Sized + Iterator<Item = (A, B)>,
fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Iterator<Item = (A, B)>,
1.36.0 · source§fn copied<'a, T>(self) -> Copied<Self>where
T: 'a + Copy,
Self: Sized + Iterator<Item = &'a T>,
fn copied<'a, T>(self) -> Copied<Self>where T: 'a + Copy, Self: Sized + Iterator<Item = &'a T>,
1.0.0 · source§fn cloned<'a, T>(self) -> Cloned<Self>where
T: 'a + Clone,
Self: Sized + Iterator<Item = &'a T>,
fn cloned<'a, T>(self) -> Cloned<Self>where T: 'a + Clone, Self: Sized + Iterator<Item = &'a T>,
source§fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where
Self: Sized,
fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where Self: Sized,
iter_array_chunks
)N
elements of the iterator at a time. Read more1.11.0 · source§fn sum<S>(self) -> Swhere
Self: Sized,
S: Sum<Self::Item>,
fn sum<S>(self) -> Swhere Self: Sized, S: Sum<Self::Item>,
1.11.0 · source§fn product<P>(self) -> Pwhere
Self: Sized,
P: Product<Self::Item>,
fn product<P>(self) -> Pwhere Self: Sized, P: Product<Self::Item>,
source§fn cmp_by<I, F>(self, other: I, cmp: F) -> Orderingwhere
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Ordering,
fn cmp_by<I, F>(self, other: I, cmp: F) -> Orderingwhere Self: Sized, I: IntoIterator, F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Ordering,
iter_order_by
)Iterator
with those
of another with respect to the specified comparison function. Read more1.5.0 · source§fn partial_cmp<I>(self, other: I) -> Option<Ordering>where
I: IntoIterator,
Self::Item: PartialOrd<<I as IntoIterator>::Item>,
Self: Sized,
fn partial_cmp<I>(self, other: I) -> Option<Ordering>where I: IntoIterator, Self::Item: PartialOrd<<I as IntoIterator>::Item>, Self: Sized,
PartialOrd
elements of
this Iterator
with those of another. The comparison works like short-circuit
evaluation, returning a result without comparing the remaining elements.
As soon as an order can be determined, the evaluation stops and a result is returned. Read moresource§fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where Self: Sized, I: IntoIterator, F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
iter_order_by
)Iterator
with those
of another with respect to the specified comparison function. Read more1.5.0 · source§fn eq<I>(self, other: I) -> boolwhere
I: IntoIterator,
Self::Item: PartialEq<<I as IntoIterator>::Item>,
Self: Sized,
fn eq<I>(self, other: I) -> boolwhere I: IntoIterator, Self::Item: PartialEq<<I as IntoIterator>::Item>, Self: Sized,
source§fn eq_by<I, F>(self, other: I, eq: F) -> boolwhere
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> bool,
fn eq_by<I, F>(self, other: I, eq: F) -> boolwhere Self: Sized, I: IntoIterator, F: FnMut(Self::Item, <I as IntoIterator>::Item) -> bool,
iter_order_by
)1.5.0 · source§fn ne<I>(self, other: I) -> boolwhere
I: IntoIterator,
Self::Item: PartialEq<<I as IntoIterator>::Item>,
Self: Sized,
fn ne<I>(self, other: I) -> boolwhere I: IntoIterator, Self::Item: PartialEq<<I as IntoIterator>::Item>, Self: Sized,
1.5.0 · source§fn lt<I>(self, other: I) -> boolwhere
I: IntoIterator,
Self::Item: PartialOrd<<I as IntoIterator>::Item>,
Self: Sized,
fn lt<I>(self, other: I) -> boolwhere I: IntoIterator, Self::Item: PartialOrd<<I as IntoIterator>::Item>, Self: Sized,
Iterator
are lexicographically
less than those of another. Read more1.5.0 · source§fn le<I>(self, other: I) -> boolwhere
I: IntoIterator,
Self::Item: PartialOrd<<I as IntoIterator>::Item>,
Self: Sized,
fn le<I>(self, other: I) -> boolwhere I: IntoIterator, Self::Item: PartialOrd<<I as IntoIterator>::Item>, Self: Sized,
Iterator
are lexicographically
less or equal to those of another. Read more1.5.0 · source§fn gt<I>(self, other: I) -> boolwhere
I: IntoIterator,
Self::Item: PartialOrd<<I as IntoIterator>::Item>,
Self: Sized,
fn gt<I>(self, other: I) -> boolwhere I: IntoIterator, Self::Item: PartialOrd<<I as IntoIterator>::Item>, Self: Sized,
Iterator
are lexicographically
greater than those of another. Read more1.5.0 · source§fn ge<I>(self, other: I) -> boolwhere
I: IntoIterator,
Self::Item: PartialOrd<<I as IntoIterator>::Item>,
Self: Sized,
fn ge<I>(self, other: I) -> boolwhere I: IntoIterator, Self::Item: PartialOrd<<I as IntoIterator>::Item>, Self: Sized,
Iterator
are lexicographically
greater than or equal to those of another. Read moresource§fn is_sorted_by<F>(self, compare: F) -> boolwhere
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Option<Ordering>,
fn is_sorted_by<F>(self, compare: F) -> boolwhere Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Option<Ordering>,
is_sorted
)source§fn is_sorted_by_key<F, K>(self, f: F) -> boolwhere
Self: Sized,
F: FnMut(Self::Item) -> K,
K: PartialOrd<K>,
fn is_sorted_by_key<F, K>(self, f: F) -> boolwhere Self: Sized, F: FnMut(Self::Item) -> K, K: PartialOrd<K>,
is_sorted
)