pub struct Worker<A: Allocate> { /* private fields */ }
Expand description
A `Worker` is the entry point to a timely dataflow computation. It wraps an `Allocate`,
and has a list of dataflows that it manages.
Implementations§
source§impl<A: Allocate> Worker<A>
impl<A: Allocate> Worker<A>
sourcepub fn new(config: Config, c: A) -> Worker<A>
pub fn new(config: Config, c: A) -> Worker<A>
Allocates a new `Worker` bound to a channel allocator.
sourcepub fn step(&mut self) -> bool
pub fn step(&mut self) -> bool
Performs one step of the computation.
A step gives each dataflow operator a chance to run, and is the main way to ensure that a computation proceeds.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use timely::dataflow::operators::{ToStream, Inspect};
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.inspect(|x| println!("{:?}", x));
});
worker.step();
});
sourcepub fn step_or_park(&mut self, duration: Option<Duration>) -> bool
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool
Performs one step of the computation.
A step gives each dataflow operator a chance to run, and is the main way to ensure that a computation proceeds.
This method takes an optional timeout and may park the thread until
there is work to perform or until this timeout expires. A value of
`None` allows the worker to park indefinitely, whereas a value of
`Some(Duration::new(0, 0))` will return without parking the thread.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use std::time::Duration;
use timely::dataflow::operators::{ToStream, Inspect};
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.inspect(|x| println!("{:?}", x));
});
worker.step_or_park(Some(Duration::from_secs(1)));
});
sourcepub fn step_while<F: FnMut() -> bool>(&mut self, func: F)
pub fn step_while<F: FnMut() -> bool>(&mut self, func: F)
Calls `self.step()` as long as `func` evaluates to `true`.
This method will continually execute even if there is no work
for the worker to perform. Consider using the similar method
`Self::step_or_park_while(duration)` to allow the worker to yield
control if that is appropriate.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use timely::dataflow::operators::{ToStream, Inspect, Probe};
let probe =
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.inspect(|x| println!("{:?}", x))
.probe()
});
worker.step_while(|| probe.less_than(&0));
});
sourcepub fn step_or_park_while<F: FnMut() -> bool>(
&mut self,
duration: Option<Duration>,
func: F,
)
pub fn step_or_park_while<F: FnMut() -> bool>( &mut self, duration: Option<Duration>, func: F, )
Calls `self.step_or_park(duration)` as long as `func` evaluates to `true`.
This method may yield whenever there is no work to perform, as performed
by `Self::step_or_park()`. Please consult the documentation for further
information about that method and its behavior. In particular, the method
can park the worker indefinitely, if no new work re-awakens the worker.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
use timely::dataflow::operators::{ToStream, Inspect, Probe};
let probe =
worker.dataflow::<usize,_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.inspect(|x| println!("{:?}", x))
.probe()
});
worker.step_or_park_while(None, || probe.less_than(&0));
});
sourcepub fn index(&self) -> usize
pub fn index(&self) -> usize
The index of the worker out of its peers.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
let index = worker.index();
let peers = worker.peers();
let timer = worker.timer();
println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
});
sourcepub fn peers(&self) -> usize
pub fn peers(&self) -> usize
The total number of peer workers.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
let index = worker.index();
let peers = worker.peers();
let timer = worker.timer();
println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
});
sourcepub fn timer(&self) -> Instant
pub fn timer(&self) -> Instant
A timer started at the initiation of the timely computation.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
let index = worker.index();
let peers = worker.peers();
let timer = worker.timer();
println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
});
sourcepub fn new_identifier(&mut self) -> usize
pub fn new_identifier(&mut self) -> usize
Allocate a new worker-unique identifier.
This method is public, though it is not expected to be widely used outside of the timely dataflow system.
sourcepub fn peek_identifier(&self) -> usize
pub fn peek_identifier(&self) -> usize
The next worker-unique identifier to be allocated.
sourcepub fn log_register(&self) -> RefMut<'_, Registry<WorkerIdentifier>>
pub fn log_register(&self) -> RefMut<'_, Registry<WorkerIdentifier>>
Access to named loggers.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
worker.log_register()
.insert::<timely::logging::TimelyEvent,_>("timely", |time, data|
println!("{:?}\t{:?}", time, data)
);
});
sourcepub fn dataflow<T, R, F>(&mut self, func: F) -> R
pub fn dataflow<T, R, F>(&mut self, func: F) -> R
Construct a new dataflow.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
// We must supply the timestamp type here, although
// it would generally be determined by type inference.
worker.dataflow::<usize,_,_>(|scope| {
// uses of `scope` to build dataflow
});
});
sourcepub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
Construct a new dataflow with a (purely cosmetic) name.
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
// We must supply the timestamp type here, although
// it would generally be determined by type inference.
worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
// uses of `scope` to build dataflow
});
});
sourcepub fn dataflow_core<T, R, F, V>(
&mut self,
name: &str,
logging: Option<TimelyLogger>,
resources: V,
func: F,
) -> R
pub fn dataflow_core<T, R, F, V>( &mut self, name: &str, logging: Option<TimelyLogger>, resources: V, func: F, ) -> R
Construct a new dataflow with specific configurations.
This method constructs a new dataflow, using a name, logger, and additional resources specified as argument. The name is cosmetic, the logger is used to handle events generated by the dataflow, and the additional resources are kept alive for as long as the dataflow is alive (use case: shared library bindings).
§Examples
timely::execute_from_args(::std::env::args(), |worker| {
// We must supply the timestamp type here, although
// it would generally be determined by type inference.
worker.dataflow_core::<usize,_,_,_>(
"dataflow X", // Dataflow name
None, // Optional logger
37, // Any resources
|resources, scope| { // Closure
// uses of `resources`, `scope` to build dataflow
}
);
});
sourcepub fn drop_dataflow(&mut self, dataflow_identifier: usize)
pub fn drop_dataflow(&mut self, dataflow_identifier: usize)
Drops an identified dataflow.
This method removes the identified dataflow, which will no longer be scheduled. Various other resources will be cleaned up, though the method is currently in public beta rather than expected to work. Please report all crashes and unmet expectations!
sourcepub fn next_dataflow_index(&self) -> usize
pub fn next_dataflow_index(&self) -> usize
Returns the next index to be used for dataflow construction.
This identifier will appear in the address of contained operators, and can
be used to drop the dataflow using `self.drop_dataflow()`.
sourcepub fn installed_dataflows(&self) -> Vec<usize>
pub fn installed_dataflows(&self) -> Vec<usize>
List the current dataflow indices.
sourcepub fn has_dataflows(&self) -> bool
pub fn has_dataflows(&self) -> bool
Returns `true` if there is at least one dataflow under management.
Trait Implementations§
source§impl<A: Allocate> AsWorker for Worker<A>
impl<A: Allocate> AsWorker for Worker<A>
source§fn allocate<D: Exchangeable>(
&mut self,
identifier: usize,
address: Rc<[usize]>,
) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>)
fn allocate<D: Exchangeable>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>)
source§fn pipeline<T: 'static>(
&mut self,
identifier: usize,
address: Rc<[usize]>,
) -> (ThreadPusher<T>, ThreadPuller<T>)
fn pipeline<T: 'static>( &mut self, identifier: usize, address: Rc<[usize]>, ) -> (ThreadPusher<T>, ThreadPuller<T>)
source§fn new_identifier(&mut self) -> usize
fn new_identifier(&mut self) -> usize
source§fn peek_identifier(&self) -> usize
fn peek_identifier(&self) -> usize
source§fn log_register(&self) -> RefMut<'_, Registry<WorkerIdentifier>>
fn log_register(&self) -> RefMut<'_, Registry<WorkerIdentifier>>
source§fn logging(&self) -> Option<TimelyLogger>
fn logging(&self) -> Option<TimelyLogger>
source§impl<A: Allocate> Scheduler for Worker<A>
impl<A: Allocate> Scheduler for Worker<A>
source§fn activations(&self) -> Rc<RefCell<Activations>>
fn activations(&self) -> Rc<RefCell<Activations>>
source§fn activator_for(&self, path: Rc<[usize]>) -> Activator
fn activator_for(&self, path: Rc<[usize]>) -> Activator
`Activator` tied to the specified operator address.
source§fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator
fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator
`SyncActivator` tied to the specified operator address.
Auto Trait Implementations§
impl<A> Freeze for Worker<A>
impl<A> !RefUnwindSafe for Worker<A>
impl<A> !Send for Worker<A>
impl<A> !Sync for Worker<A>
impl<A> Unpin for Worker<A>
impl<A> !UnwindSafe for Worker<A>
Blanket Implementations§
source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
source§default unsafe fn clone_to_uninit(&self, dst: *mut T)
default unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)