Skip to main content

mz_timely_util/
scope_label.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Scopes with profiling labels set at schedule time.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20
21use timely::dataflow::Scope;
22use timely::dataflow::scopes::Child;
23use timely::dataflow::scopes::ScopeParent;
24use timely::progress::operate::{Connectivity, FrontierInterest, SharedProgress};
25use timely::progress::timestamp::Refines;
26use timely::progress::{Operate, SubgraphBuilder, Timestamp};
27use timely::scheduling::Schedule;
28use timely::scheduling::Scheduler;
29use timely::worker::AsWorker;
30
/// A wrapper around a timely [`Scope`] that sets its name as a profiling label before
/// scheduling its child operators.
///
/// Operators added through this scope are wrapped in a [`LabelledOperator`] carrying `label`,
/// which is applied under the `timely-scope` profiling-label key whenever they are scheduled.
#[derive(Clone)]
pub struct LabelledScope<G> {
    /// Label value to set when an operator is scheduled.
    label: String,
    /// The inner scope.
    inner: G,
}
40
41impl<'a, G, T> LabelledScope<Child<'a, G, T>>
42where
43    G: ScopeParent,
44    T: Timestamp + Refines<G::Timestamp>,
45{
46    /// A reference of the child’s parent scope.
47    pub fn parent(&self) -> &G {
48        &self.inner.parent
49    }
50}
51
52impl<G: Scheduler> Scheduler for LabelledScope<G> {
53    fn activations(&self) -> Rc<RefCell<timely::scheduling::Activations>> {
54        self.inner.activations()
55    }
56
57    fn activator_for(&self, path: Rc<[usize]>) -> timely::scheduling::Activator {
58        self.inner.activator_for(path)
59    }
60
61    fn sync_activator_for(&self, path: Vec<usize>) -> timely::scheduling::SyncActivator {
62        self.inner.sync_activator_for(path)
63    }
64}
65
66impl<G: AsWorker> AsWorker for LabelledScope<G> {
67    fn config(&self) -> &timely::WorkerConfig {
68        self.inner.config()
69    }
70
71    fn index(&self) -> usize {
72        self.inner.index()
73    }
74
75    fn peers(&self) -> usize {
76        self.inner.peers()
77    }
78
79    fn allocate<T: timely::communication::Exchangeable>(
80        &mut self,
81        identifier: usize,
82        address: Rc<[usize]>,
83    ) -> (
84        Vec<Box<dyn timely::communication::Push<T>>>,
85        Box<dyn timely::communication::Pull<T>>,
86    ) {
87        self.inner.allocate(identifier, address)
88    }
89
90    fn pipeline<T: 'static>(
91        &mut self,
92        identifier: usize,
93        address: Rc<[usize]>,
94    ) -> (
95        timely::communication::allocator::thread::ThreadPusher<T>,
96        timely::communication::allocator::thread::ThreadPuller<T>,
97    ) {
98        self.inner.pipeline(identifier, address)
99    }
100
101    fn broadcast<T: timely::communication::Exchangeable + Clone>(
102        &mut self,
103        identifier: usize,
104        address: Rc<[usize]>,
105    ) -> (
106        Box<dyn timely::communication::Push<T>>,
107        Box<dyn timely::communication::Pull<T>>,
108    ) {
109        self.inner.broadcast(identifier, address)
110    }
111
112    fn new_identifier(&mut self) -> usize {
113        self.inner.new_identifier()
114    }
115
116    fn peek_identifier(&self) -> usize {
117        self.inner.peek_identifier()
118    }
119
120    fn log_register(&self) -> Option<std::cell::RefMut<'_, timely::logging_core::Registry>> {
121        self.inner.log_register()
122    }
123
124    fn logger_for<CB: timely::ContainerBuilder>(
125        &self,
126        name: &str,
127    ) -> Option<timely::logging_core::Logger<CB>> {
128        self.inner.logger_for(name)
129    }
130
131    fn logging(&self) -> Option<timely::logging::TimelyLogger> {
132        self.inner.logging()
133    }
134}
135
136impl<G: ScopeParent> ScopeParent for LabelledScope<G> {
137    type Timestamp = G::Timestamp;
138}
139
/// `Scope` implementation for a labelled child scope.
///
/// Naming, addressing, edges and operator-index allocation are forwarded to the inner
/// [`Child`] unchanged. Operators registered through `add_operator_with_indices`, and subgraphs
/// built by `scoped`, are wrapped in a [`LabelledOperator`] carrying this scope's label before
/// being handed to the inner scope.
impl<'a, G, T> Scope for LabelledScope<Child<'a, G, T>>
where
    G: ScopeParent,
    T: Timestamp + Refines<G::Timestamp>,
{
    fn name(&self) -> String {
        self.inner.name()
    }

    fn addr(&self) -> Rc<[usize]> {
        self.inner.addr()
    }

    fn addr_for_child(&self, index: usize) -> Rc<[usize]> {
        self.inner.addr_for_child(index)
    }

    fn add_edge(&self, source: timely::progress::Source, target: timely::progress::Target) {
        self.inner.add_edge(source, target)
    }

    fn allocate_operator_index(&mut self) -> usize {
        self.inner.allocate_operator_index()
    }

    /// Wraps `operator` so that scheduling it sets this scope's label, then registers the
    /// wrapped operator with the inner scope at the given `local`/`global` indices.
    fn add_operator_with_indices(
        &mut self,
        operator: Box<dyn Operate<Self::Timestamp>>,
        local: usize,
        global: usize,
    ) {
        // `LabelledOperator<O>` requires `O: Operate<_>`; `BoxedOperator` adapts the boxed
        // trait object into such an `O`.
        let operator = LabelledOperator::new(&self.label, BoxedOperator(operator));
        self.inner
            .add_operator_with_indices(Box::new(operator), local, global)
    }

    /// Creates a nested scope with timestamp `T2`, runs `func` to populate it, and registers
    /// the finished subgraph as a single operator of this scope, labelled with this scope's
    /// label.
    ///
    /// The nested scope's parent is this labelled wrapper, not the bare inner `Child`.
    /// NOTE(review): this appears to re-implement timely's own `Child::scoped` step by step;
    /// keep it in sync when upgrading timely.
    fn scoped<T2, R, F>(&mut self, name: &str, func: F) -> R
    where
        T2: Timestamp + Refines<<Self as ScopeParent>::Timestamp>,
        F: FnOnce(&mut Child<Self, T2>) -> R,
    {
        // Reserve the subgraph's slot in this scope: a local child index from the inner
        // subgraph builder, a fresh identifier, and the address derived from the child index.
        let index = self.inner.subgraph.borrow_mut().allocate_child_id();
        let identifier = self.new_identifier();
        let path = self.addr_for_child(index);

        // Progress and summary loggers are looked up per nested timestamp type.
        let type_name = std::any::type_name::<T2>();
        let progress_logging = self.logger_for(&format!("timely/progress/{type_name}"));
        let summary_logging = self.logger_for(&format!("timely/summary/{type_name}"));

        let subscope = RefCell::new(SubgraphBuilder::new_from(
            path,
            identifier,
            self.logging(),
            summary_logging,
            name,
        ));
        let result = {
            // `parent` is a clone of this labelled wrapper, so the nested scope sees `self`
            // (rather than `self.inner`) as its parent.
            let mut builder = Child {
                subgraph: &subscope,
                parent: self.clone(),
                logging: self.inner.logging.clone(),
                progress_logging,
            };
            func(&mut builder)
        };
        // The builder (and its borrow of `subscope`) has been dropped at the end of the block
        // above, so the subgraph can now be taken out of the `RefCell` and built.
        let subscope = subscope.into_inner().build(self);
        let subscope = LabelledOperator::new(&self.label, subscope);

        // Register via `self.inner` directly: going through `self.add_operator_with_indices`
        // would box the subgraph and wrap it in a second `LabelledOperator`.
        self.inner
            .add_operator_with_indices(Box::new(subscope), index, identifier);

        result
    }
}
214
/// Wraps a type implementing `Operate` so that the schedule handle it produces sets a
/// profiling label on every invocation.
pub struct LabelledOperator<O> {
    /// Profiling label applied whenever the wrapped operator is scheduled.
    label: String,
    /// The wrapped operator.
    inner: O,
}

