pub trait Probe<G: Scope, D: Container> {
    // Required methods
    fn probe(&self) -> Handle<G::Timestamp>;
    fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, D>;
}
Expand description

Monitors progress at a Stream.

Required Methods§

source

fn probe(&self) -> Handle<G::Timestamp>

Constructs a progress probe which indicates which timestamps have elapsed at the operator.

§Examples
use timely::*;
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Probe, Inspect};

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let (mut input, probe) = worker.dataflow(|scope| {
        let (input, stream) = scope.new_input();
        let probe = stream.inspect(|x| println!("hello {:?}", x))
                          .probe();
        (input, probe)
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step_while(|| probe.less_than(input.time()));
    }
}).unwrap();
source

fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, D>

Inserts a progress probe in a stream.

§Examples
use timely::*;
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Probe, Inspect};
use timely::dataflow::operators::probe::Handle;

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut probe = Handle::new();
    let mut input = worker.dataflow(|scope| {
        let (input, stream) = scope.new_input();
        stream.probe_with(&mut probe)
              .inspect(|x| println!("hello {:?}", x));

        input
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step_while(|| probe.less_than(input.time()));
    }
}).unwrap();

Implementors§

source§

impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D>