timely/dataflow/scope.rs
1//! A dataflow scope, used to build dataflow graphs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::activate::Activations;
7use crate::progress::{Timestamp, Operate, Subgraph, SubgraphBuilder};
8use crate::progress::{Source, Target};
9use crate::progress::timestamp::Refines;
10use crate::order::Product;
11use crate::worker::Worker;
12
/// Type alias for an iterative scope.
///
/// An `Iterative` scope is a [`Scope`] whose timestamp is the [`Product`] of an
/// outer timestamp `TOuter` and an inner timestamp `TInner`. This is the scope
/// type handed to the closure passed to [`Scope::iterative`].
pub type Iterative<'scope, TOuter, TInner> = Scope<'scope, Product<TOuter, TInner>>;
15
16/// A `Scope` manages the creation of new dataflow scopes, of operators and edges between them.
17///
18/// This is a shared object that can be freely copied, subject to its lifetime requirements.
19/// It manages scope construction through a `RefCell`-wrapped subgraph builder, and all of
20/// this type's methods use but do not hold write access through the `RefCell`.
21pub struct Scope<'scope, T: Timestamp> {
22 /// The subgraph under assembly.
23 ///
24 /// Stored as `RefCell<...>` so that multiple `Scope` copies can work on the same subgraph.
25 /// All methods on this type must release their borrow on this field before returning.
26 pub(crate) subgraph: &'scope RefCell<SubgraphBuilder<T>>,
27 /// The worker hosting this scope.
28 pub(crate) worker: &'scope Worker,
29}
30
31impl<'scope, T: Timestamp> Scope<'scope, T> {
32 /// Access to the underlying worker.
33 pub fn worker(&self) -> &'scope Worker { self.worker }
34 /// This worker's index out of `0 .. self.peers()`.
35 pub fn index(&self) -> usize { self.worker.index() }
36 /// The total number of workers in the computation.
37 pub fn peers(&self) -> usize { self.worker.peers() }
38 /// Provides a shared handle to the activation scheduler.
39 pub fn activations(&self) -> Rc<RefCell<Activations>> { self.worker.activations() }
40 /// Constructs an `Activator` tied to the specified operator address.
41 pub fn activator_for(&self, path: Rc<[usize]>) -> crate::scheduling::Activator { self.worker.activator_for(path) }
42
43 /// A useful name describing the scope.
44 pub fn name(&self) -> String { self.subgraph.borrow().name.clone() }
45
46 /// A sequence of scope identifiers describing the path from the worker root to this scope.
47 pub fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) }
48
49 /// Connects a source of data with a target of the data. This only links the two for
50 /// the purposes of tracking progress, rather than effect any data movement itself.
51 pub fn add_edge(&self, source: Source, target: Target) {
52 self.subgraph.borrow_mut().connect(source, target);
53 }
54
55 /// Reserves a slot for an operator in this scope.
56 ///
57 /// The returned [`OperatorSlot`] carries the scope-local index and worker-unique
58 /// identifier of the future operator. It must be consumed by [`OperatorSlot::install`]
59 /// before being dropped; otherwise it will panic, since the scope expects every
60 /// reserved slot to eventually be filled.
61 pub fn reserve_operator(&self) -> OperatorSlot<'scope, T> {
62 let index = self.subgraph.borrow_mut().allocate_child_id();
63 let identifier = self.worker().new_identifier();
64 OperatorSlot {
65 scope: *self,
66 index,
67 identifier,
68 installed: false,
69 }
70 }
71
72 /// Creates a dataflow subgraph.
73 ///
74 /// This method allows the user to create a nested scope with any timestamp that
75 /// "refines" the enclosing timestamp (informally: extends it in a reversible way).
76 ///
77 /// This is most commonly used to create new iterative contexts, and the provided
78 /// method `iterative` for this task demonstrates the use of this method.
79 ///
80 /// # Examples
81 /// ```
82 /// use timely::dataflow::operators::{Input, Enter, Leave};
83 /// use timely::order::Product;
84 ///
85 /// timely::execute_from_args(std::env::args(), |worker| {
86 /// // must specify types as nothing else drives inference.
87 /// let input = worker.dataflow::<u64,_,_>(|child1| {
88 /// let (input, stream) = child1.new_input::<Vec<String>>();
89 /// let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
90 /// stream.enter(child2).leave(child1)
91 /// });
92 /// input
93 /// });
94 /// });
95 /// ```
96 #[inline]
97 pub fn scoped<T2, R, F>(&self, name: &str, func: F) -> R
98 where
99 T2: Timestamp + Refines<T>,
100 F: FnOnce(Scope<T2>) -> R,
101 {
102 let (result, subgraph, slot) = self.scoped_raw(name, func);
103 slot.install(Box::new(subgraph));
104 result
105 }
106
107 /// Creates a dataflow subgraph, runs a user closure, and returns a result and the to-be-assembled parts.
108 ///
109 /// The returned subgraph must be registered in the operator slot.
110 pub fn scoped_raw<T2, R, F>(&self, name: &str, func: F) -> (R, Subgraph<T, T2>, OperatorSlot<'scope, T>)
111 where
112 T2: Timestamp + Refines<T>,
113 F: FnOnce(Scope<T2>) -> R,
114 {
115 let slot = self.reserve_operator();
116 let path = slot.addr();
117 let identifier = slot.identifier();
118
119 let subgraph = RefCell::new(SubgraphBuilder::new_from(path, identifier, name));
120
121 let child = Scope { subgraph: &subgraph, worker: self.worker };
122
123 let result = func(child);
124 let subgraph = subgraph.into_inner().build(self.worker);
125 (result, subgraph, slot)
126 }
127
128 /// Creates an iterative dataflow subgraph.
129 ///
130 /// This method is a specialization of `scoped` which uses the `Product` timestamp
131 /// combinator, suitable for iterative computations in which iterative development
132 /// at some time cannot influence prior iterations at a future time.
133 ///
134 /// # Examples
135 /// ```
136 /// use timely::dataflow::operators::{Input, Enter, Leave};
137 ///
138 /// timely::execute_from_args(std::env::args(), |worker| {
139 /// // must specify types as nothing else drives inference.
140 /// let input = worker.dataflow::<u64,_,_>(|child1| {
141 /// let (input, stream) = child1.new_input::<Vec<String>>();
142 /// let output = child1.iterative::<u32,_,_>(|child2| {
143 /// stream.enter(child2).leave(child1)
144 /// });
145 /// input
146 /// });
147 /// });
148 /// ```
149 pub fn iterative<T2, R, F>(&self, func: F) -> R
150 where
151 T2: Timestamp,
152 F: FnOnce(Scope<Product<T, T2>>) -> R,
153 {
154 self.scoped::<Product<T, T2>, R, F>("Iterative", func)
155 }
156
157 /// Creates a dataflow region with the same timestamp.
158 ///
159 /// This method is a specialization of `scoped` which uses the same timestamp as the
160 /// containing scope. It is used mainly to group regions of a dataflow computation, and
161 /// provides some computational benefits by abstracting the specifics of the region.
162 ///
163 /// # Examples
164 /// ```
165 /// use timely::dataflow::operators::{Input, Enter, Leave};
166 ///
167 /// timely::execute_from_args(std::env::args(), |worker| {
168 /// // must specify types as nothing else drives inference.
169 /// let input = worker.dataflow::<u64,_,_>(|child1| {
170 /// let (input, stream) = child1.new_input::<Vec<String>>();
171 /// let output = child1.region(|child2| {
172 /// stream.enter(child2).leave(child1)
173 /// });
174 /// input
175 /// });
176 /// });
177 /// ```
178 pub fn region<R, F>(&self, func: F) -> R
179 where
180 F: FnOnce(Scope<T>) -> R,
181 {
182 self.region_named("Region", func)
183 }
184
185 /// Creates a dataflow region with the same timestamp and a supplied name.
186 ///
187 /// This method is a specialization of `scoped` which uses the same timestamp as the
188 /// containing scope. It is used mainly to group regions of a dataflow computation, and
189 /// provides some computational benefits by abstracting the specifics of the region.
190 ///
191 /// This variant allows you to specify a name for the region, which can be read out in
192 /// the timely logging streams.
193 ///
194 /// # Examples
195 /// ```
196 /// use timely::dataflow::operators::{Input, Enter, Leave};
197 ///
198 /// timely::execute_from_args(std::env::args(), |worker| {
199 /// // must specify types as nothing else drives inference.
200 /// let input = worker.dataflow::<u64,_,_>(|child1| {
201 /// let (input, stream) = child1.new_input::<Vec<String>>();
202 /// let output = child1.region_named("region", |child2| {
203 /// stream.enter(child2).leave(child1)
204 /// });
205 /// input
206 /// });
207 /// });
208 /// ```
209 pub fn region_named<R, F>(&self, name: &str, func: F) -> R
210 where
211 F: FnOnce(Scope<T>) -> R,
212 {
213 self.scoped::<T, R, F>(name, func)
214 }
215}
216
217impl<'scope, T: Timestamp> Copy for Scope<'scope, T> {}
218impl<'scope, T: Timestamp> Clone for Scope<'scope, T> {
219 fn clone(&self) -> Self { *self }
220}
221
222impl<'scope, T: Timestamp> std::fmt::Debug for Scope<'scope, T> {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 f.debug_struct("Scope")
225 .field("name", &self.subgraph.borrow().name)
226 .field("path", &self.subgraph.borrow().path)
227 .finish_non_exhaustive()
228 }
229}
230
231/// A reservation for an operator at a specific position in a parent [`Scope`].
232///
233/// Returned by [`Scope::reserve_operator`] and [`SubscopeHandle::build`]. The
234/// slot carries the scope-local index and the worker-unique identifier of the
235/// operator-to-be. It must be consumed by [`OperatorSlot::install`] before
236/// being dropped; dropping an unfilled slot panics, since the parent scope
237/// expects the reserved index to eventually be filled.
238#[derive(Debug)]
239pub struct OperatorSlot<'scope, T: Timestamp> {
240 scope: Scope<'scope, T>,
241 index: usize,
242 identifier: usize,
243 installed: bool,
244}
245
246impl<'scope, T: Timestamp> OperatorSlot<'scope, T> {
247 /// The scope-local index reserved for the operator.
248 pub fn index(&self) -> usize { self.index }
249
250 /// The worker-unique identifier reserved for the operator (used for logging).
251 pub fn identifier(&self) -> usize { self.identifier }
252
253 /// The address (path from the worker root) at which the operator will live.
254 pub fn addr(&self) -> Rc<[usize]> {
255 let scope_path = &self.scope.subgraph.borrow().path[..];
256 let mut addr = Vec::with_capacity(scope_path.len() + 1);
257 addr.extend_from_slice(scope_path);
258 addr.push(self.index);
259 addr.into()
260 }
261
262 /// Installs `operator` at this slot, consuming the slot.
263 pub fn install(mut self, operator: Box<dyn Operate<T>>) {
264 // TODO: Check paths of self and operator; Operate has no such method at the moment.
265 self.scope.subgraph.borrow_mut().add_child(operator, self.index, self.identifier);
266 self.installed = true;
267 }
268}
269
270impl<'scope, T: Timestamp> Drop for OperatorSlot<'scope, T> {
271 fn drop(&mut self) {
272 if !self.installed && !std::thread::panicking() {
273 panic!(
274 "OperatorSlot for index {} dropped without `install` being called. \
275 Every reserved operator slot must be filled.",
276 self.index,
277 );
278 }
279 }
280}
281