pub struct Handle<T: Timestamp, CB: ContainerBuilder> { /* private fields */ }
Expand description
A handle to an input StreamCore
, used to introduce data to a timely dataflow computation.
Implementations§
source§impl<T: Timestamp, C: Container + Data> Handle<T, CapacityContainerBuilder<C>>
impl<T: Timestamp, C: Container + Data> Handle<T, CapacityContainerBuilder<C>>
sourcepub fn new() -> Self
pub fn new() -> Self
Allocates a new input handle, from which one can create timely streams.
§Examples
use timely::*;
use timely::dataflow::operators::core::{Input, Inspect};
use timely::dataflow::operators::core::input::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});
source§impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB>
impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB>
sourcepub fn new_with_builder() -> Self
pub fn new_with_builder() -> Self
Allocates a new input handle, from which one can create timely streams.
§Examples
use timely::*;
use timely::dataflow::operators::core::{Input, Inspect};
use timely::dataflow::operators::core::input::Handle;
use timely_container::CapacityContainerBuilder;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::<_, CapacityContainerBuilder<_>>::new_with_builder();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});
sourcepub fn to_stream<G>(&mut self, scope: &mut G) -> StreamCore<G, CB::Container>where
T: TotalOrder,
G: Scope<Timestamp = T>,
pub fn to_stream<G>(&mut self, scope: &mut G) -> StreamCore<G, CB::Container>where
T: TotalOrder,
G: Scope<Timestamp = T>,
Creates an input stream from the handle in the supplied scope.
§Examples
use timely::*;
use timely::dataflow::operators::core::{Input, Inspect};
use timely::dataflow::operators::core::input::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::new();
worker.dataflow(|scope| {
input.to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});
sourcepub fn send_batch(&mut self, buffer: &mut CB::Container)
pub fn send_batch(&mut self, buffer: &mut CB::Container)
Sends a batch of records into the corresponding timely dataflow StreamCore, at the current epoch.
This method flushes single elements previously sent with send
, to keep the insertion order.
§Examples
use timely::*;
use timely::dataflow::operators::core::{Input, InspectCore};
use timely::dataflow::operators::core::input::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.inspect_container(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send_batch(&mut vec![format!("{}", round)]);
input.advance_to(round + 1);
worker.step();
}
});
sourcepub fn advance_to(&mut self, next: T)
pub fn advance_to(&mut self, next: T)
Advances the current epoch to next
.
This method allows timely dataflow to issue progress notifications as it can now determine that this input can no longer produce data at earlier timestamps.
source§impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB>
impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB>
sourcepub fn send<D>(&mut self, data: D)where
CB: PushInto<D>,
pub fn send<D>(&mut self, data: D)where
CB: PushInto<D>,
Sends one record into the corresponding timely dataflow Stream
, at the current epoch.
§Examples
use timely::*;
use timely::dataflow::operators::core::{Input, Inspect};
use timely::dataflow::operators::core::input::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.container::<Vec<_>>()
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});