timely/dataflow/operators/core/capture/
replay.rs

//! Traits and types for replaying captured timely dataflow streams.
//!
//! A type can be replayed into any timely dataflow scope if it can be converted into an
//! iterator (via `IntoIterator`) whose `Item` type implements `EventIterator` with the same
//! timestamp. Other types can implement the `Replay` trait, but this should be done with
//! care, as there is a protocol the replayer follows that must be respected if the
//! computation is to make sense.
//!
//! # Protocol
//!
//! The stream of events produced by each `EventIterator` implementation must satisfy the
//! following, starting from a default timestamp of `Timestamp::minimum()` with count 1:
//!
//! 1. The progress messages may only increment the count for a timestamp if
//!    the cumulative count for some prior or equal timestamp is positive.
//! 2. The data messages may only use a timestamp if the cumulative count for
//!    some prior or equal timestamp is positive.
//!
//! Alternatively, the sequence of events should, starting from an initial count of 1
//! for the timestamp `Timestamp::minimum()`, describe decrements to held capabilities,
//! the production of capabilities in their future, or messages sent at times in
//! the future of held capabilities.
//!
//! The order is very important here. One can move `Event::Messages` events arbitrarily
//! earlier in the sequence, and `Event::Progress` events arbitrarily later, but one
//! cannot move an `Event::Progress` event that discards a last capability before any
//! `Event::Messages` event that would use that capability.
//!
//! For example, the `Operate<T>` implementation for `capture::CaptureOperator<T, D, P>`
//! records exactly what data is presented at the operator, both in terms of progress
//! messages and data received.
//!
//! # Notes
//!
//! Provided no stream of events reports the consumption of capabilities it does not hold,
//! any interleaving of the streams of events will still maintain the invariants above.
//! This means that each timely dataflow replay operator can replay any number of streams,
//! allowing the replay to occur in a timely dataflow computation with more or fewer workers
//! than the one in which the stream was captured.
40
41use crate::dataflow::{Scope, StreamCore};
42use crate::dataflow::channels::pushers::Counter as PushCounter;
43use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
44use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
45use crate::progress::Timestamp;
46
47use super::Event;
48use super::event::EventIterator;
49use crate::Container;
50
51/// Replay a capture stream into a scope with the same timestamp.
52pub trait Replay<T: Timestamp, C> : Sized {
53    /// Replays `self` into the provided scope, as a `StreamCore<S, C>`.
54    fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C> {
55        self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
56    }
57    /// Replays `self` into the provided scope, as a `StreamCore<S, C>`.
58    ///
59    /// The `period` argument allows the specification of a re-activation period, where the operator
60    /// will re-activate itself every so often. The `None` argument instructs the operator not to
61    /// re-activate itself.
62    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
63}
64
65impl<T: Timestamp, C: Container + Clone + 'static, I> Replay<T, C> for I
66where
67    I : IntoIterator,
68    <I as IntoIterator>::Item: EventIterator<T, C>+'static,
69{
70    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>{
71
72        let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
73
74        let address = builder.operator_info().address;
75        let activator = scope.activator_for(address);
76
77        let (targets, stream) = builder.new_output();
78
79        let mut output = PushBuffer::new(PushCounter::new(targets));
80        let mut event_streams = self.into_iter().collect::<Vec<_>>();
81        let mut started = false;
82        let mut allocation: C = Default::default();
83
84        builder.build(
85            move |progress| {
86
87                if !started {
88                    // The first thing we do is modify our capabilities to match the number of streams we manage.
89                    // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as
90                    // our very first action.
91                    progress.internals[0].update(S::Timestamp::minimum(), (event_streams.len() as i64) - 1);
92                    started = true;
93                }
94
95                for event_stream in event_streams.iter_mut() {
96                    while let Some(event) = event_stream.next() {
97                        use std::borrow::Cow::*;
98                        match event {
99                            Owned(Event::Progress(vec)) => {
100                                progress.internals[0].extend(vec.into_iter());
101                            },
102                            Owned(Event::Messages(time, mut data)) => {
103                                output.session(&time).give_container(&mut data);
104                            }
105                            Borrowed(Event::Progress(vec)) => {
106                                progress.internals[0].extend(vec.iter().cloned());
107                            },
108                            Borrowed(Event::Messages(time, data)) => {
109                                allocation.clone_from(data);
110                                output.session(time).give_container(&mut allocation);
111                            }
112                        }
113                    }
114                }
115
116                // A `None` period indicates that we do not re-activate here.
117                if let Some(delay) = period {
118                    activator.activate_after(delay);
119                }
120
121                output.cease();
122                output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]);
123
124                false
125            }
126        );
127
128        stream
129    }
130}