Skip to main content

mz_timely_util/
replay.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
//! Traits and types for replaying captured timely dataflow streams.
//!
//! This is roughly based on [timely::dataflow::operators::capture::Replay], which
//! provides the protocol and semantics of the [MzReplay] operator.
20
21use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25
26use timely::Container;
27use timely::communication::Push;
28use timely::dataflow::channels::Message;
29use timely::dataflow::channels::pushers::Counter as PushCounter;
30use timely::dataflow::operators::capture::Event;
31use timely::dataflow::operators::capture::event::EventIterator;
32use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
33use timely::dataflow::{Scope, Stream};
34use timely::progress::Timestamp;
35use timely::scheduling::ActivateOnDrop;
36
37use crate::activator::ActivatorTrait;
38
39/// Replay a capture stream into a scope with the same timestamp.
40pub trait MzReplay<T, C, A>: Sized
41where
42    T: Timestamp,
43    A: ActivatorTrait,
44{
45    /// Replays `self` into the provided scope, as a `Stream<S, CB::Container>` and provides
46    /// a cancellation token. Uses the supplied container builder `CB` to form containers.
47    ///
48    /// The `period` argument allows the specification of a re-activation period, where the operator
49    /// will re-activate itself every so often.
50    ///
51    /// * `scope`: The [Scope] to replay into.
52    /// * `name`: Human-readable debug name of the Timely operator.
53    /// * `period`: Reschedule the operator once the period has elapsed.
54    ///    Provide [Duration::MAX] to disable periodic scheduling.
55    /// * `activator`: An activator to trigger the operator.
56    fn mz_replay<'scope>(
57        self,
58        scope: Scope<'scope, T>,
59        name: &str,
60        period: Duration,
61        activator: A,
62    ) -> (Stream<'scope, T, C>, Rc<dyn Any>);
63}
64
65impl<T, C, I, A> MzReplay<T, C, A> for I
66where
67    T: Timestamp,
68    C: Container + Clone,
69    I: IntoIterator,
70    I::Item: EventIterator<T, C> + 'static,
71    A: ActivatorTrait + 'static,
72{
73    fn mz_replay<'scope>(
74        self,
75        scope: Scope<'scope, T>,
76        name: &str,
77        period: Duration,
78        activator: A,
79    ) -> (Stream<'scope, T, C>, Rc<dyn Any>) {
80        let name = format!("Replay {}", name);
81        let mut builder = OperatorBuilder::new(name, scope.clone());
82
83        let address = builder.operator_info().address;
84        let periodic_activator = scope.activator_for(Rc::clone(&address));
85
86        let (targets, stream) = builder.new_output();
87
88        let mut output = PushCounter::new(targets);
89        let mut event_streams = self.into_iter().collect::<Vec<_>>();
90        let mut started = false;
91
92        let mut last_active = Instant::now();
93
94        let mut progress_sofar = <timely::progress::ChangeBatch<_>>::new_from(T::minimum(), 1);
95        let token = Rc::new(ActivateOnDrop::new(
96            (),
97            Rc::clone(&address),
98            scope.activations(),
99        ));
100        let weak_token = Rc::downgrade(&token);
101
102        activator.register(scope, address);
103
104        builder.build(move |progress| {
105            activator.ack();
106            if last_active
107                .checked_add(period)
108                .map_or(false, |next_active| next_active <= Instant::now())
109                || !started
110            {
111                last_active = Instant::now();
112                if period < Duration::MAX {
113                    periodic_activator.activate_after(period);
114                }
115            }
116
117            if !started {
118                // The first thing we do is modify our capabilities to match the number of streams we manage.
119                // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as
120                // our very first action.
121                let len: i64 = event_streams
122                    .len()
123                    .try_into()
124                    .expect("Implausibly large vector");
125                progress.internals[0].update(T::minimum(), len - 1);
126                progress_sofar.update(T::minimum(), len);
127                started = true;
128            }
129
130            if weak_token.upgrade().is_some() {
131                for event_stream in event_streams.iter_mut() {
132                    while let Some(event) = event_stream.next() {
133                        use Cow::*;
134                        match event {
135                            Owned(Event::Progress(vec)) => {
136                                progress.internals[0].extend(vec.iter().cloned());
137                                progress_sofar.extend(vec.into_iter());
138                            }
139                            Owned(Event::Messages(time, mut data)) => {
140                                Message::push_at(&mut data, time, &mut output);
141                            }
142                            Borrowed(Event::Progress(vec)) => {
143                                progress.internals[0].extend(vec.iter().cloned());
144                                progress_sofar.extend(vec.iter().cloned());
145                            }
146                            Borrowed(Event::Messages(time, data)) => {
147                                Message::push_at(&mut data.clone(), time.clone(), &mut output);
148                            }
149                        }
150                    }
151                }
152            } else {
153                // Negate the accumulated progress contents emitted so far.
154                progress.internals[0]
155                    .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
156            }
157
158            output.done();
159            output
160                .produced()
161                .borrow_mut()
162                .drain_into(&mut progress.produceds[0]);
163
164            false
165        });
166
167        (stream, token)
168    }
169}