pub struct HandleCore<T: Timestamp, C: Container> { /* private fields */ }
Expand description
A handle to an input Stream
, used to introduce data to a timely dataflow computation.
Implementations§
source§impl<T: Timestamp, D: Container> HandleCore<T, D>
impl<T: Timestamp, D: Container> HandleCore<T, D>
sourcepub fn new() -> Self
pub fn new() -> Self
Allocates a new input handle, from which one can create timely streams.
§Examples
use timely::*;
use timely::dataflow::operators::{Input, Inspect};
use timely::dataflow::operators::input::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});
sourcepub fn to_stream<G>(&mut self, scope: &mut G) -> StreamCore<G, D>
pub fn to_stream<G>(&mut self, scope: &mut G) -> StreamCore<G, D>
Creates an input stream from the handle in the supplied scope.
§Examples
use timely::*;
use timely::dataflow::operators::{Input, Inspect};
use timely::dataflow::operators::input::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::new();
worker.dataflow(|scope| {
input.to_stream(scope)
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});
sourcepub fn send_batch(&mut self, buffer: &mut D)
pub fn send_batch(&mut self, buffer: &mut D)
Sends a batch of records into the corresponding timely dataflow StreamCore, at the current epoch.
This method flushes single elements previously sent with send
, to keep the insertion order.
§Examples
use timely::*;
use timely::dataflow::operators::{Input, InspectCore};
use timely::dataflow::operators::input::HandleCore;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = HandleCore::new();
worker.dataflow(|scope| {
scope.input_from_core(&mut input)
.inspect_container(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send_batch(&mut vec![format!("{}", round)]);
input.advance_to(round + 1);
worker.step();
}
});
sourcepub fn advance_to(&mut self, next: T)
pub fn advance_to(&mut self, next: T)
Advances the current epoch to next
.
This method allows timely dataflow to issue progress notifications as it can now determine that this input can no longer produce data at earlier timestamps.
source§impl<T: Timestamp, D: Data> HandleCore<T, Vec<D>>
impl<T: Timestamp, D: Data> HandleCore<T, Vec<D>>
sourcepub fn send(&mut self, data: D)
pub fn send(&mut self, data: D)
Sends one record into the corresponding timely dataflow Stream
, at the current epoch.
§Examples
use timely::*;
use timely::dataflow::operators::{Input, Inspect};
use timely::dataflow::operators::input::Handle;
// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {
// add an input and base computation off of it
let mut input = Handle::new();
worker.dataflow(|scope| {
scope.input_from(&mut input)
.inspect(|x| println!("hello {:?}", x));
});
// introduce input, advance computation
for round in 0..10 {
input.send(round);
input.advance_to(round + 1);
worker.step();
}
});
Trait Implementations§
Auto Trait Implementations§
impl<T, C> !RefUnwindSafe for HandleCore<T, C>
impl<T, C> !Send for HandleCore<T, C>
impl<T, C> !Sync for HandleCore<T, C>
impl<T, C> Unpin for HandleCore<T, C>
impl<T, C> !UnwindSafe for HandleCore<T, C>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more