mz_timely_util/activator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Utilities to activate dataflows based on external triggers.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20
21use timely::dataflow::Scope;
22use timely::scheduling::Activator;
23
24/// Generic activator behavior
25pub trait ActivatorTrait {
26    /// Trigger an activation of operators behind this activator.
27    fn activate(&self);
28
29    /// Acknowledge the receipt of activations from within an operator.
30    fn ack(&self);
31
32    /// Register a new operator with its path with this activator.
33    fn register<S: Scope>(&self, scope: &mut S, path: Rc<[usize]>);
34}
35
36/// An shared handle to multiple activators with support for triggering and acknowledging
37/// activations.
38///
39/// Activations are only triggered once the `activate` function has been called at least `threshold`
40/// times, and then not again until `ack` is called. This way, the [RcActivator] ensures two
41/// properties:
42/// * It does not enqueue more than one activation per activator, if there is only one activator
43///   registered with this [RcActivator]. Once multiple activators are registered, any ack will
44///   enable more activations.
45/// * The threshold to activation avoids activations purely caused by previous activations. Each
46///   scheduling of a logging dataflow potentially creates additional log data, which needs to be
47///   processed. The threshold should ensure that multiple activations under no load cause the
48///   dataflow to be scheduled. For Materialize's log dataflows, this number seems to be larger than
49///   32, below we might risk that we do not cause monotonically decreasing work. A value of 64 or
50///   larger is recommended, as there is no harm in bigger values. The log dataflow will still pick
51///   up all its inputs once every introspection interval, and this activator only creates
52///   additional activations.
53#[derive(Debug, Clone)]
54pub struct RcActivator {
55    inner: Rc<RefCell<ActivatorInner>>,
56}
57
58impl RcActivator {
59    /// Construct a new [RcActivator] with the given name and threshold.
60    ///
61    /// The threshold determines now many activations to ignore until scheduling the activation.
62    pub fn new(name: String, threshold: usize) -> Self {
63        let inner = ActivatorInner::new(name, threshold);
64        Self {
65            inner: Rc::new(RefCell::new(inner)),
66        }
67    }
68
69    /// Register an additional [Activator] with this [RcActivator].
70    pub fn register(&self, activator: Activator) {
71        self.inner.borrow_mut().register(activator)
72    }
73
74    /// Activate all contained activators.
75    ///
76    /// The implementation is free to ignore activations and only release them once a sufficient
77    /// volume has been accumulated.
78    pub fn activate(&self) {
79        self.inner.borrow_mut().activate()
80    }
81
82    /// Acknowledge the activation, which enables new activations to be scheduled.
83    pub fn ack(&self) {
84        self.inner.borrow_mut().ack()
85    }
86}
87
88impl ActivatorTrait for RcActivator {
89    fn activate(&self) {
90        self.activate()
91    }
92
93    fn ack(&self) {
94        self.ack()
95    }
96
97    fn register<S: Scope>(&self, scope: &mut S, path: Rc<[usize]>) {
98        self.register(scope.activator_for(path))
99    }
100}
101
102#[derive(Debug)]
103struct ActivatorInner {
104    activated: usize,
105    activators: Vec<Activator>,
106    _name: String,
107    threshold: usize,
108}
109
110impl ActivatorInner {
111    fn new(name: String, threshold: usize) -> Self {
112        Self {
113            _name: name,
114            threshold,
115            activated: 0,
116            activators: Vec::new(),
117        }
118    }
119
120    fn register(&mut self, activator: Activator) {
121        self.activators.push(activator)
122    }
123
124    fn activate(&mut self) {
125        if self.activators.is_empty() {
126            return;
127        }
128        self.activated += 1;
129        if self.activated == self.threshold {
130            for activator in &self.activators {
131                activator.activate();
132            }
133        }
134    }
135
136    fn ack(&mut self) {
137        self.activated = 0;
138    }
139}