1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
//! Hierarchical organization of timely dataflow graphs.

use std::rc::Rc;
use crate::progress::{Timestamp, Operate, Source, Target};
use crate::order::Product;
use crate::progress::timestamp::Refines;
use crate::communication::Allocate;
use crate::worker::AsWorker;

pub mod child;

pub use self::child::Child;

/// The information a child scope needs from its parent.
pub trait ScopeParent: AsWorker+Clone {
    /// The timestamp associated with data in this scope.
    type Timestamp : Timestamp;
}

impl<A: Allocate> ScopeParent for crate::worker::Worker<A> {
    type Timestamp = ();
}


/// The fundamental operations required to add and connect operators in a timely dataflow graph.
///
/// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method
/// takes a shared reference, but can be thought of as first calling `.clone()` and then calling the
/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics.
pub trait Scope: ScopeParent {
    /// A useful name describing the scope.
    fn name(&self) -> String;

    /// A sequence of scope identifiers describing the path from the worker root to this scope.
    fn addr(&self) -> Rc<[usize]>;

    /// A sequence of scope identifiers describing the path from the worker root to the child
    /// indicated by `index`.
    fn addr_for_child(&self, index: usize) -> Rc<[usize]>;

    /// Connects a source of data with a target of the data. This only links the two for
    /// the purposes of tracking progress, rather than effect any data movement itself.
    fn add_edge(&self, source: Source, target: Target);

    /// Adds a child `Operate` to the builder's scope. Returns the new child's index.
    fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
        let index = self.allocate_operator_index();
        let global = self.new_identifier();
        self.add_operator_with_indices(operator, index, global);
        index
    }

    /// Allocates a new scope-local operator index.
    ///
    /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local
    /// operator index allocated with this method. This method does cause the scope to expect that
    /// an operator will be added, and it is an error not to eventually add such an operator.
    fn allocate_operator_index(&mut self) -> usize;

    /// Adds a child `Operate` to the builder's scope using a supplied index.
    ///
    /// This is used internally when there is a gap between allocate a child identifier and adding the
    /// child, as happens in subgraph creation.
    fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
        let global = self.new_identifier();
        self.add_operator_with_indices(operator, index, global);
    }

    /// Adds a child `Operate` to the builder's scope using supplied indices.
    ///
    /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging.
    fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);

    /// Creates a dataflow subgraph.
    ///
    /// This method allows the user to create a nested scope with any timestamp that
    /// "refines" the enclosing timestamp (informally: extends it in a reversible way).
    ///
    /// This is most commonly used to create new iterative contexts, and the provided
    /// method `iterative` for this task demonstrates the use of this method.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    /// use timely::order::Product;
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    ///     // must specify types as nothing else drives inference.
    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
    ///         let (input, stream) = child1.new_input::<String>();
    ///         let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
    ///             stream.enter(child2).leave()
    ///         });
    ///         input
    ///     });
    /// });
    /// ```
    fn scoped<T, R, F>(&mut self, name: &str, func: F) -> R
    where
        T: Timestamp+Refines<<Self as ScopeParent>::Timestamp>,
        F: FnOnce(&mut Child<Self, T>) -> R;

    /// Creates a iterative dataflow subgraph.
    ///
    /// This method is a specialization of `scoped` which uses the `Product` timestamp
    /// combinator, suitable for iterative computations in which iterative development
    /// at some time cannot influence prior iterations at a future time.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    ///     // must specify types as nothing else drives inference.
    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
    ///         let (input, stream) = child1.new_input::<String>();
    ///         let output = child1.iterative::<u32,_,_>(|child2| {
    ///             stream.enter(child2).leave()
    ///         });
    ///         input
    ///     });
    /// });
    /// ```
    fn iterative<T, R, F>(&mut self, func: F) -> R
    where
        T: Timestamp,
        F: FnOnce(&mut Child<Self, Product<<Self as ScopeParent>::Timestamp, T>>) -> R,
    {
        self.scoped::<Product<<Self as ScopeParent>::Timestamp, T>,R,F>("Iterative", func)
    }

    /// Creates a dataflow region with the same timestamp.
    ///
    /// This method is a specialization of `scoped` which uses the same timestamp as the
    /// containing scope. It is used mainly to group regions of a dataflow computation, and
    /// provides some computational benefits by abstracting the specifics of the region.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    ///     // must specify types as nothing else drives inference.
    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
    ///         let (input, stream) = child1.new_input::<String>();
    ///         let output = child1.region(|child2| {
    ///             stream.enter(child2).leave()
    ///         });
    ///         input
    ///     });
    /// });
    /// ```
    fn region<R, F>(&mut self, func: F) -> R
    where
        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
    {
        self.region_named("Region", func)
    }

    /// Creates a dataflow region with the same timestamp.
    ///
    /// This method is a specialization of `scoped` which uses the same timestamp as the
    /// containing scope. It is used mainly to group regions of a dataflow computation, and
    /// provides some computational benefits by abstracting the specifics of the region.
    ///
    /// This variant allows you to specify a name for the region, which can be read out in
    /// the timely logging streams.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Input, Enter, Leave};
    ///
    /// timely::execute_from_args(std::env::args(), |worker| {
    ///     // must specify types as nothing else drives inference.
    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
    ///         let (input, stream) = child1.new_input::<String>();
    ///         let output = child1.region_named("region", |child2| {
    ///             stream.enter(child2).leave()
    ///         });
    ///         input
    ///     });
    /// });
    /// ```
    fn region_named<R, F>(&mut self, name: &str, func: F) -> R
    where
        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
    {
        self.scoped::<<Self as ScopeParent>::Timestamp,R,F>(name, func)
    }

}