mz_timely_util/
replay.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! Traits and types for replaying captured timely dataflow streams.
//!
//! This is roughly based on [`timely::dataflow::operators::capture::Replay`], which
//! provides the protocol and semantics of the [`MzReplay`] operator.
20
21use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25use timely::container::ContainerBuilder;
26
27use timely::Container;
28use timely::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, Session};
29use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee};
30use timely::dataflow::operators::capture::Event;
31use timely::dataflow::operators::capture::event::EventIterator;
32use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
33use timely::dataflow::{Scope, StreamCore};
34use timely::progress::Timestamp;
35use timely::scheduling::ActivateOnDrop;
36
37use crate::activator::ActivatorTrait;
38
39/// Replay a capture stream into a scope with the same timestamp.
40pub trait MzReplay<T, C, A>: Sized
41where
42    T: Timestamp,
43    A: ActivatorTrait,
44{
45    /// Replays `self` into the provided scope, as a `StreamCore<S, CB::Container>` and provides
46    /// a cancellation token. Uses the supplied container builder `CB` to form containers.
47    ///
48    /// The `period` argument allows the specification of a re-activation period, where the operator
49    /// will re-activate itself every so often.
50    ///
51    /// * `scope`: The [Scope] to replay into.
52    /// * `name`: Human-readable debug name of the Timely operator.
53    /// * `period`: Reschedule the operator once the period has elapsed.
54    ///    Provide [Duration::MAX] to disable periodic scheduling.
55    /// * `activator`: An activator to trigger the operator.
56    fn mz_replay<S: Scope<Timestamp = T>, CB, L>(
57        self,
58        scope: &mut S,
59        name: &str,
60        period: Duration,
61        activator: A,
62        logic: L,
63    ) -> (StreamCore<S, CB::Container>, Rc<dyn Any>)
64    where
65        CB: ContainerBuilder,
66        L: FnMut(Session<T, CB, PushCounter<T, CB::Container, Tee<T, CB::Container>>>, Cow<C>)
67            + 'static;
68}
69
70impl<T, C, I, A> MzReplay<T, C, A> for I
71where
72    T: Timestamp,
73    C: Container + Clone,
74    I: IntoIterator,
75    I::Item: EventIterator<T, C> + 'static,
76    A: ActivatorTrait + 'static,
77{
78    fn mz_replay<S: Scope<Timestamp = T>, CB, L>(
79        self,
80        scope: &mut S,
81        name: &str,
82        period: Duration,
83        activator: A,
84        mut logic: L,
85    ) -> (StreamCore<S, CB::Container>, Rc<dyn Any>)
86    where
87        for<'a> CB: ContainerBuilder,
88        L: FnMut(Session<T, CB, PushCounter<T, CB::Container, Tee<T, CB::Container>>>, Cow<C>)
89            + 'static,
90    {
91        let name = format!("Replay {}", name);
92        let mut builder = OperatorBuilder::new(name, scope.clone());
93
94        let address = builder.operator_info().address;
95        let periodic_activator = scope.activator_for(Rc::clone(&address));
96
97        let (targets, stream) = builder.new_output();
98
99        let mut output: PushBuffer<_, CB, _> = PushBuffer::new(PushCounter::new(targets));
100        let mut event_streams = self.into_iter().collect::<Vec<_>>();
101        let mut started = false;
102
103        let mut last_active = Instant::now();
104
105        let mut progress_sofar =
106            <timely::progress::ChangeBatch<_>>::new_from(S::Timestamp::minimum(), 1);
107        let token = Rc::new(ActivateOnDrop::new(
108            (),
109            Rc::clone(&address),
110            scope.activations(),
111        ));
112        let weak_token = Rc::downgrade(&token);
113
114        activator.register(scope, address);
115
116        builder.build(move |progress| {
117            activator.ack();
118            if last_active
119                .checked_add(period)
120                .map_or(false, |next_active| next_active <= Instant::now())
121                || !started
122            {
123                last_active = Instant::now();
124                if period < Duration::MAX {
125                    periodic_activator.activate_after(period);
126                }
127            }
128
129            if !started {
130                // The first thing we do is modify our capabilities to match the number of streams we manage.
131                // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as
132                // our very first action.
133                let len: i64 = event_streams
134                    .len()
135                    .try_into()
136                    .expect("Implausibly large vector");
137                progress.internals[0].update(S::Timestamp::minimum(), len - 1);
138                progress_sofar.update(S::Timestamp::minimum(), len);
139                started = true;
140            }
141
142            if weak_token.upgrade().is_some() {
143                for event_stream in event_streams.iter_mut() {
144                    while let Some(event) = event_stream.next() {
145                        use Cow::*;
146                        match event {
147                            Owned(Event::Progress(vec)) => {
148                                progress.internals[0].extend(vec.iter().cloned());
149                                progress_sofar.extend(vec.into_iter());
150                            }
151                            Owned(Event::Messages(time, data)) => {
152                                logic(output.session_with_builder(&time), Owned(data));
153                            }
154                            Borrowed(Event::Progress(vec)) => {
155                                progress.internals[0].extend(vec.iter().cloned());
156                                progress_sofar.extend(vec.iter().cloned());
157                            }
158                            Borrowed(Event::Messages(time, data)) => {
159                                logic(output.session_with_builder(time), Borrowed(data));
160                            }
161                        }
162                    }
163                }
164            } else {
165                // Negate the accumulated progress contents emitted so far.
166                progress.internals[0]
167                    .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
168            }
169
170            output.cease();
171            output
172                .inner()
173                .produced()
174                .borrow_mut()
175                .drain_into(&mut progress.produceds[0]);
176
177            false
178        });
179
180        (stream, token)
181    }
182}