mz_timely_util/
replay.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Traits and types for replaying captured timely dataflow streams.
17//!
18//! This is roughly based on [timely::dataflow::operators::capture::Replay], which
19//! provides the protocol and semantics of the [MzReplay] operator.
20
21use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25
26use timely::Container;
27use timely::communication::Push;
28use timely::dataflow::channels::pushers::Counter as PushCounter;
29use timely::dataflow::operators::capture::Event;
30use timely::dataflow::operators::capture::event::EventIterator;
31use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
32use timely::dataflow::{Scope, StreamCore};
33use timely::progress::Timestamp;
34use timely::scheduling::ActivateOnDrop;
35
36use crate::activator::ActivatorTrait;
37
38/// Replay a capture stream into a scope with the same timestamp.
39pub trait MzReplay<T, C, A>: Sized
40where
41    T: Timestamp,
42    A: ActivatorTrait,
43{
44    /// Replays `self` into the provided scope, as a `StreamCore<S, CB::Container>` and provides
45    /// a cancellation token. Uses the supplied container builder `CB` to form containers.
46    ///
47    /// The `period` argument allows the specification of a re-activation period, where the operator
48    /// will re-activate itself every so often.
49    ///
50    /// * `scope`: The [Scope] to replay into.
51    /// * `name`: Human-readable debug name of the Timely operator.
52    /// * `period`: Reschedule the operator once the period has elapsed.
53    ///    Provide [Duration::MAX] to disable periodic scheduling.
54    /// * `activator`: An activator to trigger the operator.
55    fn mz_replay<S: Scope<Timestamp = T>>(
56        self,
57        scope: &mut S,
58        name: &str,
59        period: Duration,
60        activator: A,
61    ) -> (StreamCore<S, C>, Rc<dyn Any>);
62}
63
64impl<T, C, I, A> MzReplay<T, C, A> for I
65where
66    T: Timestamp,
67    C: Container + Clone,
68    I: IntoIterator,
69    I::Item: EventIterator<T, C> + 'static,
70    A: ActivatorTrait + 'static,
71{
72    fn mz_replay<S: Scope<Timestamp = T>>(
73        self,
74        scope: &mut S,
75        name: &str,
76        period: Duration,
77        activator: A,
78    ) -> (StreamCore<S, C>, Rc<dyn Any>) {
79        let name = format!("Replay {}", name);
80        let mut builder = OperatorBuilder::new(name, scope.clone());
81
82        let address = builder.operator_info().address;
83        let periodic_activator = scope.activator_for(Rc::clone(&address));
84
85        let (targets, stream) = builder.new_output();
86
87        let mut output = PushCounter::new(targets);
88        let mut event_streams = self.into_iter().collect::<Vec<_>>();
89        let mut started = false;
90
91        let mut last_active = Instant::now();
92
93        let mut progress_sofar =
94            <timely::progress::ChangeBatch<_>>::new_from(S::Timestamp::minimum(), 1);
95        let token = Rc::new(ActivateOnDrop::new(
96            (),
97            Rc::clone(&address),
98            scope.activations(),
99        ));
100        let weak_token = Rc::downgrade(&token);
101
102        activator.register(scope, address);
103
104        builder.build(move |progress| {
105            activator.ack();
106            if last_active
107                .checked_add(period)
108                .map_or(false, |next_active| next_active <= Instant::now())
109                || !started
110            {
111                last_active = Instant::now();
112                if period < Duration::MAX {
113                    periodic_activator.activate_after(period);
114                }
115            }
116
117            if !started {
118                // The first thing we do is modify our capabilities to match the number of streams we manage.
119                // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as
120                // our very first action.
121                let len: i64 = event_streams
122                    .len()
123                    .try_into()
124                    .expect("Implausibly large vector");
125                progress.internals[0].update(S::Timestamp::minimum(), len - 1);
126                progress_sofar.update(S::Timestamp::minimum(), len);
127                started = true;
128            }
129
130            if weak_token.upgrade().is_some() {
131                for event_stream in event_streams.iter_mut() {
132                    while let Some(event) = event_stream.next() {
133                        use Cow::*;
134                        match event {
135                            Owned(Event::Progress(vec)) => {
136                                progress.internals[0].extend(vec.iter().cloned());
137                                progress_sofar.extend(vec.into_iter());
138                            }
139                            Owned(Event::Messages(time, mut data)) => {
140                                output.give(time, &mut data);
141                            }
142                            Borrowed(Event::Progress(vec)) => {
143                                progress.internals[0].extend(vec.iter().cloned());
144                                progress_sofar.extend(vec.iter().cloned());
145                            }
146                            Borrowed(Event::Messages(time, data)) => {
147                                output.give(time.clone(), &mut data.clone());
148                            }
149                        }
150                    }
151                }
152            } else {
153                // Negate the accumulated progress contents emitted so far.
154                progress.internals[0]
155                    .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
156            }
157
158            output.done();
159            output
160                .produced()
161                .borrow_mut()
162                .drain_into(&mut progress.produceds[0]);
163
164            false
165        });
166
167        (stream, token)
168    }
169}