impl<O> LabelledOperator<O> {
    /// Pairs `operator` with an owned copy of `label`.
    fn new(label: &str, operator: O) -> Self {
        let label = String::from(label);
        Self {
            label,
            inner: operator,
        }
    }
}
232
233impl<T: Timestamp, O: Operate<T>> Operate<T> for LabelledOperator<O> {
234    fn inputs(&self) -> usize {
235        self.inner.inputs()
236    }
237
238    fn outputs(&self) -> usize {
239        self.inner.outputs()
240    }
241
242    fn initialize(
243        self: Box<Self>,
244    ) -> (
245        Connectivity<T::Summary>,
246        Rc<RefCell<SharedProgress<T>>>,
247        Box<dyn Schedule>,
248    ) {
249        let label = self.label;
250        let (connectivity, shared, schedule) = Box::new(self.inner).initialize();
251        (
252            connectivity,
253            shared,
254            Box::new(LabelledSchedule {
255                label,
256                inner: schedule,
257            }),
258        )
259    }
260
261    fn local(&self) -> bool {
262        self.inner.local()
263    }
264
265    fn notify_me(&self) -> &[FrontierInterest] {
266        self.inner.notify_me()
267    }
268}
269
270/// A wrapper around a `Schedule` that sets a profiling label before scheduling.
271struct LabelledSchedule {
272    label: String,
273    inner: Box<dyn Schedule>,
274}
275
276impl Schedule for LabelledSchedule {
277    fn name(&self) -> &str {
278        self.inner.name()
279    }
280
281    fn path(&self) -> &[usize] {
282        self.inner.path()
283    }
284
285    #[inline(always)]
286    fn schedule(&mut self) -> bool {
287        custom_labels::with_label("timely-scope", &self.label, || self.inner.schedule())
288    }
289}
290
291struct BoxedOperator<T>(Box<dyn Operate<T>>);
292
293impl<T: Timestamp> Operate<T> for BoxedOperator<T> {
294    fn inputs(&self) -> usize {
295        self.0.inputs()
296    }
297
298    fn outputs(&self) -> usize {
299        self.0.outputs()
300    }
301
302    fn initialize(
303        self: Box<Self>,
304    ) -> (
305        Connectivity<T::Summary>,
306        Rc<RefCell<SharedProgress<T>>>,
307        Box<dyn Schedule>,
308    ) {
309        self.0.initialize()
310    }
311
312    fn local(&self) -> bool {
313        self.0.local()
314    }
315
316    fn notify_me(&self) -> &[FrontierInterest] {
317        self.0.notify_me()
318    }
319}
320
321/// Extension trait for timely [`Scope`] that allows one to convert a scope into one that sets its
322/// name as a profiling label before scheduling its child operators.
323pub trait ScopeExt: Sized {
324    fn with_label(&mut self) -> LabelledScope<Self>;
325}
326
327impl<S> ScopeExt for S
328where
329    S: Scope + ScopeParent,
330{
331    fn with_label(&mut self) -> LabelledScope<Self> {
332        LabelledScope {
333            label: self.name(),
334            inner: self.clone(),
335        }
336    }
337}