timely/dataflow/scopes/mod.rs
1//! Hierarchical organization of timely dataflow graphs.
2
3use std::rc::Rc;
4use crate::progress::{Timestamp, Operate, Source, Target};
5use crate::order::Product;
6use crate::progress::timestamp::Refines;
7use crate::communication::Allocate;
8use crate::worker::AsWorker;
9
10pub mod child;
11
12pub use self::child::Child;
13
14/// The information a child scope needs from its parent.
15pub trait ScopeParent: AsWorker+Clone {
16 /// The timestamp associated with data in this scope.
17 type Timestamp : Timestamp;
18}
19
20impl<A: Allocate> ScopeParent for crate::worker::Worker<A> {
21 type Timestamp = ();
22}
23
24
25/// The fundamental operations required to add and connect operators in a timely dataflow graph.
26///
27/// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method
28/// takes a shared reference, but can be thought of as first calling `.clone()` and then calling the
29/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics.
30pub trait Scope: ScopeParent {
31 /// A useful name describing the scope.
32 fn name(&self) -> String;
33
34 /// A sequence of scope identifiers describing the path from the worker root to this scope.
35 fn addr(&self) -> Rc<[usize]>;
36
37 /// A sequence of scope identifiers describing the path from the worker root to the child
38 /// indicated by `index`.
39 fn addr_for_child(&self, index: usize) -> Rc<[usize]>;
40
41 /// Connects a source of data with a target of the data. This only links the two for
42 /// the purposes of tracking progress, rather than effect any data movement itself.
43 fn add_edge(&self, source: Source, target: Target);
44
45 /// Adds a child `Operate` to the builder's scope. Returns the new child's index.
46 fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
47 let index = self.allocate_operator_index();
48 let global = self.new_identifier();
49 self.add_operator_with_indices(operator, index, global);
50 index
51 }
52
53 /// Allocates a new scope-local operator index.
54 ///
55 /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local
56 /// operator index allocated with this method. This method does cause the scope to expect that
57 /// an operator will be added, and it is an error not to eventually add such an operator.
58 fn allocate_operator_index(&mut self) -> usize;
59
60 /// Adds a child `Operate` to the builder's scope using a supplied index.
61 ///
62 /// This is used internally when there is a gap between allocate a child identifier and adding the
63 /// child, as happens in subgraph creation.
64 fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
65 let global = self.new_identifier();
66 self.add_operator_with_indices(operator, index, global);
67 }
68
69 /// Adds a child `Operate` to the builder's scope using supplied indices.
70 ///
71 /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging.
72 fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);
73
74 /// Creates a dataflow subgraph.
75 ///
76 /// This method allows the user to create a nested scope with any timestamp that
77 /// "refines" the enclosing timestamp (informally: extends it in a reversible way).
78 ///
79 /// This is most commonly used to create new iterative contexts, and the provided
80 /// method `iterative` for this task demonstrates the use of this method.
81 ///
82 /// # Examples
83 /// ```
84 /// use timely::dataflow::Scope;
85 /// use timely::dataflow::operators::{Input, Enter, Leave};
86 /// use timely::order::Product;
87 ///
88 /// timely::execute_from_args(std::env::args(), |worker| {
89 /// // must specify types as nothing else drives inference.
90 /// let input = worker.dataflow::<u64,_,_>(|child1| {
91 /// let (input, stream) = child1.new_input::<String>();
92 /// let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
93 /// stream.enter(child2).leave()
94 /// });
95 /// input
96 /// });
97 /// });
98 /// ```
99 fn scoped<T, R, F>(&mut self, name: &str, func: F) -> R
100 where
101 T: Timestamp+Refines<<Self as ScopeParent>::Timestamp>,
102 F: FnOnce(&mut Child<Self, T>) -> R;
103
104 /// Creates a iterative dataflow subgraph.
105 ///
106 /// This method is a specialization of `scoped` which uses the `Product` timestamp
107 /// combinator, suitable for iterative computations in which iterative development
108 /// at some time cannot influence prior iterations at a future time.
109 ///
110 /// # Examples
111 /// ```
112 /// use timely::dataflow::Scope;
113 /// use timely::dataflow::operators::{Input, Enter, Leave};
114 ///
115 /// timely::execute_from_args(std::env::args(), |worker| {
116 /// // must specify types as nothing else drives inference.
117 /// let input = worker.dataflow::<u64,_,_>(|child1| {
118 /// let (input, stream) = child1.new_input::<String>();
119 /// let output = child1.iterative::<u32,_,_>(|child2| {
120 /// stream.enter(child2).leave()
121 /// });
122 /// input
123 /// });
124 /// });
125 /// ```
126 fn iterative<T, R, F>(&mut self, func: F) -> R
127 where
128 T: Timestamp,
129 F: FnOnce(&mut Child<Self, Product<<Self as ScopeParent>::Timestamp, T>>) -> R,
130 {
131 self.scoped::<Product<<Self as ScopeParent>::Timestamp, T>,R,F>("Iterative", func)
132 }
133
134 /// Creates a dataflow region with the same timestamp.
135 ///
136 /// This method is a specialization of `scoped` which uses the same timestamp as the
137 /// containing scope. It is used mainly to group regions of a dataflow computation, and
138 /// provides some computational benefits by abstracting the specifics of the region.
139 ///
140 /// # Examples
141 /// ```
142 /// use timely::dataflow::Scope;
143 /// use timely::dataflow::operators::{Input, Enter, Leave};
144 ///
145 /// timely::execute_from_args(std::env::args(), |worker| {
146 /// // must specify types as nothing else drives inference.
147 /// let input = worker.dataflow::<u64,_,_>(|child1| {
148 /// let (input, stream) = child1.new_input::<String>();
149 /// let output = child1.region(|child2| {
150 /// stream.enter(child2).leave()
151 /// });
152 /// input
153 /// });
154 /// });
155 /// ```
156 fn region<R, F>(&mut self, func: F) -> R
157 where
158 F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
159 {
160 self.region_named("Region", func)
161 }
162
163 /// Creates a dataflow region with the same timestamp.
164 ///
165 /// This method is a specialization of `scoped` which uses the same timestamp as the
166 /// containing scope. It is used mainly to group regions of a dataflow computation, and
167 /// provides some computational benefits by abstracting the specifics of the region.
168 ///
169 /// This variant allows you to specify a name for the region, which can be read out in
170 /// the timely logging streams.
171 ///
172 /// # Examples
173 /// ```
174 /// use timely::dataflow::Scope;
175 /// use timely::dataflow::operators::{Input, Enter, Leave};
176 ///
177 /// timely::execute_from_args(std::env::args(), |worker| {
178 /// // must specify types as nothing else drives inference.
179 /// let input = worker.dataflow::<u64,_,_>(|child1| {
180 /// let (input, stream) = child1.new_input::<String>();
181 /// let output = child1.region_named("region", |child2| {
182 /// stream.enter(child2).leave()
183 /// });
184 /// input
185 /// });
186 /// });
187 /// ```
188 fn region_named<R, F>(&mut self, name: &str, func: F) -> R
189 where
190 F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
191 {
192 self.scoped::<<Self as ScopeParent>::Timestamp,R,F>(name, func)
193 }
194
195}