mz_timely_util/activator.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Utilities to activate dataflows based on external triggers.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use timely::dataflow::Scope;
24use timely::dataflow::operators::generic::OperatorInfo;
25use timely::progress::Timestamp;
26use timely::scheduling::Activator;
27
28/// Generic activator behavior
29pub trait ActivatorTrait {
30 /// Trigger an activation of operators behind this activator.
31 fn activate(&self);
32
33 /// Acknowledge the receipt of activations from within an operator.
34 fn ack(&self);
35
36 /// Register a new operator with its path with this activator.
37 fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>);
38}
39
40/// An shared handle to multiple activators with support for triggering and acknowledging
41/// activations.
42///
43/// Activations are only triggered once the `activate` function has been called at least `threshold`
44/// times, and then not again until `ack` is called. This way, the [RcActivator] ensures two
45/// properties:
46/// * It does not enqueue more than one activation per activator, if there is only one activator
47/// registered with this [RcActivator]. Once multiple activators are registered, any ack will
48/// enable more activations.
49/// * The threshold to activation avoids activations purely caused by previous activations. Each
50/// scheduling of a logging dataflow potentially creates additional log data, which needs to be
51/// processed. The threshold should ensure that multiple activations under no load cause the
52/// dataflow to be scheduled. For Materialize's log dataflows, this number seems to be larger than
53/// 32, below we might risk that we do not cause monotonically decreasing work. A value of 64 or
54/// larger is recommended, as there is no harm in bigger values. The log dataflow will still pick
55/// up all its inputs once every introspection interval, and this activator only creates
56/// additional activations.
57#[derive(Debug, Clone)]
58pub struct RcActivator {
59 inner: Rc<RefCell<ActivatorInner>>,
60}
61
62impl RcActivator {
63 /// Construct a new [RcActivator] with the given name and threshold.
64 ///
65 /// The threshold determines now many activations to ignore until scheduling the activation.
66 pub fn new(name: String, threshold: usize) -> Self {
67 let inner = ActivatorInner::new(name, threshold);
68 Self {
69 inner: Rc::new(RefCell::new(inner)),
70 }
71 }
72
73 /// Register an additional [Activator] with this [RcActivator].
74 pub fn register(&self, activator: Activator) {
75 self.inner.borrow_mut().register(activator)
76 }
77
78 /// Activate all contained activators.
79 ///
80 /// The implementation is free to ignore activations and only release them once a sufficient
81 /// volume has been accumulated.
82 pub fn activate(&self) {
83 self.inner.borrow_mut().activate()
84 }
85
86 /// Acknowledge the activation, which enables new activations to be scheduled.
87 pub fn ack(&self) {
88 self.inner.borrow_mut().ack()
89 }
90}
91
92impl ActivatorTrait for RcActivator {
93 fn activate(&self) {
94 self.activate()
95 }
96
97 fn ack(&self) {
98 self.ack()
99 }
100
101 fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>) {
102 self.register(scope.activator_for(path))
103 }
104}
105
106#[derive(Debug)]
107struct ActivatorInner {
108 activated: usize,
109 activators: Vec<Activator>,
110 _name: String,
111 threshold: usize,
112}
113
114impl ActivatorInner {
115 fn new(name: String, threshold: usize) -> Self {
116 Self {
117 _name: name,
118 threshold,
119 activated: 0,
120 activators: Vec::new(),
121 }
122 }
123
124 fn register(&mut self, activator: Activator) {
125 self.activators.push(activator)
126 }
127
128 fn activate(&mut self) {
129 if self.activators.is_empty() {
130 return;
131 }
132 self.activated += 1;
133 if self.activated == self.threshold {
134 for activator in &self.activators {
135 activator.activate();
136 }
137 }
138 }
139
140 fn ack(&mut self) {
141 self.activated = 0;
142 }
143}
144
145/// A [`timely::scheduling::SyncActivator`] wrapper that coalesces multiple activations.
146///
147/// Tasks can call [`ArcActivator::activate`] liberally. The actual
148/// `SyncActivator::activate` call is suppressed if the operator has not yet run since the
149/// last activation. The Timely operator holds the corresponding [`ActivationAck`] and calls
150/// [`ActivationAck::ack`] each time it is scheduled, resetting the flag so the next
151/// `activate` call goes through.
152pub struct ArcActivator {
153 inner: timely::scheduling::SyncActivator,
154 /// `true` if the operator has been activated but not yet scheduled.
155 pending: Arc<AtomicBool>,
156}
157
/// Handle held by the Timely operator to acknowledge activations from an [`ArcActivator`].
///
/// Call [`ActivationAck::ack`] at the start of each operator scheduling to allow the
/// [`ArcActivator`] to send new activations. The wrapped flag is shared with the paired
/// [`ArcActivator`].
#[derive(Debug)]
pub struct ActivationAck(Arc<AtomicBool>);
163
164impl ArcActivator {
165 /// Create a new [`ArcActivator`] and its corresponding [`ActivationAck`].
166 pub fn new<'scope, T: Timestamp>(
167 scope: Scope<'scope, T>,
168 info: &OperatorInfo,
169 ) -> (Self, ActivationAck) {
170 let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
171 let pending = Arc::new(AtomicBool::new(false));
172 (
173 Self {
174 inner: sync_activator,
175 pending: Arc::clone(&pending),
176 },
177 ActivationAck(pending),
178 )
179 }
180
181 /// Activate the Timely operator, unless an activation is already pending.
182 pub fn activate(&self) {
183 // Acquire ensures we see the latest `ack` store before deciding to skip.
184 if !self.pending.swap(true, Ordering::AcqRel) {
185 let _ = self.inner.activate();
186 }
187 }
188}
189
190impl ActivationAck {
191 /// Acknowledge the activation, allowing the [`ArcActivator`] to activate again.
192 ///
193 /// Call this at the start of each operator scheduling.
194 pub fn ack(&self) {
195 // Release ensures the `false` is visible to the Tokio task's `activate`.
196 self.0.store(false, Ordering::Release);
197 }
198}