Skip to main content

timely/dataflow/
scope.rs

1//! A dataflow scope, used to build dataflow graphs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::activate::Activations;
7use crate::progress::{Timestamp, Operate, Subgraph, SubgraphBuilder};
8use crate::progress::{Source, Target};
9use crate::progress::timestamp::Refines;
10use crate::order::Product;
11use crate::worker::Worker;
12
13/// Type alias for an iterative scope.
14pub type Iterative<'scope, TOuter, TInner> = Scope<'scope, Product<TOuter, TInner>>;
15
16/// A `Scope` manages the creation of new dataflow scopes, of operators and edges between them.
17///
18/// This is a shared object that can be freely copied, subject to its lifetime requirements.
19/// It manages scope construction through a `RefCell`-wrapped subgraph builder, and all of
20/// this type's methods use but do not hold write access through the `RefCell`.
21pub struct Scope<'scope, T: Timestamp> {
22    /// The subgraph under assembly.
23    ///
24    /// Stored as `RefCell<...>` so that multiple `Scope` copies can work on the same subgraph.
25    /// All methods on this type must release their borrow on this field before returning.
26    pub(crate) subgraph: &'scope RefCell<SubgraphBuilder<T>>,
27    /// The worker hosting this scope.
28    pub(crate) worker:   &'scope Worker,
29}
30
31impl<'scope, T: Timestamp> Scope<'scope, T> {
32    /// Access to the underlying worker.
33    pub fn worker(&self) -> &'scope Worker { self.worker }
34    /// This worker's index out of `0 .. self.peers()`.
35    pub fn index(&self) -> usize { self.worker.index() }
36    /// The total number of workers in the computation.
37    pub fn peers(&self) -> usize { self.worker.peers() }
38    /// Provides a shared handle to the activation scheduler.
39    pub fn activations(&self) -> Rc<RefCell<Activations>> { self.worker.activations() }
40    /// Constructs an `Activator` tied to the specified operator address.
41    pub fn activator_for(&self, path: Rc<[usize]>) -> crate::scheduling::Activator { self.worker.activator_for(path) }
42
43    /// A useful name describing the scope.
44    pub fn name(&self) -> String { self.subgraph.borrow().name.clone() }
45
46    /// A sequence of scope identifiers describing the path from the worker root to this scope.
47    pub fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) }
48
49    /// Connects a source of data with a target of the data. This only links the two for
50    /// the purposes of tracking progress, rather than effect any data movement itself.
51    pub fn add_edge(&self, source: Source, target: Target) {
52        self.subgraph.borrow_mut().connect(source, target);
53    }
54
55    /// Reserves a slot for an operator in this scope.
56    ///
57    /// The returned [`OperatorSlot`] carries the scope-local index and worker-unique
58    /// identifier of the future operator. It must be consumed by [`OperatorSlot::install`]
59    /// before being dropped; otherwise it will panic, since the scope expects every
60    /// reserved slot to eventually be filled.
61    pub fn reserve_operator(&self) -> OperatorSlot<'scope, T> {
62        let index = self.subgraph.borrow_mut().allocate_child_id();
63        let identifier = self.worker().new_identifier();
64        OperatorSlot {
65            scope: *self,
66            index,
67            identifier,
68            installed: false,
69        }
70    }
71
72    /// Creates a dataflow subgraph.
73    ///
74    /// This method allows the user to create a nested scope with any timestamp that
75    /// "refines" the enclosing timestamp (informally: extends it in a reversible way).
76    ///
77    /// This is most commonly used to create new iterative contexts, and the provided
78    /// method `iterative` for this task demonstrates the use of this method.
79    ///
80    /// # Examples
81    /// ```
82    /// use timely::dataflow::operators::{Input, Enter, Leave};
83    /// use timely::order::Product;
84    ///
85    /// timely::execute_from_args(std::env::args(), |worker| {
86    ///     // must specify types as nothing else drives inference.
87    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
88    ///         let (input, stream) = child1.new_input::<Vec<String>>();
89    ///         let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
90    ///             stream.enter(child2).leave(child1)
91    ///         });
92    ///         input
93    ///     });
94    /// });
95    /// ```
96    #[inline]
97    pub fn scoped<T2, R, F>(&self, name: &str, func: F) -> R
98    where
99        T2: Timestamp + Refines<T>,
100        F: FnOnce(Scope<T2>) -> R,
101    {
102        let (result, subgraph, slot) = self.scoped_raw(name, func);
103        slot.install(Box::new(subgraph));
104        result
105    }
106
107    /// Creates a dataflow subgraph, runs a user closure, and returns a result and the to-be-assembled parts.
108    ///
109    /// The returned subgraph must be registered in the operator slot.
110    pub fn scoped_raw<T2, R, F>(&self, name: &str, func: F) -> (R, Subgraph<T, T2>, OperatorSlot<'scope, T>)
111    where
112        T2: Timestamp + Refines<T>,
113        F: FnOnce(Scope<T2>) -> R,
114    {
115        let slot = self.reserve_operator();
116        let path = slot.addr();
117        let identifier = slot.identifier();
118
119        let subgraph = RefCell::new(SubgraphBuilder::new_from(path, identifier, name));
120
121        let child = Scope { subgraph: &subgraph, worker: self.worker };
122
123        let result = func(child);
124        let subgraph = subgraph.into_inner().build(self.worker);
125        (result, subgraph, slot)
126    }
127
128    /// Creates an iterative dataflow subgraph.
129    ///
130    /// This method is a specialization of `scoped` which uses the `Product` timestamp
131    /// combinator, suitable for iterative computations in which iterative development
132    /// at some time cannot influence prior iterations at a future time.
133    ///
134    /// # Examples
135    /// ```
136    /// use timely::dataflow::operators::{Input, Enter, Leave};
137    ///
138    /// timely::execute_from_args(std::env::args(), |worker| {
139    ///     // must specify types as nothing else drives inference.
140    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
141    ///         let (input, stream) = child1.new_input::<Vec<String>>();
142    ///         let output = child1.iterative::<u32,_,_>(|child2| {
143    ///             stream.enter(child2).leave(child1)
144    ///         });
145    ///         input
146    ///     });
147    /// });
148    /// ```
149    pub fn iterative<T2, R, F>(&self, func: F) -> R
150    where
151        T2: Timestamp,
152        F: FnOnce(Scope<Product<T, T2>>) -> R,
153    {
154        self.scoped::<Product<T, T2>, R, F>("Iterative", func)
155    }
156
157    /// Creates a dataflow region with the same timestamp.
158    ///
159    /// This method is a specialization of `scoped` which uses the same timestamp as the
160    /// containing scope. It is used mainly to group regions of a dataflow computation, and
161    /// provides some computational benefits by abstracting the specifics of the region.
162    ///
163    /// # Examples
164    /// ```
165    /// use timely::dataflow::operators::{Input, Enter, Leave};
166    ///
167    /// timely::execute_from_args(std::env::args(), |worker| {
168    ///     // must specify types as nothing else drives inference.
169    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
170    ///         let (input, stream) = child1.new_input::<Vec<String>>();
171    ///         let output = child1.region(|child2| {
172    ///             stream.enter(child2).leave(child1)
173    ///         });
174    ///         input
175    ///     });
176    /// });
177    /// ```
178    pub fn region<R, F>(&self, func: F) -> R
179    where
180        F: FnOnce(Scope<T>) -> R,
181    {
182        self.region_named("Region", func)
183    }
184
185    /// Creates a dataflow region with the same timestamp and a supplied name.
186    ///
187    /// This method is a specialization of `scoped` which uses the same timestamp as the
188    /// containing scope. It is used mainly to group regions of a dataflow computation, and
189    /// provides some computational benefits by abstracting the specifics of the region.
190    ///
191    /// This variant allows you to specify a name for the region, which can be read out in
192    /// the timely logging streams.
193    ///
194    /// # Examples
195    /// ```
196    /// use timely::dataflow::operators::{Input, Enter, Leave};
197    ///
198    /// timely::execute_from_args(std::env::args(), |worker| {
199    ///     // must specify types as nothing else drives inference.
200    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
201    ///         let (input, stream) = child1.new_input::<Vec<String>>();
202    ///         let output = child1.region_named("region", |child2| {
203    ///             stream.enter(child2).leave(child1)
204    ///         });
205    ///         input
206    ///     });
207    /// });
208    /// ```
209    pub fn region_named<R, F>(&self, name: &str, func: F) -> R
210    where
211        F: FnOnce(Scope<T>) -> R,
212    {
213        self.scoped::<T, R, F>(name, func)
214    }
215}
216
217impl<'scope, T: Timestamp> Copy for Scope<'scope, T> {}
218impl<'scope, T: Timestamp> Clone for Scope<'scope, T> {
219    fn clone(&self) -> Self { *self }
220}
221
222impl<'scope, T: Timestamp> std::fmt::Debug for Scope<'scope, T> {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        f.debug_struct("Scope")
225            .field("name", &self.subgraph.borrow().name)
226            .field("path", &self.subgraph.borrow().path)
227            .finish_non_exhaustive()
228    }
229}
230
231/// A reservation for an operator at a specific position in a parent [`Scope`].
232///
233/// Returned by [`Scope::reserve_operator`] and [`SubscopeHandle::build`]. The
234/// slot carries the scope-local index and the worker-unique identifier of the
235/// operator-to-be. It must be consumed by [`OperatorSlot::install`] before
236/// being dropped; dropping an unfilled slot panics, since the parent scope
237/// expects the reserved index to eventually be filled.
238#[derive(Debug)]
239pub struct OperatorSlot<'scope, T: Timestamp> {
240    scope: Scope<'scope, T>,
241    index: usize,
242    identifier: usize,
243    installed: bool,
244}
245
246impl<'scope, T: Timestamp> OperatorSlot<'scope, T> {
247    /// The scope-local index reserved for the operator.
248    pub fn index(&self) -> usize { self.index }
249
250    /// The worker-unique identifier reserved for the operator (used for logging).
251    pub fn identifier(&self) -> usize { self.identifier }
252
253    /// The address (path from the worker root) at which the operator will live.
254    pub fn addr(&self) -> Rc<[usize]> {
255        let scope_path = &self.scope.subgraph.borrow().path[..];
256        let mut addr = Vec::with_capacity(scope_path.len() + 1);
257        addr.extend_from_slice(scope_path);
258        addr.push(self.index);
259        addr.into()
260    }
261
262    /// Installs `operator` at this slot, consuming the slot.
263    pub fn install(mut self, operator: Box<dyn Operate<T>>) {
264        // TODO: Check paths of self and operator; Operate has no such method at the moment.
265        self.scope.subgraph.borrow_mut().add_child(operator, self.index, self.identifier);
266        self.installed = true;
267    }
268}
269
270impl<'scope, T: Timestamp> Drop for OperatorSlot<'scope, T> {
271    fn drop(&mut self) {
272        if !self.installed && !std::thread::panicking() {
273            panic!(
274                "OperatorSlot for index {} dropped without `install` being called. \
275                 Every reserved operator slot must be filled.",
276                self.index,
277            );
278        }
279    }
280}
281