pub trait UnorderedInput<G: Scope> {
// Required method
fn new_unordered_input<CB: ContainerBuilder>(
&mut self,
) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>);
}
Expand description
Create a new Stream
and Handle
through which to supply input.
Required Methods§
Sourcefn new_unordered_input<CB: ContainerBuilder>(
&mut self,
) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>)
fn new_unordered_input<CB: ContainerBuilder>( &mut self, ) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>)
Create a new capability-based StreamCore and UnorderedHandle through which to supply input. This input supports multiple open epochs (timestamps) at the same time.
The new_unordered_input_core
method returns ((HandleCore, Capability), StreamCore)
where the StreamCore
can be used
immediately for timely dataflow construction, HandleCore
and Capability
are later used to introduce
data into the timely dataflow computation.
The Capability
returned is for the default value of the timestamp type in use. The
capability can be dropped to inform the system that the input has advanced beyond the
capability’s timestamp. To retain the ability to send, a new capability at a later timestamp
should be obtained first, via the delayed
function for Capability
.
To communicate the end-of-input drop all available capabilities.
§Examples
use std::sync::{Arc, Mutex};
use timely::*;
use timely::dataflow::operators::{capture::Extract, Capture};
use timely::dataflow::operators::core::{UnorderedInput};
use timely::dataflow::Stream;
// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));
timely::execute(Config::thread(), move |worker| {
// this is only to validate the output.
let send = send.lock().unwrap().clone();
// create and capture the unordered input.
let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
let (input, stream) = scope.new_unordered_input();
stream
.container::<Vec<_>>()
.capture_into(send);
input
});
// feed values 0..10 at times 0..10.
for round in 0..10 {
input.session(cap.clone()).give(round);
cap = cap.delayed(&(round + 1));
worker.step();
}
}).unwrap();
let extract = recv.extract();
for i in 0..10 {
assert_eq!(extract[i], (i, vec![i]));
}
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.