1use std::cell::RefCell;
19use std::rc::Rc;
20
21use timely::dataflow::Scope;
22use timely::dataflow::scopes::Child;
23use timely::dataflow::scopes::ScopeParent;
24use timely::progress::operate::SharedProgress;
25use timely::progress::timestamp::Refines;
26use timely::progress::{Operate, SubgraphBuilder, Timestamp};
27use timely::scheduling::Schedule;
28use timely::scheduling::Scheduler;
29use timely::worker::AsWorker;
30
31#[derive(Clone)]
34pub struct LabelledScope<G> {
35 label: String,
37 inner: G,
39}
40
41impl<'a, G, T> LabelledScope<Child<'a, G, T>>
42where
43 G: ScopeParent,
44 T: Timestamp + Refines<G::Timestamp>,
45{
46 pub fn parent(&self) -> &G {
48 &self.inner.parent
49 }
50}
51
52impl<G: Scheduler> Scheduler for LabelledScope<G> {
53 fn activations(&self) -> Rc<RefCell<timely::scheduling::Activations>> {
54 self.inner.activations()
55 }
56
57 fn activator_for(&self, path: Rc<[usize]>) -> timely::scheduling::Activator {
58 self.inner.activator_for(path)
59 }
60
61 fn sync_activator_for(&self, path: Vec<usize>) -> timely::scheduling::SyncActivator {
62 self.inner.sync_activator_for(path)
63 }
64}
65
66impl<G: AsWorker> AsWorker for LabelledScope<G> {
67 fn config(&self) -> &timely::WorkerConfig {
68 self.inner.config()
69 }
70
71 fn index(&self) -> usize {
72 self.inner.index()
73 }
74
75 fn peers(&self) -> usize {
76 self.inner.peers()
77 }
78
79 fn allocate<T: timely::communication::Exchangeable>(
80 &mut self,
81 identifier: usize,
82 address: Rc<[usize]>,
83 ) -> (
84 Vec<Box<dyn timely::communication::Push<T>>>,
85 Box<dyn timely::communication::Pull<T>>,
86 ) {
87 self.inner.allocate(identifier, address)
88 }
89
90 fn pipeline<T: 'static>(
91 &mut self,
92 identifier: usize,
93 address: Rc<[usize]>,
94 ) -> (
95 timely::communication::allocator::thread::ThreadPusher<T>,
96 timely::communication::allocator::thread::ThreadPuller<T>,
97 ) {
98 self.inner.pipeline(identifier, address)
99 }
100
101 fn broadcast<T: timely::communication::Exchangeable + Clone>(
102 &mut self,
103 identifier: usize,
104 address: Rc<[usize]>,
105 ) -> (
106 Box<dyn timely::communication::Push<T>>,
107 Box<dyn timely::communication::Pull<T>>,
108 ) {
109 self.inner.broadcast(identifier, address)
110 }
111
112 fn new_identifier(&mut self) -> usize {
113 self.inner.new_identifier()
114 }
115
116 fn peek_identifier(&self) -> usize {
117 self.inner.peek_identifier()
118 }
119
120 fn log_register(&self) -> Option<std::cell::RefMut<'_, timely::logging_core::Registry>> {
121 self.inner.log_register()
122 }
123
124 fn logger_for<CB: timely::ContainerBuilder>(
125 &self,
126 name: &str,
127 ) -> Option<timely::logging_core::Logger<CB>> {
128 self.inner.logger_for(name)
129 }
130
131 fn logging(&self) -> Option<timely::logging::TimelyLogger> {
132 self.inner.logging()
133 }
134}
135
136impl<G: ScopeParent> ScopeParent for LabelledScope<G> {
137 type Timestamp = G::Timestamp;
138}
139
140impl<'a, G, T> Scope for LabelledScope<Child<'a, G, T>>
141where
142 G: ScopeParent,
143 T: Timestamp + Refines<G::Timestamp>,
144{
145 fn name(&self) -> String {
146 self.inner.name()
147 }
148
149 fn addr(&self) -> Rc<[usize]> {
150 self.inner.addr()
151 }
152
153 fn addr_for_child(&self, index: usize) -> Rc<[usize]> {
154 self.inner.addr_for_child(index)
155 }
156
157 fn add_edge(&self, source: timely::progress::Source, target: timely::progress::Target) {
158 self.inner.add_edge(source, target)
159 }
160
161 fn allocate_operator_index(&mut self) -> usize {
162 self.inner.allocate_operator_index()
163 }
164
165 fn add_operator_with_indices(
166 &mut self,
167 operator: Box<dyn Operate<Self::Timestamp>>,
168 local: usize,
169 global: usize,
170 ) {
171 let operator = LabelledOperator::new(&self.label, BoxedOperator(operator));
172 self.inner
173 .add_operator_with_indices(Box::new(operator), local, global)
174 }
175
176 fn scoped<T2, R, F>(&mut self, name: &str, func: F) -> R
177 where
178 T2: Timestamp + Refines<<Self as ScopeParent>::Timestamp>,
179 F: FnOnce(&mut Child<Self, T2>) -> R,
180 {
181 let index = self.inner.subgraph.borrow_mut().allocate_child_id();
182 let identifier = self.new_identifier();
183 let path = self.addr_for_child(index);
184
185 let type_name = std::any::type_name::<T2>();
186 let progress_logging = self.logger_for(&format!("timely/progress/{type_name}"));
187 let summary_logging = self.logger_for(&format!("timely/summary/{type_name}"));
188
189 let subscope = RefCell::new(SubgraphBuilder::new_from(
190 path,
191 identifier,
192 self.logging(),
193 summary_logging,
194 name,
195 ));
196 let result = {
197 let mut builder = Child {
198 subgraph: &subscope,
199 parent: self.clone(),
200 logging: self.inner.logging.clone(),
201 progress_logging,
202 };
203 func(&mut builder)
204 };
205 let subscope = subscope.into_inner().build(self);
206 let subscope = LabelledOperator::new(&self.label, subscope);
207
208 self.inner
209 .add_operator_with_indices(Box::new(subscope), index, identifier);
210
211 result
212 }
213}
214
215pub struct LabelledOperator<O> {
218 label: String,
220 inner: O,
222}
223
224impl<O> LabelledOperator<O> {
225 fn new(label: &str, operator: O) -> Self {
226 LabelledOperator {
227 label: label.to_owned(),
228 inner: operator,
229 }
230 }
231}
232
233impl<T: Timestamp, O: Operate<T>> Operate<T> for LabelledOperator<O> {
234 fn inputs(&self) -> usize {
235 self.inner.inputs()
236 }
237
238 fn outputs(&self) -> usize {
239 self.inner.outputs()
240 }
241
242 fn get_internal_summary(
243 &mut self,
244 ) -> (
245 timely::progress::operate::Connectivity<T::Summary>,
246 Rc<RefCell<SharedProgress<T>>>,
247 ) {
248 self.inner.get_internal_summary()
249 }
250
251 fn local(&self) -> bool {
252 self.inner.local()
253 }
254
255 fn set_external_summary(&mut self) {
256 self.inner.set_external_summary()
257 }
258
259 fn notify_me(&self) -> bool {
260 self.inner.notify_me()
261 }
262}
263
264impl<O: Schedule> Schedule for LabelledOperator<O> {
265 fn name(&self) -> &str {
266 self.inner.name()
267 }
268
269 fn path(&self) -> &[usize] {
270 self.inner.path()
271 }
272
273 #[inline(always)]
274 fn schedule(&mut self) -> bool {
275 custom_labels::with_label("timely-scope", &self.label, || self.inner.schedule())
276 }
277}
278
279struct BoxedOperator<T>(Box<dyn Operate<T>>);
280
281impl<T: Timestamp> Operate<T> for BoxedOperator<T> {
282 fn inputs(&self) -> usize {
283 self.0.inputs()
284 }
285
286 fn outputs(&self) -> usize {
287 self.0.outputs()
288 }
289
290 fn get_internal_summary(
291 &mut self,
292 ) -> (
293 timely::progress::operate::Connectivity<<T as Timestamp>::Summary>,
294 Rc<RefCell<SharedProgress<T>>>,
295 ) {
296 self.0.get_internal_summary()
297 }
298
299 fn local(&self) -> bool {
300 self.0.local()
301 }
302
303 fn set_external_summary(&mut self) {
304 self.0.set_external_summary()
305 }
306
307 fn notify_me(&self) -> bool {
308 self.0.notify_me()
309 }
310}
311
312impl<T> Schedule for BoxedOperator<T> {
313 fn name(&self) -> &str {
314 self.0.name()
315 }
316
317 fn path(&self) -> &[usize] {
318 self.0.path()
319 }
320
321 #[inline(always)]
322 fn schedule(&mut self) -> bool {
323 self.0.schedule()
324 }
325}
326
327pub trait ScopeExt: Sized {
330 fn with_label(&mut self) -> LabelledScope<Self>;
331}
332
333impl<S> ScopeExt for S
334where
335 S: Scope + ScopeParent,
336{
337 fn with_label(&mut self) -> LabelledScope<Self> {
338 LabelledScope {
339 label: self.name(),
340 inner: self.clone(),
341 }
342 }
343}