// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
1516use std::cell::RefCell;
17use std::convert::Infallible;
18use std::rc::Rc;
1920use timely::container::CapacityContainerBuilder;
21use timely::dataflow::operators::{CapabilitySet, InspectCore};
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::progress::Timestamp;
24use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
25use timely::scheduling::Activator;
26use timely::{Container, Data, PartialOrder};
27use tokio::sync::Notify;
2829use crate::builder_async::OperatorBuilder as AsyncOperatorBuilder;
3031/// Monitors progress at a `Stream`.
32pub trait ProbeNotify<G: Scope> {
33/// Inserts a collection of progress probe in a stream.
34fn probe_notify_with(&self, handles: Vec<Handle<G::Timestamp>>) -> Self;
35}
3637impl<G, C> ProbeNotify<G> for StreamCore<G, C>
38where
39G: Scope,
40 C: Container + Data,
41{
42fn probe_notify_with(&self, mut handles: Vec<Handle<G::Timestamp>>) -> Self {
43if handles.is_empty() {
44return self.clone();
45 }
46// We need to reset the handle's frontier from the empty one to the minimal one, to enable
47 // downgrading.
48for handle in &mut handles {
49 handle.update_frontier(&[Timestamp::minimum()]);
50 }
5152// TODO: This operator observes but doesn't consume data.
53 // Instead, it should only observe progress statements.
54self.inspect_container(move |update| {
55if let Err(frontier) = update {
56for handle in &mut handles {
57 handle.update_frontier(frontier);
58 }
59 }
60 })
61 }
62}
6364#[derive(Debug)]
65pub struct Handle<T: Timestamp> {
66/// The overall shared frontier managed by all the handles
67frontier: Rc<RefCell<MutableAntichain<T>>>,
68/// The private frontier containing the changes produced by this handle only
69handle_frontier: Antichain<T>,
70 notify: Rc<Notify>,
71/// Activators to notify when the frontier progresses
72activators: Rc<RefCell<Vec<Activator>>>,
73}
7475impl<T: Timestamp> Default for Handle<T> {
76fn default() -> Self {
77// Initialize the handle frontier to the empty frontier, to prevent it from unintentionally
78 // holding back the global frontier. Only when a handle is used to probe a stream do we
79 // reset its frontier to the minimal one.
80Handle {
81 frontier: Rc::new(RefCell::new(MutableAntichain::new())),
82 handle_frontier: Antichain::new(),
83 notify: Rc::new(Notify::new()),
84 activators: Rc::default(),
85 }
86 }
87}
8889impl<T: Timestamp> Handle<T> {
90/// Wait for the frontier monitored by this probe to progress
91pub async fn progressed(&self) {
92self.notify.notified().await
93}
9495/// Returns true iff the frontier is strictly less than `time`.
96#[inline]
97pub fn less_than(&self, time: &T) -> bool {
98self.frontier.borrow().less_than(time)
99 }
100/// Returns true iff the frontier is less than or equal to `time`.
101#[inline]
102pub fn less_equal(&self, time: &T) -> bool {
103self.frontier.borrow().less_equal(time)
104 }
105/// Returns true iff the frontier is empty.
106#[inline]
107pub fn done(&self) -> bool {
108self.frontier.borrow().is_empty()
109 }
110111/// Invokes a method on the frontier, returning its result.
112 ///
113 /// This method allows inspection of the frontier, which cannot be returned by reference as
114 /// it is on the other side of a `RefCell`.
115 ///
116 /// # Examples
117 ///
118 /// ```
119 /// use mz_timely_util::probe::Handle;
120 ///
121 /// let handle = Handle::<usize>::default();
122 /// let frontier = handle.with_frontier(|frontier| frontier.to_vec());
123 /// ```
124#[inline]
125pub fn with_frontier<R, F: FnMut(AntichainRef<T>) -> R>(&self, mut function: F) -> R {
126 function(self.frontier.borrow().frontier())
127 }
128129#[inline]
130fn update_frontier(&mut self, new_frontier: &[T]) {
131let mut frontier = self.frontier.borrow_mut();
132let changes = frontier.update_iter(
133self.handle_frontier
134 .iter()
135 .map(|t| (t.clone(), -1))
136 .chain(new_frontier.iter().map(|t| (t.clone(), 1))),
137 );
138self.handle_frontier.clear();
139self.handle_frontier.extend(new_frontier.iter().cloned());
140if changes.count() > 0 {
141self.notify.notify_waiters();
142for activator in self.activators.borrow().iter() {
143 activator.activate();
144 }
145 }
146 }
147148/// Register an activator to be notified when the frontier progresses
149pub fn activate(&self, activator: Activator) {
150self.activators.borrow_mut().push(activator);
151 }
152}
153154impl<T: Timestamp> Drop for Handle<T> {
155fn drop(&mut self) {
156// This handle is being dropped so remove it from the overall calculation
157self.frontier
158 .borrow_mut()
159 .update_iter(self.handle_frontier.iter().map(|t| (t.clone(), -1)));
160 }
161}
162163impl<T: Timestamp> Clone for Handle<T> {
164fn clone(&self) -> Self {
165 Handle {
166 frontier: Rc::clone(&self.frontier),
167 handle_frontier: Antichain::new(),
168 notify: Rc::clone(&self.notify),
169 activators: Rc::clone(&self.activators),
170 }
171 }
172}
173174/// Creates a stream that flows progress updates from a probe.
175///
176/// The returned stream is guaranteed to never yield any data updates, as is reflected by its type.
177// TODO: Replace `Infallible` with `!` once the latter stabilizes.
178pub fn source<G, T>(scope: G, name: String, handle: Handle<T>) -> Stream<G, Infallible>
179where
180G: Scope<Timestamp = T>,
181 T: Timestamp,
182{
183let mut builder = AsyncOperatorBuilder::new(name, scope);
184let (_output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
185186 builder.build(move |capabilities| async move {
187let mut cap_set = CapabilitySet::from(capabilities);
188let mut frontier = Antichain::from_elem(T::minimum());
189190let mut downgrade_capability = |f: AntichainRef<T>| {
191if PartialOrder::less_than(&frontier.borrow(), &f) {
192 frontier = f.to_owned();
193 cap_set.downgrade(&f);
194 }
195 !frontier.is_empty()
196 };
197198while handle.with_frontier(&mut downgrade_capability) {
199 handle.progressed().await;
200 }
201 });
202203 output_stream
204}