1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
1516//! Traits and types for replaying captured timely dataflow streams.
17//!
18//! This is roughly based on [timely::dataflow::operators::capture::Replay], which
19//! provides the protocol and semantics of the [MzReplay] operator.
2021use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25use timely::container::ContainerBuilder;
2627use timely::Container;
28use timely::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, Session};
29use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee};
30use timely::dataflow::operators::capture::Event;
31use timely::dataflow::operators::capture::event::EventIterator;
32use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
33use timely::dataflow::{Scope, StreamCore};
34use timely::progress::Timestamp;
35use timely::scheduling::ActivateOnDrop;
3637use crate::activator::ActivatorTrait;
/// Replay a capture stream into a scope with the same timestamp.
///
/// Type parameters:
///
/// * `T`: The timestamp type of both the captured events and the target scope.
/// * `C`: The container type carried by the captured `Messages` events.
/// * `A`: The activator type that is handed to [`MzReplay::mz_replay`].
pub trait MzReplay<T, C, A>: Sized
where
    T: Timestamp,
    A: ActivatorTrait,
{
    /// Replays `self` into the provided scope, as a `StreamCore<S, CB::Container>` and provides
    /// a cancellation token. Uses the supplied container builder `CB` to form containers.
    ///
    /// The `period` argument allows the specification of a re-activation period, where the operator
    /// will re-activate itself every so often.
    ///
    /// * `scope`: The [Scope] to replay into.
    /// * `name`: Human-readable debug name of the Timely operator.
    /// * `period`: Reschedule the operator once the period has elapsed.
    ///   Provide [Duration::MAX] to disable periodic scheduling.
    /// * `activator`: An activator to trigger the operator.
    /// * `logic`: Callback that receives an output [Session] together with a container of
    ///   captured data, either owned or borrowed ([Cow]). In the implementation provided in
    ///   this file it is invoked once per replayed `Event::Messages`, with a session opened
    ///   at that message's time.
    ///
    /// Returns the replayed stream and a token. In the implementation provided in this file,
    /// once every strong reference to the token has been dropped the operator stops reading
    /// events and retracts the progress updates it has recorded so far.
    fn mz_replay<S: Scope<Timestamp = T>, CB, L>(
        self,
        scope: &mut S,
        name: &str,
        period: Duration,
        activator: A,
        logic: L,
    ) -> (StreamCore<S, CB::Container>, Rc<dyn Any>)
    where
        CB: ContainerBuilder,
        L: FnMut(Session<T, CB, PushCounter<T, CB::Container, Tee<T, CB::Container>>>, Cow<C>)
            + 'static;
}
6970impl<T, C, I, A> MzReplay<T, C, A> for I
71where
72T: Timestamp,
73 C: Container + Clone,
74 I: IntoIterator,
75 I::Item: EventIterator<T, C> + 'static,
76 A: ActivatorTrait + 'static,
77{
78fn mz_replay<S: Scope<Timestamp = T>, CB, L>(
79self,
80 scope: &mut S,
81 name: &str,
82 period: Duration,
83 activator: A,
84mut logic: L,
85 ) -> (StreamCore<S, CB::Container>, Rc<dyn Any>)
86where
87 for<'a> CB: ContainerBuilder,
88 L: FnMut(Session<T, CB, PushCounter<T, CB::Container, Tee<T, CB::Container>>>, Cow<C>)
89 + 'static,
90 {
91let name = format!("Replay {}", name);
92let mut builder = OperatorBuilder::new(name, scope.clone());
9394let address = builder.operator_info().address;
95let periodic_activator = scope.activator_for(Rc::clone(&address));
9697let (targets, stream) = builder.new_output();
9899let mut output: PushBuffer<_, CB, _> = PushBuffer::new(PushCounter::new(targets));
100let mut event_streams = self.into_iter().collect::<Vec<_>>();
101let mut started = false;
102103let mut last_active = Instant::now();
104105let mut progress_sofar =
106 <timely::progress::ChangeBatch<_>>::new_from(S::Timestamp::minimum(), 1);
107let token = Rc::new(ActivateOnDrop::new(
108 (),
109 Rc::clone(&address),
110 scope.activations(),
111 ));
112let weak_token = Rc::downgrade(&token);
113114 activator.register(scope, address);
115116 builder.build(move |progress| {
117 activator.ack();
118if last_active
119 .checked_add(period)
120 .map_or(false, |next_active| next_active <= Instant::now())
121 || !started
122 {
123 last_active = Instant::now();
124if period < Duration::MAX {
125 periodic_activator.activate_after(period);
126 }
127 }
128129if !started {
130// The first thing we do is modify our capabilities to match the number of streams we manage.
131 // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as
132 // our very first action.
133let len: i64 = event_streams
134 .len()
135 .try_into()
136 .expect("Implausibly large vector");
137 progress.internals[0].update(S::Timestamp::minimum(), len - 1);
138 progress_sofar.update(S::Timestamp::minimum(), len);
139 started = true;
140 }
141142if weak_token.upgrade().is_some() {
143for event_stream in event_streams.iter_mut() {
144while let Some(event) = event_stream.next() {
145use Cow::*;
146match event {
147 Owned(Event::Progress(vec)) => {
148 progress.internals[0].extend(vec.iter().cloned());
149 progress_sofar.extend(vec.into_iter());
150 }
151 Owned(Event::Messages(time, data)) => {
152 logic(output.session_with_builder(&time), Owned(data));
153 }
154 Borrowed(Event::Progress(vec)) => {
155 progress.internals[0].extend(vec.iter().cloned());
156 progress_sofar.extend(vec.iter().cloned());
157 }
158 Borrowed(Event::Messages(time, data)) => {
159 logic(output.session_with_builder(time), Borrowed(data));
160 }
161 }
162 }
163 }
164 } else {
165// Negate the accumulated progress contents emitted so far.
166progress.internals[0]
167 .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
168 }
169170 output.cease();
171 output
172 .inner()
173 .produced()
174 .borrow_mut()
175 .drain_into(&mut progress.produceds[0]);
176177false
178});
179180 (stream, token)
181 }
182}