pub trait Probe<G: Scope, C: Container> {
// Required methods
fn probe(&self) -> Handle<G::Timestamp>;
fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, C>;
}
Expand description
Monitors progress at a Stream
.
Required Methods§
sourcefn probe(&self) -> Handle<G::Timestamp>
fn probe(&self) -> Handle<G::Timestamp>
Constructs a progress probe which indicates which timestamps have elapsed at the operator.
§Examples
use timely::*;
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Probe, Inspect};
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let (mut input, probe) = worker.dataflow(|scope| {
let (input, stream) = scope.new_input();
let probe = stream.inspect(|x| println!("hello {:?}", x))
.probe();
(input, probe)
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step_while(|| probe.less_than(input.time()));
}
}).unwrap();
sourcefn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, C>
fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, C>
Inserts a progress probe in a stream.
§Examples
use timely::*;
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Probe, Inspect};
use timely::dataflow::operators::probe::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut probe = Handle::new();
let mut input = worker.dataflow(|scope| {
let (input, stream) = scope.new_input();
stream.probe_with(&mut probe)
.inspect(|x| println!("hello {:?}", x));
input
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step_while(|| probe.less_than(input.time()));
}
}).unwrap();