1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::activator::RcActivator;
use std::time::{Duration, Instant};
use timely::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use timely::dataflow::channels::pushers::Counter as PushCounter;
use timely::dataflow::operators::capture::event::EventIterator;
use timely::dataflow::operators::capture::Event;
use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use timely::progress::Timestamp;
use timely::Data;
/// Replays captured timely events into a scope as a regular [`Stream`].
///
/// `T` is the timestamp type of the replayed events and `D` is the record type.
pub trait MzReplay<T: Timestamp, D: Data>: Sized {
    /// Consumes `self` and returns a stream in `scope` that carries the replayed data.
    ///
    /// * `scope` - scope in which the replay operator is built; its timestamp must be `T`.
    /// * `name` - label for the operator. The implementation in this file prefixes it
    ///   with `"Replay "`.
    /// * `period` - interval after which the operator asks to be activated again. The
    ///   implementation in this file skips the timed re-activation when this is
    ///   `Duration::MAX`.
    /// * `rc_activator` - handle that the implementation in this file registers the
    ///   operator's activator with, and acknowledges on every activation.
    fn mz_replay<S: Scope<Timestamp = T>>(
        self,
        scope: &mut S,
        name: &str,
        period: Duration,
        rc_activator: RcActivator,
    ) -> Stream<S, D>;
}
/// Blanket implementation: every item produced by `self` is treated as one event
/// iterator, and all of them are replayed through a single operator.
impl<T: Timestamp, D: Data, I> MzReplay<T, D> for I
where
    I: IntoIterator,
    <I as IntoIterator>::Item: EventIterator<T, D> + 'static,
{
    /// Builds an operator named `"Replay <name>"` with one output and returns that output.
    ///
    /// On each activation the operator acknowledges `rc_activator`, re-arms a timed
    /// activation when `period` has elapsed (or on the very first run), and then drains
    /// every event iterator: progress events are folded into the operator's internal
    /// progress updates, message events are forwarded to the output.
    fn mz_replay<S: Scope<Timestamp = T>>(
        self,
        scope: &mut S,
        name: &str,
        period: Duration,
        rc_activator: RcActivator,
    ) -> Stream<S, D> {
        let operator_name = format!("Replay {}", name);
        let mut op_builder = OperatorBuilder::new(operator_name, scope.clone());
        let address = op_builder.operator_info().address;
        let timer_activator = scope.activator_for(&address[..]);
        let (pushers, stream) = op_builder.new_output();
        let mut output = PushBuffer::new(PushCounter::new(pushers));
        let mut sources: Vec<_> = self.into_iter().collect();
        // True until the operator logic has run once.
        let mut first_activation = true;
        let mut last_active = Instant::now();
        // A second activator for the same operator is handed over to `rc_activator`.
        rc_activator.register(scope.activator_for(&address[..]));

        op_builder.build(move |progress| {
            rc_activator.ack();

            // `checked_add` yields `None` when `last_active + period` is not representable;
            // in that case the period is treated as not yet elapsed.
            let period_elapsed = match last_active.checked_add(period) {
                Some(next_active) => next_active <= Instant::now(),
                None => false,
            };
            if period_elapsed || first_activation {
                last_active = Instant::now();
                // A period of `Duration::MAX` disables the timed re-activation.
                if period < Duration::MAX {
                    timer_activator.activate_after(period);
                }
            }

            if first_activation {
                // One-time adjustment at the minimum timestamp by `sources.len() - 1`.
                // NOTE(review): presumably the operator starts out with a single capability
                // and needs one per replayed stream — confirm against timely's progress protocol.
                let adjustment = (sources.len() as i64) - 1;
                progress.internals[0].update(S::Timestamp::minimum(), adjustment);
                first_activation = false;
            }

            let mut buffer = Vec::new();
            for source in &mut sources {
                // Drain the source until it reports no further event for now.
                while let Some(event) = source.next() {
                    match &event {
                        Event::Progress(updates) => {
                            progress.internals[0].extend(updates.iter().cloned());
                        }
                        Event::Messages(time, data) => {
                            // Copy the records into a scratch vector and hand it to the
                            // output session for this time.
                            buffer.extend_from_slice(data);
                            output.session(time).give_vec(&mut buffer);
                        }
                    }
                }
            }

            output.cease();
            // Move the counts recorded by the output counter into the progress report.
            output
                .inner()
                .produced()
                .borrow_mut()
                .drain_into(&mut progress.produceds[0]);

            false
        });

        stream
    }
}