Skip to main content

mz_timely_util/
scope_label.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Scopes with profiling labels set at schedule time.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20
21use timely::dataflow::Scope;
22use timely::dataflow::scopes::Child;
23use timely::dataflow::scopes::ScopeParent;
24use timely::progress::operate::SharedProgress;
25use timely::progress::timestamp::Refines;
26use timely::progress::{Operate, SubgraphBuilder, Timestamp};
27use timely::scheduling::Schedule;
28use timely::scheduling::Scheduler;
29use timely::worker::AsWorker;
30
/// A wrapper around a timely [`Scope`] that sets its name as a profiling label before
/// scheduling its child operators.
///
/// Every operator (and nested subscope) registered through this wrapper is first wrapped in a
/// [`LabelledOperator`] that carries a copy of `label`; everything else is forwarded to the
/// inner scope. Instances are created through [`ScopeExt::with_label`].
#[derive(Clone)]
pub struct LabelledScope<G> {
    /// Label value to set when an operator is scheduled.
    label: String,
    /// The inner scope, to which all scope functionality is delegated.
    inner: G,
}
40
41impl<'a, G, T> LabelledScope<Child<'a, G, T>>
42where
43    G: ScopeParent,
44    T: Timestamp + Refines<G::Timestamp>,
45{
46    /// A reference of the child’s parent scope.
47    pub fn parent(&self) -> &G {
48        &self.inner.parent
49    }
50}
51
52impl<G: Scheduler> Scheduler for LabelledScope<G> {
53    fn activations(&self) -> Rc<RefCell<timely::scheduling::Activations>> {
54        self.inner.activations()
55    }
56
57    fn activator_for(&self, path: Rc<[usize]>) -> timely::scheduling::Activator {
58        self.inner.activator_for(path)
59    }
60
61    fn sync_activator_for(&self, path: Vec<usize>) -> timely::scheduling::SyncActivator {
62        self.inner.sync_activator_for(path)
63    }
64}
65
66impl<G: AsWorker> AsWorker for LabelledScope<G> {
67    fn config(&self) -> &timely::WorkerConfig {
68        self.inner.config()
69    }
70
71    fn index(&self) -> usize {
72        self.inner.index()
73    }
74
75    fn peers(&self) -> usize {
76        self.inner.peers()
77    }
78
79    fn allocate<T: timely::communication::Exchangeable>(
80        &mut self,
81        identifier: usize,
82        address: Rc<[usize]>,
83    ) -> (
84        Vec<Box<dyn timely::communication::Push<T>>>,
85        Box<dyn timely::communication::Pull<T>>,
86    ) {
87        self.inner.allocate(identifier, address)
88    }
89
90    fn pipeline<T: 'static>(
91        &mut self,
92        identifier: usize,
93        address: Rc<[usize]>,
94    ) -> (
95        timely::communication::allocator::thread::ThreadPusher<T>,
96        timely::communication::allocator::thread::ThreadPuller<T>,
97    ) {
98        self.inner.pipeline(identifier, address)
99    }
100
101    fn broadcast<T: timely::communication::Exchangeable + Clone>(
102        &mut self,
103        identifier: usize,
104        address: Rc<[usize]>,
105    ) -> (
106        Box<dyn timely::communication::Push<T>>,
107        Box<dyn timely::communication::Pull<T>>,
108    ) {
109        self.inner.broadcast(identifier, address)
110    }
111
112    fn new_identifier(&mut self) -> usize {
113        self.inner.new_identifier()
114    }
115
116    fn peek_identifier(&self) -> usize {
117        self.inner.peek_identifier()
118    }
119
120    fn log_register(&self) -> Option<std::cell::RefMut<'_, timely::logging_core::Registry>> {
121        self.inner.log_register()
122    }
123
124    fn logger_for<CB: timely::ContainerBuilder>(
125        &self,
126        name: &str,
127    ) -> Option<timely::logging_core::Logger<CB>> {
128        self.inner.logger_for(name)
129    }
130
131    fn logging(&self) -> Option<timely::logging::TimelyLogger> {
132        self.inner.logging()
133    }
134}
135
136impl<G: ScopeParent> ScopeParent for LabelledScope<G> {
137    type Timestamp = G::Timestamp;
138}
139
140impl<'a, G, T> Scope for LabelledScope<Child<'a, G, T>>
141where
142    G: ScopeParent,
143    T: Timestamp + Refines<G::Timestamp>,
144{
145    fn name(&self) -> String {
146        self.inner.name()
147    }
148
149    fn addr(&self) -> Rc<[usize]> {
150        self.inner.addr()
151    }
152
153    fn addr_for_child(&self, index: usize) -> Rc<[usize]> {
154        self.inner.addr_for_child(index)
155    }
156
157    fn add_edge(&self, source: timely::progress::Source, target: timely::progress::Target) {
158        self.inner.add_edge(source, target)
159    }
160
161    fn allocate_operator_index(&mut self) -> usize {
162        self.inner.allocate_operator_index()
163    }
164
165    fn add_operator_with_indices(
166        &mut self,
167        operator: Box<dyn Operate<Self::Timestamp>>,
168        local: usize,
169        global: usize,
170    ) {
171        let operator = LabelledOperator::new(&self.label, BoxedOperator(operator));
172        self.inner
173            .add_operator_with_indices(Box::new(operator), local, global)
174    }
175
176    fn scoped<T2, R, F>(&mut self, name: &str, func: F) -> R
177    where
178        T2: Timestamp + Refines<<Self as ScopeParent>::Timestamp>,
179        F: FnOnce(&mut Child<Self, T2>) -> R,
180    {
181        let index = self.inner.subgraph.borrow_mut().allocate_child_id();
182        let identifier = self.new_identifier();
183        let path = self.addr_for_child(index);
184
185        let type_name = std::any::type_name::<T2>();
186        let progress_logging = self.logger_for(&format!("timely/progress/{type_name}"));
187        let summary_logging = self.logger_for(&format!("timely/summary/{type_name}"));
188
189        let subscope = RefCell::new(SubgraphBuilder::new_from(
190            path,
191            identifier,
192            self.logging(),
193            summary_logging,
194            name,
195        ));
196        let result = {
197            let mut builder = Child {
198                subgraph: &subscope,
199                parent: self.clone(),
200                logging: self.inner.logging.clone(),
201                progress_logging,
202            };
203            func(&mut builder)
204        };
205        let subscope = subscope.into_inner().build(self);
206        let subscope = LabelledOperator::new(&self.label, subscope);
207
208        self.inner
209            .add_operator_with_indices(Box::new(subscope), index, identifier);
210
211        result
212    }
213}
214
/// An `Operate` implementor decorated with a profiling label.
///
/// The label is stored alongside the wrapped operator and is set each time the operator is
/// scheduled.
pub struct LabelledOperator<O> {
    /// Label value to set when the operator is scheduled.
    label: String,
    /// The operator being wrapped.
    inner: O,
}

