Trait timely::dataflow::operators::unordered_input::UnorderedInput
source · pub trait UnorderedInput<G: Scope> {
// Required method
fn new_unordered_input<D: Data>(
&mut self,
) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>);
}
Expand description
Create a new `Stream` and `Handle` through which to supply input.
Required Methods§
source · fn new_unordered_input<D: Data>(
&mut self,
) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>)
fn new_unordered_input<D: Data>( &mut self, ) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>)
Create a new capability-based `Stream` and `Handle` through which to supply input. This input supports multiple open epochs (timestamps) at the same time.
The `new_unordered_input` method returns `((Handle, Capability), Stream)`, where the `Stream` can be used immediately for timely dataflow construction, and the `Handle` and `Capability` are later used to introduce data into the timely dataflow computation.
The `Capability` returned is for the default value of the timestamp type in use. The capability can be dropped to inform the system that the input has advanced beyond the capability’s timestamp. To retain the ability to send, a new capability at a later timestamp should be obtained first, via the `delayed` function for `Capability`.
To communicate the end of input, drop all available capabilities.
§Examples
// Example: feed an unordered input by hand, one value per timestamp, and
// check afterwards what the dataflow captured.
use std::sync::{Arc, Mutex};
use timely::*;
use timely::dataflow::operators::*;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::Stream;
// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));
timely::execute(Config::thread(), move |worker| {
// this is only to validate the output.
// Lock the shared sender and take a private clone of it for this closure.
let send = send.lock().unwrap().clone();
// create and capture the unordered input.
// The dataflow is built with `usize` timestamps. The inner closure returns
// the `(handle, capability)` half of `new_unordered_input`'s result, which
// is destructured into `input` and `cap` here.
let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
let (input, stream) = scope.new_unordered_input();
// Capture the stream into `send` so it can be inspected via `recv` below.
stream.capture_into(send);
input
});
// feed values 0..10 at times 0..10.
for round in 0..10 {
// Open a session using a clone of the current capability and send `round`.
input.session(cap.clone()).give(round);
// Obtain a capability for the next timestamp; the assignment drops the
// previous capability, which (per the text above) tells the system the
// input has advanced beyond the old timestamp.
cap = cap.delayed(&(round + 1));
worker.step();
}
// NOTE(review): `input` and `cap` go out of scope when the closure returns;
// presumably this is what signals end-of-input here — confirm.
}).unwrap();
// Entry `i` of the extracted capture is expected to be the pair
// (time `i`, the data sent at that time), i.e. exactly the single value `i`.
let extract = recv.extract();
for i in 0..10 {
assert_eq!(extract[i], (i, vec![i]));
}