Struct timely::dataflow::operators::generic::FrontierNotificator

source ·
pub struct FrontierNotificator<T: Timestamp> { /* private fields */ }
Expand description

Tracks requests for notification and delivers available notifications.

FrontierNotificator is meant to manage the delivery of requested notifications in the presence of inputs that may have outstanding messages to deliver. The notificator inspects the frontiers, as presented from the outside, for each input. Requested notifications can be served only once there are no frontier elements less-or-equal to them, and there are no other pending notification requests less than them. Each will be less-or-equal to itself, so we want to dodge that corner case.

§Examples

use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::execute(timely::Config::thread(), |worker| {
    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
        let (in1_handle, in1) = scope.new_input();
        let (in2_handle, in2) = scope.new_input();
        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
            let mut notificator = FrontierNotificator::new();
            let mut stash = HashMap::new();
            let mut vector1 = Vec::new();
            let mut vector2 = Vec::new();
            move |input1, input2, output| {
                while let Some((time, data)) = input1.next() {
                    data.swap(&mut vector1);
                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector1.drain(..));
                    notificator.notify_at(time.retain());
                }
                while let Some((time, data)) = input2.next() {
                    data.swap(&mut vector2);
                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector2.drain(..));
                    notificator.notify_at(time.retain());
                }
                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
                    if let Some(mut vec) = stash.remove(time.time()) {
                        output.session(&time).give_iterator(vec.drain(..));
                    }
                });
            }
        })
        .container::<Vec<_>>()
        .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));

        (in1_handle, in2_handle)
    });

    for i in 1..10 {
        in1.send(i - 1);
        in1.advance_to(i);
        in2.send(i - 1);
        in2.advance_to(i);
    }
    in1.close();
    in2.close();
}).unwrap();

Implementations§

source§

impl<T: Timestamp> FrontierNotificator<T>

source

pub fn new() -> Self

Allocates a new FrontierNotificator.

source

pub fn from<I: IntoIterator<Item = Capability<T>>>(iter: I) -> Self

Allocates a new FrontierNotificator with initial capabilities.

source

pub fn notify_at(&mut self, cap: Capability<T>)

Requests a notification at the time associated with capability cap. Takes ownership of the capability.

In order to request a notification at future timestamp, obtain a capability for the new timestamp first, as shown in the example.

§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::example(|scope| {
    (0..10).to_stream(scope)
           .unary_frontier(Pipeline, "example", |_, _| {
               let mut notificator = FrontierNotificator::new();
               move |input, output| {
                   input.for_each(|cap, data| {
                       output.session(&cap).give_vec(&mut data.replace(Vec::new()));
                       let time = cap.time().clone() + 1;
                       notificator.notify_at(cap.delayed(&time));
                   });
                   notificator.for_each(&[input.frontier()], |cap, _| {
                       println!("done with time: {:?}", cap.time());
                   });
               }
           });
});
source

pub fn notify_at_frontiered<'a>( &mut self, cap: Capability<T>, frontiers: &'a [&'a MutableAntichain<T>] )

Requests a notification at the time associated with capability cap.

The method takes list of frontiers from which it determines if the capability is immediately available. When used with the same frontier as make_available, this method can ensure that notifications are non-decreasing. Simply using notify_at will only insert new notifications into the list of pending notifications, which are only re-examine with calls to make_available.

source

pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>])

Enables pending notifications not in advance of any element of frontiers.

source

pub fn next_count<'a>( &mut self, frontiers: &'a [&'a MutableAntichain<T>] ) -> Option<(Capability<T>, u64)>

Returns the next available capability with respect to the supplied frontiers, if one exists, and the count of how many instances are found.

In the interest of efficiency, this method may yield capabilities in decreasing order, in certain circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i) use for_each, or (ii) call make_available first.

source

pub fn next<'a>( &mut self, frontiers: &'a [&'a MutableAntichain<T>] ) -> Option<Capability<T>>

Returns the next available capability with respect to the supplied frontiers, if one exists.

In the interest of efficiency, this method may yield capabilities in decreasing order, in certain circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i) use for_each, or (ii) call make_available first.

source

pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>( &mut self, frontiers: &'a [&'a MutableAntichain<T>], logic: F )

Repeatedly calls logic till exhaustion of the notifications made available by inspecting the frontiers.

logic receives a capability for t, the timestamp being notified.

source

pub fn monotonic<'a>( &'a mut self, frontiers: &'a [&'a MutableAntichain<T>], logging: &'a Option<Logger> ) -> Notificator<'a, T>

Creates a notificator session in which delivered notification will be non-decreasing.

This implementation can be emulated with judicious use of make_available and notify_at_frontiered, in the event that Notificator provides too restrictive an interface.

source

pub fn pending(&self) -> Iter<'_, (Capability<T>, u64)>

Iterates over pending capabilities and their count. The count represents how often a capability has been requested.

To make sure all pending capabilities are above the frontier, use for_each or exhaust next to consume all available capabilities.

§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::example(|scope| {
    (0..10).to_stream(scope)
           .unary_frontier(Pipeline, "example", |_, _| {
               let mut notificator = FrontierNotificator::new();
               move |input, output| {
                   input.for_each(|cap, data| {
                       output.session(&cap).give_vec(&mut data.replace(Vec::new()));
                       let time = cap.time().clone() + 1;
                       notificator.notify_at(cap.delayed(&time));
                       assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
                   });
                   notificator.for_each(&[input.frontier()], |cap, _| {
                       println!("done with time: {:?}", cap.time());
                   });
               }
           });
});

Trait Implementations§

source§

impl<T: Debug + Timestamp> Debug for FrontierNotificator<T>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<R, O, T> CopyOnto<ConsecutiveOffsetPairs<R, O>> for T
where R: Region<Index = (usize, usize)>, O: OffsetContainer<usize>, T: CopyOnto<R>,

source§

fn copy_onto( self, target: &mut ConsecutiveOffsetPairs<R, O> ) -> <ConsecutiveOffsetPairs<R, O> as Region>::Index

Copy self into the target container, returning an index that allows to look up the corresponding read item.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<R, T> PushInto<FlatStack<R>> for T
where R: Region + Clone + 'static, T: CopyOnto<R>,

source§

fn push_into(self, target: &mut FlatStack<R>)

Push self into the target container.
source§

impl<T> PushInto<Vec<T>> for T

source§

fn push_into(self, target: &mut Vec<T>)

Push self into the target container.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.