timely/dataflow/scopes/
mod.rs

1//! Hierarchical organization of timely dataflow graphs.
2
3use std::rc::Rc;
4use crate::progress::{Timestamp, Operate, Source, Target};
5use crate::order::Product;
6use crate::progress::timestamp::Refines;
7use crate::communication::Allocate;
8use crate::worker::AsWorker;
9
10pub mod child;
11
12pub use self::child::Child;
13
14/// The information a child scope needs from its parent.
15pub trait ScopeParent: AsWorker+Clone {
16    /// The timestamp associated with data in this scope.
17    type Timestamp : Timestamp;
18}
19
20impl<A: Allocate> ScopeParent for crate::worker::Worker<A> {
21    type Timestamp = ();
22}
23
24
25/// The fundamental operations required to add and connect operators in a timely dataflow graph.
26///
27/// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method
28/// takes a shared reference, but can be thought of as first calling `.clone()` and then calling the
29/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics.
30pub trait Scope: ScopeParent {
31    /// A useful name describing the scope.
32    fn name(&self) -> String;
33
34    /// A sequence of scope identifiers describing the path from the worker root to this scope.
35    fn addr(&self) -> Rc<[usize]>;
36
37    /// A sequence of scope identifiers describing the path from the worker root to the child
38    /// indicated by `index`.
39    fn addr_for_child(&self, index: usize) -> Rc<[usize]>;
40
41    /// Connects a source of data with a target of the data. This only links the two for
42    /// the purposes of tracking progress, rather than effect any data movement itself.
43    fn add_edge(&self, source: Source, target: Target);
44
45    /// Adds a child `Operate` to the builder's scope. Returns the new child's index.
46    fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
47        let index = self.allocate_operator_index();
48        let global = self.new_identifier();
49        self.add_operator_with_indices(operator, index, global);
50        index
51    }
52
53    /// Allocates a new scope-local operator index.
54    ///
55    /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local
56    /// operator index allocated with this method. This method does cause the scope to expect that
57    /// an operator will be added, and it is an error not to eventually add such an operator.
58    fn allocate_operator_index(&mut self) -> usize;
59
60    /// Adds a child `Operate` to the builder's scope using a supplied index.
61    ///
62    /// This is used internally when there is a gap between allocate a child identifier and adding the
63    /// child, as happens in subgraph creation.
64    fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
65        let global = self.new_identifier();
66        self.add_operator_with_indices(operator, index, global);
67    }
68
69    /// Adds a child `Operate` to the builder's scope using supplied indices.
70    ///
71    /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging.
72    fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);
73
74    /// Creates a dataflow subgraph.
75    ///
76    /// This method allows the user to create a nested scope with any timestamp that
77    /// "refines" the enclosing timestamp (informally: extends it in a reversible way).
78    ///
79    /// This is most commonly used to create new iterative contexts, and the provided
80    /// method `iterative` for this task demonstrates the use of this method.
81    ///
82    /// # Examples
83    /// ```
84    /// use timely::dataflow::Scope;
85    /// use timely::dataflow::operators::{Input, Enter, Leave};
86    /// use timely::order::Product;
87    ///
88    /// timely::execute_from_args(std::env::args(), |worker| {
89    ///     // must specify types as nothing else drives inference.
90    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
91    ///         let (input, stream) = child1.new_input::<String>();
92    ///         let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
93    ///             stream.enter(child2).leave()
94    ///         });
95    ///         input
96    ///     });
97    /// });
98    /// ```
99    fn scoped<T, R, F>(&mut self, name: &str, func: F) -> R
100    where
101        T: Timestamp+Refines<<Self as ScopeParent>::Timestamp>,
102        F: FnOnce(&mut Child<Self, T>) -> R;
103
104    /// Creates a iterative dataflow subgraph.
105    ///
106    /// This method is a specialization of `scoped` which uses the `Product` timestamp
107    /// combinator, suitable for iterative computations in which iterative development
108    /// at some time cannot influence prior iterations at a future time.
109    ///
110    /// # Examples
111    /// ```
112    /// use timely::dataflow::Scope;
113    /// use timely::dataflow::operators::{Input, Enter, Leave};
114    ///
115    /// timely::execute_from_args(std::env::args(), |worker| {
116    ///     // must specify types as nothing else drives inference.
117    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
118    ///         let (input, stream) = child1.new_input::<String>();
119    ///         let output = child1.iterative::<u32,_,_>(|child2| {
120    ///             stream.enter(child2).leave()
121    ///         });
122    ///         input
123    ///     });
124    /// });
125    /// ```
126    fn iterative<T, R, F>(&mut self, func: F) -> R
127    where
128        T: Timestamp,
129        F: FnOnce(&mut Child<Self, Product<<Self as ScopeParent>::Timestamp, T>>) -> R,
130    {
131        self.scoped::<Product<<Self as ScopeParent>::Timestamp, T>,R,F>("Iterative", func)
132    }
133
134    /// Creates a dataflow region with the same timestamp.
135    ///
136    /// This method is a specialization of `scoped` which uses the same timestamp as the
137    /// containing scope. It is used mainly to group regions of a dataflow computation, and
138    /// provides some computational benefits by abstracting the specifics of the region.
139    ///
140    /// # Examples
141    /// ```
142    /// use timely::dataflow::Scope;
143    /// use timely::dataflow::operators::{Input, Enter, Leave};
144    ///
145    /// timely::execute_from_args(std::env::args(), |worker| {
146    ///     // must specify types as nothing else drives inference.
147    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
148    ///         let (input, stream) = child1.new_input::<String>();
149    ///         let output = child1.region(|child2| {
150    ///             stream.enter(child2).leave()
151    ///         });
152    ///         input
153    ///     });
154    /// });
155    /// ```
156    fn region<R, F>(&mut self, func: F) -> R
157    where
158        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
159    {
160        self.region_named("Region", func)
161    }
162
163    /// Creates a dataflow region with the same timestamp.
164    ///
165    /// This method is a specialization of `scoped` which uses the same timestamp as the
166    /// containing scope. It is used mainly to group regions of a dataflow computation, and
167    /// provides some computational benefits by abstracting the specifics of the region.
168    ///
169    /// This variant allows you to specify a name for the region, which can be read out in
170    /// the timely logging streams.
171    ///
172    /// # Examples
173    /// ```
174    /// use timely::dataflow::Scope;
175    /// use timely::dataflow::operators::{Input, Enter, Leave};
176    ///
177    /// timely::execute_from_args(std::env::args(), |worker| {
178    ///     // must specify types as nothing else drives inference.
179    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
180    ///         let (input, stream) = child1.new_input::<String>();
181    ///         let output = child1.region_named("region", |child2| {
182    ///             stream.enter(child2).leave()
183    ///         });
184    ///         input
185    ///     });
186    /// });
187    /// ```
188    fn region_named<R, F>(&mut self, name: &str, func: F) -> R
189    where
190        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
191    {
192        self.scoped::<<Self as ScopeParent>::Timestamp,R,F>(name, func)
193    }
194
195}