mz_timely_util/
probe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use std::cell::RefCell;
17use std::convert::Infallible;
18use std::rc::Rc;
19
20use timely::container::CapacityContainerBuilder;
21use timely::dataflow::operators::{CapabilitySet, InspectCore};
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::progress::Timestamp;
24use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
25use timely::scheduling::Activator;
26use timely::{Container, Data, PartialOrder};
27use tokio::sync::Notify;
28
29use crate::builder_async::OperatorBuilder as AsyncOperatorBuilder;
30
31/// Monitors progress at a `Stream`.
32pub trait ProbeNotify<G: Scope> {
33    /// Inserts a collection of progress probe in a stream.
34    fn probe_notify_with(&self, handles: Vec<Handle<G::Timestamp>>) -> Self;
35}
36
37impl<G, C> ProbeNotify<G> for StreamCore<G, C>
38where
39    G: Scope,
40    C: Container + Data,
41{
42    fn probe_notify_with(&self, mut handles: Vec<Handle<G::Timestamp>>) -> Self {
43        if handles.is_empty() {
44            return self.clone();
45        }
46        // We need to reset the handle's frontier from the empty one to the minimal one, to enable
47        // downgrading.
48        for handle in &mut handles {
49            handle.update_frontier(&[Timestamp::minimum()]);
50        }
51
52        // TODO: This operator observes but doesn't consume data.
53        // Instead, it should only observe progress statements.
54        self.inspect_container(move |update| {
55            if let Err(frontier) = update {
56                for handle in &mut handles {
57                    handle.update_frontier(frontier);
58                }
59            }
60        })
61    }
62}
63
64#[derive(Debug)]
65pub struct Handle<T: Timestamp> {
66    /// The overall shared frontier managed by all the handles
67    frontier: Rc<RefCell<MutableAntichain<T>>>,
68    /// The private frontier containing the changes produced by this handle only
69    handle_frontier: Antichain<T>,
70    notify: Rc<Notify>,
71    /// Activators to notify when the frontier progresses
72    activators: Rc<RefCell<Vec<Activator>>>,
73}
74
75impl<T: Timestamp> Default for Handle<T> {
76    fn default() -> Self {
77        // Initialize the handle frontier to the empty frontier, to prevent it from unintentionally
78        // holding back the global frontier. Only when a handle is used to probe a stream do we
79        // reset its frontier to the minimal one.
80        Handle {
81            frontier: Rc::new(RefCell::new(MutableAntichain::new())),
82            handle_frontier: Antichain::new(),
83            notify: Rc::new(Notify::new()),
84            activators: Rc::default(),
85        }
86    }
87}
88
89impl<T: Timestamp> Handle<T> {
90    /// Wait for the frontier monitored by this probe to progress
91    pub async fn progressed(&self) {
92        self.notify.notified().await
93    }
94
95    /// Returns true iff the frontier is strictly less than `time`.
96    #[inline]
97    pub fn less_than(&self, time: &T) -> bool {
98        self.frontier.borrow().less_than(time)
99    }
100    /// Returns true iff the frontier is less than or equal to `time`.
101    #[inline]
102    pub fn less_equal(&self, time: &T) -> bool {
103        self.frontier.borrow().less_equal(time)
104    }
105    /// Returns true iff the frontier is empty.
106    #[inline]
107    pub fn done(&self) -> bool {
108        self.frontier.borrow().is_empty()
109    }
110
111    /// Invokes a method on the frontier, returning its result.
112    ///
113    /// This method allows inspection of the frontier, which cannot be returned by reference as
114    /// it is on the other side of a `RefCell`.
115    ///
116    /// # Examples
117    ///
118    /// ```
119    /// use mz_timely_util::probe::Handle;
120    ///
121    /// let handle = Handle::<usize>::default();
122    /// let frontier = handle.with_frontier(|frontier| frontier.to_vec());
123    /// ```
124    #[inline]
125    pub fn with_frontier<R, F: FnMut(AntichainRef<T>) -> R>(&self, mut function: F) -> R {
126        function(self.frontier.borrow().frontier())
127    }
128
129    #[inline]
130    fn update_frontier(&mut self, new_frontier: &[T]) {
131        let mut frontier = self.frontier.borrow_mut();
132        let changes = frontier.update_iter(
133            self.handle_frontier
134                .iter()
135                .map(|t| (t.clone(), -1))
136                .chain(new_frontier.iter().map(|t| (t.clone(), 1))),
137        );
138        self.handle_frontier.clear();
139        self.handle_frontier.extend(new_frontier.iter().cloned());
140        if changes.count() > 0 {
141            self.notify.notify_waiters();
142            for activator in self.activators.borrow().iter() {
143                activator.activate();
144            }
145        }
146    }
147
148    /// Register an activator to be notified when the frontier progresses
149    pub fn activate(&self, activator: Activator) {
150        self.activators.borrow_mut().push(activator);
151    }
152}
153
154impl<T: Timestamp> Drop for Handle<T> {
155    fn drop(&mut self) {
156        // This handle is being dropped so remove it from the overall calculation
157        self.frontier
158            .borrow_mut()
159            .update_iter(self.handle_frontier.iter().map(|t| (t.clone(), -1)));
160    }
161}
162
163impl<T: Timestamp> Clone for Handle<T> {
164    fn clone(&self) -> Self {
165        Handle {
166            frontier: Rc::clone(&self.frontier),
167            handle_frontier: Antichain::new(),
168            notify: Rc::clone(&self.notify),
169            activators: Rc::clone(&self.activators),
170        }
171    }
172}
173
174/// Creates a stream that flows progress updates from a probe.
175///
176/// The returned stream is guaranteed to never yield any data updates, as is reflected by its type.
177// TODO: Replace `Infallible` with `!` once the latter stabilizes.
178pub fn source<G, T>(scope: G, name: String, handle: Handle<T>) -> Stream<G, Infallible>
179where
180    G: Scope<Timestamp = T>,
181    T: Timestamp,
182{
183    let mut builder = AsyncOperatorBuilder::new(name, scope);
184    let (_output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
185
186    builder.build(move |capabilities| async move {
187        let mut cap_set = CapabilitySet::from(capabilities);
188        let mut frontier = Antichain::from_elem(T::minimum());
189
190        let mut downgrade_capability = |f: AntichainRef<T>| {
191            if PartialOrder::less_than(&frontier.borrow(), &f) {
192                frontier = f.to_owned();
193                cap_set.downgrade(&f);
194            }
195            !frontier.is_empty()
196        };
197
198        while handle.with_frontier(&mut downgrade_capability) {
199            handle.progressed().await;
200        }
201    });
202
203    output_stream
204}