Skip to main content

mz_timely_util/
activator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Utilities to activate dataflows based on external triggers.
17
18use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use timely::dataflow::Scope;
24use timely::dataflow::operators::generic::OperatorInfo;
25use timely::progress::Timestamp;
26use timely::scheduling::Activator;
27
28/// Generic activator behavior
29pub trait ActivatorTrait {
30    /// Trigger an activation of operators behind this activator.
31    fn activate(&self);
32
33    /// Acknowledge the receipt of activations from within an operator.
34    fn ack(&self);
35
36    /// Register a new operator with its path with this activator.
37    fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>);
38}
39
40/// An shared handle to multiple activators with support for triggering and acknowledging
41/// activations.
42///
43/// Activations are only triggered once the `activate` function has been called at least `threshold`
44/// times, and then not again until `ack` is called. This way, the [RcActivator] ensures two
45/// properties:
46/// * It does not enqueue more than one activation per activator, if there is only one activator
47///   registered with this [RcActivator]. Once multiple activators are registered, any ack will
48///   enable more activations.
49/// * The threshold to activation avoids activations purely caused by previous activations. Each
50///   scheduling of a logging dataflow potentially creates additional log data, which needs to be
51///   processed. The threshold should ensure that multiple activations under no load cause the
52///   dataflow to be scheduled. For Materialize's log dataflows, this number seems to be larger than
53///   32, below we might risk that we do not cause monotonically decreasing work. A value of 64 or
54///   larger is recommended, as there is no harm in bigger values. The log dataflow will still pick
55///   up all its inputs once every introspection interval, and this activator only creates
56///   additional activations.
57#[derive(Debug, Clone)]
58pub struct RcActivator {
59    inner: Rc<RefCell<ActivatorInner>>,
60}
61
62impl RcActivator {
63    /// Construct a new [RcActivator] with the given name and threshold.
64    ///
65    /// The threshold determines now many activations to ignore until scheduling the activation.
66    pub fn new(name: String, threshold: usize) -> Self {
67        let inner = ActivatorInner::new(name, threshold);
68        Self {
69            inner: Rc::new(RefCell::new(inner)),
70        }
71    }
72
73    /// Register an additional [Activator] with this [RcActivator].
74    pub fn register(&self, activator: Activator) {
75        self.inner.borrow_mut().register(activator)
76    }
77
78    /// Activate all contained activators.
79    ///
80    /// The implementation is free to ignore activations and only release them once a sufficient
81    /// volume has been accumulated.
82    pub fn activate(&self) {
83        self.inner.borrow_mut().activate()
84    }
85
86    /// Acknowledge the activation, which enables new activations to be scheduled.
87    pub fn ack(&self) {
88        self.inner.borrow_mut().ack()
89    }
90}
91
92impl ActivatorTrait for RcActivator {
93    fn activate(&self) {
94        self.activate()
95    }
96
97    fn ack(&self) {
98        self.ack()
99    }
100
101    fn register<'scope, T: Timestamp>(&self, scope: Scope<'scope, T>, path: Rc<[usize]>) {
102        self.register(scope.activator_for(path))
103    }
104}
105
106#[derive(Debug)]
107struct ActivatorInner {
108    activated: usize,
109    activators: Vec<Activator>,
110    _name: String,
111    threshold: usize,
112}
113
114impl ActivatorInner {
115    fn new(name: String, threshold: usize) -> Self {
116        Self {
117            _name: name,
118            threshold,
119            activated: 0,
120            activators: Vec::new(),
121        }
122    }
123
124    fn register(&mut self, activator: Activator) {
125        self.activators.push(activator)
126    }
127
128    fn activate(&mut self) {
129        if self.activators.is_empty() {
130            return;
131        }
132        self.activated += 1;
133        if self.activated == self.threshold {
134            for activator in &self.activators {
135                activator.activate();
136            }
137        }
138    }
139
140    fn ack(&mut self) {
141        self.activated = 0;
142    }
143}
144
145/// A [`timely::scheduling::SyncActivator`] wrapper that coalesces multiple activations.
146///
147/// Tasks can call [`ArcActivator::activate`] liberally. The actual
148/// `SyncActivator::activate` call is suppressed if the operator has not yet run since the
149/// last activation. The Timely operator holds the corresponding [`ActivationAck`] and calls
150/// [`ActivationAck::ack`] each time it is scheduled, resetting the flag so the next
151/// `activate` call goes through.
152pub struct ArcActivator {
153    inner: timely::scheduling::SyncActivator,
154    /// `true` if the operator has been activated but not yet scheduled.
155    pending: Arc<AtomicBool>,
156}
157
/// Handle held by the Timely operator to acknowledge activations from an [`ArcActivator`].
///
/// Call [`ActivationAck::ack`] at the start of each operator scheduling to allow the
/// [`ArcActivator`] to send new activations.
///
/// The wrapped flag is `true` while an activation is pending; the `Debug` output shows its
/// current value.
#[derive(Debug)]
pub struct ActivationAck(Arc<AtomicBool>);
163
164impl ArcActivator {
165    /// Create a new [`ArcActivator`] and its corresponding [`ActivationAck`].
166    pub fn new<'scope, T: Timestamp>(
167        scope: Scope<'scope, T>,
168        info: &OperatorInfo,
169    ) -> (Self, ActivationAck) {
170        let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
171        let pending = Arc::new(AtomicBool::new(false));
172        (
173            Self {
174                inner: sync_activator,
175                pending: Arc::clone(&pending),
176            },
177            ActivationAck(pending),
178        )
179    }
180
181    /// Activate the Timely operator, unless an activation is already pending.
182    pub fn activate(&self) {
183        // Acquire ensures we see the latest `ack` store before deciding to skip.
184        if !self.pending.swap(true, Ordering::AcqRel) {
185            let _ = self.inner.activate();
186        }
187    }
188}
189
190impl ActivationAck {
191    /// Acknowledge the activation, allowing the [`ArcActivator`] to activate again.
192    ///
193    /// Call this at the start of each operator scheduling.
194    pub fn ack(&self) {
195        // Release ensures the `false` is visible to the Tokio task's `activate`.
196        self.0.store(false, Ordering::Release);
197    }
198}