impl<O> LabelledOperator<O> {
    /// Creates a wrapper around `operator` that owns its own copy of `label`.
    fn new(label: &str, operator: O) -> Self {
        let label = String::from(label);
        Self {
            label,
            inner: operator,
        }
    }
}
232
233impl<T: Timestamp, O: Operate<T>> Operate<T> for LabelledOperator<O> {
234    fn inputs(&self) -> usize {
235        self.inner.inputs()
236    }
237
238    fn outputs(&self) -> usize {
239        self.inner.outputs()
240    }
241
242    fn get_internal_summary(
243        &mut self,
244    ) -> (
245        timely::progress::operate::Connectivity<T::Summary>,
246        Rc<RefCell<SharedProgress<T>>>,
247    ) {
248        self.inner.get_internal_summary()
249    }
250
251    fn local(&self) -> bool {
252        self.inner.local()
253    }
254
255    fn set_external_summary(&mut self) {
256        self.inner.set_external_summary()
257    }
258
259    fn notify_me(&self) -> bool {
260        self.inner.notify_me()
261    }
262}
263
264impl<O: Schedule> Schedule for LabelledOperator<O> {
265    fn name(&self) -> &str {
266        self.inner.name()
267    }
268
269    fn path(&self) -> &[usize] {
270        self.inner.path()
271    }
272
273    #[inline(always)]
274    fn schedule(&mut self) -> bool {
275        custom_labels::with_label("timely-scope", &self.label, || self.inner.schedule())
276    }
277}
278
/// Newtype around a boxed [`Operate`] trait object.
///
/// `LabelledScope::add_operator_with_indices` uses it to pass an already boxed operator to
/// [`LabelledOperator`], which is generic over a type implementing `Operate`. The `Operate` and
/// `Schedule` implementations for this type forward every call to the boxed operator.
// NOTE(review): presumably needed because `Box<dyn Operate<T>>` does not itself implement
// `Operate<T>` — confirm against the timely version in use.
struct BoxedOperator<T>(Box<dyn Operate<T>>);
280
281impl<T: Timestamp> Operate<T> for BoxedOperator<T> {
282    fn inputs(&self) -> usize {
283        self.0.inputs()
284    }
285
286    fn outputs(&self) -> usize {
287        self.0.outputs()
288    }
289
290    fn get_internal_summary(
291        &mut self,
292    ) -> (
293        timely::progress::operate::Connectivity<<T as Timestamp>::Summary>,
294        Rc<RefCell<SharedProgress<T>>>,
295    ) {
296        self.0.get_internal_summary()
297    }
298
299    fn local(&self) -> bool {
300        self.0.local()
301    }
302
303    fn set_external_summary(&mut self) {
304        self.0.set_external_summary()
305    }
306
307    fn notify_me(&self) -> bool {
308        self.0.notify_me()
309    }
310}
311
312impl<T> Schedule for BoxedOperator<T> {
313    fn name(&self) -> &str {
314        self.0.name()
315    }
316
317    fn path(&self) -> &[usize] {
318        self.0.path()
319    }
320
321    #[inline(always)]
322    fn schedule(&mut self) -> bool {
323        self.0.schedule()
324    }
325}
326
/// Extension trait for timely [`Scope`] that allows one to convert a scope into one that sets its
/// name as a profiling label before scheduling its child operators.
pub trait ScopeExt: Sized {
    /// Returns a [`LabelledScope`] wrapping this scope.
    ///
    /// The blanket implementation in this file stores a clone of the scope and uses the
    /// scope's `name()` as the label.
    fn with_label(&mut self) -> LabelledScope<Self>;
}
332
333impl<S> ScopeExt for S
334where
335    S: Scope + ScopeParent,
336{
337    fn with_label(&mut self) -> LabelledScope<Self> {
338        LabelledScope {
339            label: self.name(),
340            inner: self.clone(),
341        }
342    }
343}