mz_timely_util/activator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Utilities to activate dataflows based on external triggers.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20
21use timely::dataflow::Scope;
22use timely::progress::Timestamp;
23use timely::scheduling::Activator;
24
25/// Generic activator behavior
26pub trait ActivatorTrait {
27 /// Trigger an activation of operators behind this activator.
28 fn activate(&self);
29
30 /// Acknowledge the receipt of activations from within an operator.
31 fn ack(&self);
32
33 /// Register a new operator with its path with this activator.
34 fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>);
35}
36
37/// An shared handle to multiple activators with support for triggering and acknowledging
38/// activations.
39///
40/// Activations are only triggered once the `activate` function has been called at least `threshold`
41/// times, and then not again until `ack` is called. This way, the [RcActivator] ensures two
42/// properties:
43/// * It does not enqueue more than one activation per activator, if there is only one activator
44/// registered with this [RcActivator]. Once multiple activators are registered, any ack will
45/// enable more activations.
46/// * The threshold to activation avoids activations purely caused by previous activations. Each
47/// scheduling of a logging dataflow potentially creates additional log data, which needs to be
48/// processed. The threshold should ensure that multiple activations under no load cause the
49/// dataflow to be scheduled. For Materialize's log dataflows, this number seems to be larger than
50/// 32, below we might risk that we do not cause monotonically decreasing work. A value of 64 or
51/// larger is recommended, as there is no harm in bigger values. The log dataflow will still pick
52/// up all its inputs once every introspection interval, and this activator only creates
53/// additional activations.
54#[derive(Debug, Clone)]
55pub struct RcActivator {
56 inner: Rc<RefCell<ActivatorInner>>,
57}
58
59impl RcActivator {
60 /// Construct a new [RcActivator] with the given name and threshold.
61 ///
62 /// The threshold determines now many activations to ignore until scheduling the activation.
63 pub fn new(name: String, threshold: usize) -> Self {
64 let inner = ActivatorInner::new(name, threshold);
65 Self {
66 inner: Rc::new(RefCell::new(inner)),
67 }
68 }
69
70 /// Register an additional [Activator] with this [RcActivator].
71 pub fn register(&self, activator: Activator) {
72 self.inner.borrow_mut().register(activator)
73 }
74
75 /// Activate all contained activators.
76 ///
77 /// The implementation is free to ignore activations and only release them once a sufficient
78 /// volume has been accumulated.
79 pub fn activate(&self) {
80 self.inner.borrow_mut().activate()
81 }
82
83 /// Acknowledge the activation, which enables new activations to be scheduled.
84 pub fn ack(&self) {
85 self.inner.borrow_mut().ack()
86 }
87}
88
89impl ActivatorTrait for RcActivator {
90 fn activate(&self) {
91 self.activate()
92 }
93
94 fn ack(&self) {
95 self.ack()
96 }
97
98 fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>) {
99 self.register(scope.activator_for(path))
100 }
101}
102
103#[derive(Debug)]
104struct ActivatorInner {
105 activated: usize,
106 activators: Vec<Activator>,
107 _name: String,
108 threshold: usize,
109}
110
111impl ActivatorInner {
112 fn new(name: String, threshold: usize) -> Self {
113 Self {
114 _name: name,
115 threshold,
116 activated: 0,
117 activators: Vec::new(),
118 }
119 }
120
121 fn register(&mut self, activator: Activator) {
122 self.activators.push(activator)
123 }
124
125 fn activate(&mut self) {
126 if self.activators.is_empty() {
127 return;
128 }
129 self.activated += 1;
130 if self.activated == self.threshold {
131 for activator in &self.activators {
132 activator.activate();
133 }
134 }
135 }
136
137 fn ack(&mut self) {
138 self.activated = 0;
139 }
140}