Skip to main content

mz_timely_util/
activator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Utilities to activate dataflows based on external triggers.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20
21use timely::dataflow::Scope;
22use timely::progress::Timestamp;
23use timely::scheduling::Activator;
24
25/// Generic activator behavior
26pub trait ActivatorTrait {
27    /// Trigger an activation of operators behind this activator.
28    fn activate(&self);
29
30    /// Acknowledge the receipt of activations from within an operator.
31    fn ack(&self);
32
33    /// Register a new operator with its path with this activator.
34    fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>);
35}
36
37/// An shared handle to multiple activators with support for triggering and acknowledging
38/// activations.
39///
40/// Activations are only triggered once the `activate` function has been called at least `threshold`
41/// times, and then not again until `ack` is called. This way, the [RcActivator] ensures two
42/// properties:
43/// * It does not enqueue more than one activation per activator, if there is only one activator
44///   registered with this [RcActivator]. Once multiple activators are registered, any ack will
45///   enable more activations.
46/// * The threshold to activation avoids activations purely caused by previous activations. Each
47///   scheduling of a logging dataflow potentially creates additional log data, which needs to be
48///   processed. The threshold should ensure that multiple activations under no load cause the
49///   dataflow to be scheduled. For Materialize's log dataflows, this number seems to be larger than
50///   32, below we might risk that we do not cause monotonically decreasing work. A value of 64 or
51///   larger is recommended, as there is no harm in bigger values. The log dataflow will still pick
52///   up all its inputs once every introspection interval, and this activator only creates
53///   additional activations.
54#[derive(Debug, Clone)]
55pub struct RcActivator {
56    inner: Rc<RefCell<ActivatorInner>>,
57}
58
59impl RcActivator {
60    /// Construct a new [RcActivator] with the given name and threshold.
61    ///
62    /// The threshold determines now many activations to ignore until scheduling the activation.
63    pub fn new(name: String, threshold: usize) -> Self {
64        let inner = ActivatorInner::new(name, threshold);
65        Self {
66            inner: Rc::new(RefCell::new(inner)),
67        }
68    }
69
70    /// Register an additional [Activator] with this [RcActivator].
71    pub fn register(&self, activator: Activator) {
72        self.inner.borrow_mut().register(activator)
73    }
74
75    /// Activate all contained activators.
76    ///
77    /// The implementation is free to ignore activations and only release them once a sufficient
78    /// volume has been accumulated.
79    pub fn activate(&self) {
80        self.inner.borrow_mut().activate()
81    }
82
83    /// Acknowledge the activation, which enables new activations to be scheduled.
84    pub fn ack(&self) {
85        self.inner.borrow_mut().ack()
86    }
87}
88
89impl ActivatorTrait for RcActivator {
90    fn activate(&self) {
91        self.activate()
92    }
93
94    fn ack(&self) {
95        self.ack()
96    }
97
98    fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>) {
99        self.register(scope.activator_for(path))
100    }
101}
102
103#[derive(Debug)]
104struct ActivatorInner {
105    activated: usize,
106    activators: Vec<Activator>,
107    _name: String,
108    threshold: usize,
109}
110
111impl ActivatorInner {
112    fn new(name: String, threshold: usize) -> Self {
113        Self {
114            _name: name,
115            threshold,
116            activated: 0,
117            activators: Vec::new(),
118        }
119    }
120
121    fn register(&mut self, activator: Activator) {
122        self.activators.push(activator)
123    }
124
125    fn activate(&mut self) {
126        if self.activators.is_empty() {
127            return;
128        }
129        self.activated += 1;
130        if self.activated == self.threshold {
131            for activator in &self.activators {
132                activator.activate();
133            }
134        }
135    }
136
137    fn ack(&mut self) {
138        self.activated = 0;
139    }
140}