timely/dataflow/operators/core/capture/
replay.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//! Traits and types for replaying captured timely dataflow streams.
//!
//! A type can be replayed into any timely dataflow scope if it implements
//! `IntoIterator` and its `Item` type implements `EventIterator` with the same
//! timestamp. Other types can implement the `Replay` trait directly, but this should
//! be done with care, as there is a protocol the replayer follows that must be
//! respected if the computation is to make sense.
//!
//! # Protocol
//!
//! The stream of events produced by each `EventIterator` implementation must satisfy,
//! starting from a default timestamp of `Timestamp::minimum()` with count 1,
//!
//! 1. The progress messages may only increment the count for a timestamp if
//!    the cumulative count for some prior or equal timestamp is positive.
//! 2. The data messages may only use a timestamp if the cumulative count for
//!    some prior or equal timestamp is positive.
//!
//! Alternatively, the sequence of events should, starting from an initial count of 1
//! for the timestamp `Timestamp::minimum()`, describe decrements to held capabilities
//! or the production of capabilities in their future, or messages sent at times in
//! the future of held capabilities.
//!
//! The order is very important here. One can move `Event::Messages` events arbitrarily
//! earlier in the sequence, and `Event::Progress` events arbitrarily later, but one
//! cannot move an `Event::Progress` event that discards a last capability before any
//! `Event::Messages` event that would use that capability.
//!
//! For an example, the `Operate<T>` implementation for `capture::CaptureOperator<T, D, P>`
//! records exactly what data is presented at the operator, both in terms of progress
//! messages and data received.
//!
//! # Notes
//!
//! Provided no stream of events reports the consumption of capabilities it does not hold,
//! any interleaving of the streams of events will still maintain the invariants above.
//! This means that each timely dataflow replay operator can replay any number of streams,
//! allowing the replay to occur in a timely dataflow computation with more or fewer workers
//! than that in which the stream was captured.

use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::Timestamp;

use super::Event;
use super::event::EventIterator;
use crate::Container;

/// Replay a capture stream into a scope with the same timestamp.
pub trait Replay<T: Timestamp, C> : Sized {
    /// Replays `self` into `scope`, producing a `StreamCore<S, C>`.
    ///
    /// This is `replay_core` with a re-activation period of zero, so the replay operator
    /// re-activates itself without delay.
    fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C> {
        let no_delay = std::time::Duration::from_secs(0);
        self.replay_core(scope, Some(no_delay))
    }

    /// Replays `self` into `scope`, producing a `StreamCore<S, C>`.
    ///
    /// `period` controls self re-activation:
    /// * `Some(delay)` asks the operator to re-activate itself after `delay`.
    /// * `None` means the operator never re-activates itself.
    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
}

impl<T: Timestamp, C: Container + Clone + 'static, I> Replay<T, C> for I
where
    I: IntoIterator,
    <I as IntoIterator>::Item: EventIterator<T, C> + 'static,
{
    /// Builds a single "Replay" operator with one output.
    ///
    /// Each time the operator runs, it drains every event stream in `self`:
    /// * progress updates are applied to its internal capability counts;
    /// * data containers are sent on its output.
    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C> {
        use std::borrow::Cow;

        let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
        let activator = scope.activator_for(builder.operator_info().address);
        let (targets, stream) = builder.new_output();

        let mut output = PushBuffer::new(PushCounter::new(targets));
        let mut sources: Vec<_> = self.into_iter().collect();
        // Tracks whether the one-time capability adjustment below has been applied.
        let mut initialized = false;
        // Reused container that receives clones of borrowed message containers.
        let mut scratch: C = Default::default();

        builder.build(move |progress| {
            if !initialized {
                initialized = true;
                // Every source stream starts from an implicit count of 1 at the minimum
                // timestamp, while the operator starts holding a single such capability.
                // Add the difference so the operator holds one capability per source.
                let extra = (sources.len() as i64) - 1;
                progress.internals[0].update(S::Timestamp::minimum(), extra);
            }

            for source in sources.iter_mut() {
                while let Some(event) = source.next() {
                    match event {
                        Cow::Owned(Event::Progress(updates)) => {
                            progress.internals[0].extend(updates.into_iter());
                        }
                        Cow::Borrowed(Event::Progress(updates)) => {
                            progress.internals[0].extend(updates.iter().cloned());
                        }
                        Cow::Owned(Event::Messages(time, mut data)) => {
                            output.session(&time).give_container(&mut data);
                        }
                        Cow::Borrowed(Event::Messages(time, data)) => {
                            // Only a shared reference is available; clone into the scratch container.
                            scratch.clone_from(data);
                            output.session(time).give_container(&mut scratch);
                        }
                    }
                }
            }

            // Without a period, the operator does not schedule its own re-activation.
            if let Some(delay) = period {
                activator.activate_after(delay);
            }

            // Flush buffered records first, then move the output's produced counts into
            // `progress.produceds[0]`.
            output.cease();
            output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]);

            false
        });

        stream
    }
}