mz_timely_util/activator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Utilities to activate dataflows based on external triggers.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20
21use timely::dataflow::Scope;
22use timely::scheduling::Activator;
23
24/// Generic activator behavior
25pub trait ActivatorTrait {
26 /// Trigger an activation of operators behind this activator.
27 fn activate(&self);
28
29 /// Acknowledge the receipt of activations from within an operator.
30 fn ack(&self);
31
32 /// Register a new operator with its path with this activator.
33 fn register<S: Scope>(&self, scope: &mut S, path: Rc<[usize]>);
34}
35
36/// An shared handle to multiple activators with support for triggering and acknowledging
37/// activations.
38///
39/// Activations are only triggered once the `activate` function has been called at least `threshold`
40/// times, and then not again until `ack` is called. This way, the [RcActivator] ensures two
41/// properties:
42/// * It does not enqueue more than one activation per activator, if there is only one activator
43/// registered with this [RcActivator]. Once multiple activators are registered, any ack will
44/// enable more activations.
45/// * The threshold to activation avoids activations purely caused by previous activations. Each
46/// scheduling of a logging dataflow potentially creates additional log data, which needs to be
47/// processed. The threshold should ensure that multiple activations under no load cause the
48/// dataflow to be scheduled. For Materialize's log dataflows, this number seems to be larger than
49/// 32, below we might risk that we do not cause monotonically decreasing work. A value of 64 or
50/// larger is recommended, as there is no harm in bigger values. The log dataflow will still pick
51/// up all its inputs once every introspection interval, and this activator only creates
52/// additional activations.
53#[derive(Debug, Clone)]
54pub struct RcActivator {
55 inner: Rc<RefCell<ActivatorInner>>,
56}
57
58impl RcActivator {
59 /// Construct a new [RcActivator] with the given name and threshold.
60 ///
61 /// The threshold determines now many activations to ignore until scheduling the activation.
62 pub fn new(name: String, threshold: usize) -> Self {
63 let inner = ActivatorInner::new(name, threshold);
64 Self {
65 inner: Rc::new(RefCell::new(inner)),
66 }
67 }
68
69 /// Register an additional [Activator] with this [RcActivator].
70 pub fn register(&self, activator: Activator) {
71 self.inner.borrow_mut().register(activator)
72 }
73
74 /// Activate all contained activators.
75 ///
76 /// The implementation is free to ignore activations and only release them once a sufficient
77 /// volume has been accumulated.
78 pub fn activate(&self) {
79 self.inner.borrow_mut().activate()
80 }
81
82 /// Acknowledge the activation, which enables new activations to be scheduled.
83 pub fn ack(&self) {
84 self.inner.borrow_mut().ack()
85 }
86}
87
88impl ActivatorTrait for RcActivator {
89 fn activate(&self) {
90 self.activate()
91 }
92
93 fn ack(&self) {
94 self.ack()
95 }
96
97 fn register<S: Scope>(&self, scope: &mut S, path: Rc<[usize]>) {
98 self.register(scope.activator_for(path))
99 }
100}
101
102#[derive(Debug)]
103struct ActivatorInner {
104 activated: usize,
105 activators: Vec<Activator>,
106 _name: String,
107 threshold: usize,
108}
109
110impl ActivatorInner {
111 fn new(name: String, threshold: usize) -> Self {
112 Self {
113 _name: name,
114 threshold,
115 activated: 0,
116 activators: Vec::new(),
117 }
118 }
119
120 fn register(&mut self, activator: Activator) {
121 self.activators.push(activator)
122 }
123
124 fn activate(&mut self) {
125 if self.activators.is_empty() {
126 return;
127 }
128 self.activated += 1;
129 if self.activated == self.threshold {
130 for activator in &self.activators {
131 activator.activate();
132 }
133 }
134 }
135
136 fn ack(&mut self) {
137 self.activated = 0;
138 }
139}