1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
//! Hierarchical organization of timely dataflow graphs.
use std::rc::Rc;
use crate::progress::{Timestamp, Operate, Source, Target};
use crate::order::Product;
use crate::progress::timestamp::Refines;
use crate::communication::Allocate;
use crate::worker::AsWorker;
pub mod child;
pub use self::child::Child;
/// What a nested (child) scope requires from the scope that encloses it.
///
/// Any parent must also behave as a worker (`AsWorker`) and be cheaply cloneable handles.
pub trait ScopeParent: AsWorker + Clone {
    /// Timestamp type attached to the data flowing through this scope.
    type Timestamp: Timestamp;
}
/// A worker acts as the root of the scope hierarchy; at the root, data carries the unit
/// timestamp `()`.
impl<A> ScopeParent for crate::worker::Worker<A>
where
    A: Allocate,
{
    type Timestamp = ();
}
/// Core operations for adding operators to, and connecting operators within, a timely dataflow graph.
///
/// Implementors are frequently *shared* handles backed by an `Rc<RefCell<>>`. Although methods take
/// `self` by reference, it helps to think of each call as first cloning the handle and then invoking
/// the method on that clone. No method keeps the `RefCell` borrowed beyond its own execution, which
/// is meant to prevent accidental borrow panics.
pub trait Scope: ScopeParent {
    /// Returns a descriptive name for this scope.
    fn name(&self) -> String;

    /// Returns the scope identifiers that lead from the worker root down to this scope.
    fn addr(&self) -> Rc<[usize]>;

    /// Returns the scope identifiers that lead from the worker root down to this scope's child
    /// at position `index`.
    fn addr_for_child(&self, index: usize) -> Rc<[usize]>;

    /// Records a connection from `source` to `target`.
    ///
    /// The connection exists solely for progress tracking; registering it does not by itself
    /// move any data.
    fn add_edge(&self, source: Source, target: Target);

    /// Adds a child operator to this scope and returns the child's scope-local index.
    ///
    /// A fresh scope-local index is reserved with `allocate_operator_index`, a worker-unique
    /// identifier is drawn with `new_identifier`, and the operator is then registered under both
    /// through `add_operator_with_indices`.
    fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
        let local = self.allocate_operator_index();
        let worker_unique = self.new_identifier();
        self.add_operator_with_indices(operator, local, worker_unique);
        local
    }

    /// Reserves a fresh scope-local operator index.
    ///
    /// The returned index is intended to be handed to `add_operator_with_index`. Reserving an
    /// index commits the scope to expecting an operator under it; never adding that operator is
    /// an error.
    fn allocate_operator_index(&mut self) -> usize;

    /// Adds a child operator to this scope under a previously reserved scope-local `index`.
    ///
    /// Used internally when reserving a child's index and actually adding the child happen at
    /// different moments, as is the case while building a subgraph. A worker-unique identifier
    /// is drawn with `new_identifier` for the new child.
    fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
        let worker_unique = self.new_identifier();
        self.add_operator_with_indices(operator, index, worker_unique);
    }

    /// Adds a child operator to this scope under explicitly supplied indices.
    ///
    /// `local` is the scope-local operator index; `global` is a worker-unique index, used for
    /// example in logging.
    fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);

    /// Builds a nested dataflow subgraph.
    ///
    /// The nested scope may use any timestamp `T` that "refines" the timestamp of this scope
    /// (informally: extends it in a reversible way). `name` labels the nested scope, and `func`
    /// receives the nested scope to populate; its result of type `R` is what this method returns.
    ///
    /// Building iterative contexts is the most common use; the provided `iterative` method is a
    /// worked example of calling this method.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    /// use timely::order::Product;
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    /// // must specify types as nothing else drives inference.
    /// let input = worker.dataflow::<u64,_,_>(|child1| {
    /// let (input, stream) = child1.new_input::<String>();
    /// let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
    /// stream.enter(child2).leave()
    /// });
    /// input
    /// });
    /// });
    /// ```
    fn scoped<T, R, F>(&mut self, name: &str, func: F) -> R
    where
        T: Timestamp+Refines<<Self as ScopeParent>::Timestamp>,
        F: FnOnce(&mut Child<Self, T>) -> R;

    /// Builds an iterative dataflow subgraph named `"Iterative"`.
    ///
    /// This specializes `scoped`: the nested timestamp is a `Product` of this scope's timestamp
    /// and an additional coordinate `T`. That suits iterative computations in which development at
    /// some time in one iteration cannot influence earlier iterations at a later time.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    /// // must specify types as nothing else drives inference.
    /// let input = worker.dataflow::<u64,_,_>(|child1| {
    /// let (input, stream) = child1.new_input::<String>();
    /// let output = child1.iterative::<u32,_,_>(|child2| {
    /// stream.enter(child2).leave()
    /// });
    /// input
    /// });
    /// });
    /// ```
    fn iterative<T, R, F>(&mut self, func: F) -> R
    where
        T: Timestamp,
        F: FnOnce(&mut Child<Self, Product<<Self as ScopeParent>::Timestamp, T>>) -> R,
    {
        // Only the nested timestamp needs spelling out; `R` and `F` follow from `func`.
        self.scoped::<Product<<Self as ScopeParent>::Timestamp, T>, _, _>("Iterative", func)
    }

    /// Builds a dataflow region that shares this scope's timestamp, named `"Region"`.
    ///
    /// This is `region_named` with a default name. A region specializes `scoped` by keeping the
    /// enclosing timestamp. Regions mainly group parts of a dataflow computation, and they offer
    /// some computational benefit by abstracting away the region's internals.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    /// // must specify types as nothing else drives inference.
    /// let input = worker.dataflow::<u64,_,_>(|child1| {
    /// let (input, stream) = child1.new_input::<String>();
    /// let output = child1.region(|child2| {
    /// stream.enter(child2).leave()
    /// });
    /// input
    /// });
    /// });
    /// ```
    fn region<R, F>(&mut self, func: F) -> R
    where
        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
    {
        let default_name = "Region";
        self.region_named(default_name, func)
    }

    /// Builds a named dataflow region that shares this scope's timestamp.
    ///
    /// This specializes `scoped` by giving the nested scope the same timestamp as this one.
    /// Regions mainly group parts of a dataflow computation, and they offer some computational
    /// benefit by abstracting away the region's internals.
    ///
    /// The supplied `name` can be read back from the timely logging streams.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    /// // must specify types as nothing else drives inference.
    /// let input = worker.dataflow::<u64,_,_>(|child1| {
    /// let (input, stream) = child1.new_input::<String>();
    /// let output = child1.region_named("region", |child2| {
    /// stream.enter(child2).leave()
    /// });
    /// input
    /// });
    /// });
    /// ```
    fn region_named<R, F>(&mut self, name: &str, func: F) -> R
    where
        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
    {
        // The nested timestamp is identical to ours; `R` and `F` are inferred from `func`.
        self.scoped::<<Self as ScopeParent>::Timestamp, _, _>(name, func)
    }
}