1use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25
26use timely::Container;
27use timely::communication::Push;
28use timely::dataflow::channels::pushers::Counter as PushCounter;
29use timely::dataflow::operators::capture::Event;
30use timely::dataflow::operators::capture::event::EventIterator;
31use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
32use timely::dataflow::{Scope, StreamCore};
33use timely::progress::Timestamp;
34use timely::scheduling::ActivateOnDrop;
35
36use crate::activator::ActivatorTrait;
37
38pub trait MzReplay<T, C, A>: Sized
40where
41 T: Timestamp,
42 A: ActivatorTrait,
43{
44 fn mz_replay<S: Scope<Timestamp = T>>(
56 self,
57 scope: &mut S,
58 name: &str,
59 period: Duration,
60 activator: A,
61 ) -> (StreamCore<S, C>, Rc<dyn Any>);
62}
63
64impl<T, C, I, A> MzReplay<T, C, A> for I
65where
66 T: Timestamp,
67 C: Container + Clone,
68 I: IntoIterator,
69 I::Item: EventIterator<T, C> + 'static,
70 A: ActivatorTrait + 'static,
71{
72 fn mz_replay<S: Scope<Timestamp = T>>(
73 self,
74 scope: &mut S,
75 name: &str,
76 period: Duration,
77 activator: A,
78 ) -> (StreamCore<S, C>, Rc<dyn Any>) {
79 let name = format!("Replay {}", name);
80 let mut builder = OperatorBuilder::new(name, scope.clone());
81
82 let address = builder.operator_info().address;
83 let periodic_activator = scope.activator_for(Rc::clone(&address));
84
85 let (targets, stream) = builder.new_output();
86
87 let mut output = PushCounter::new(targets);
88 let mut event_streams = self.into_iter().collect::<Vec<_>>();
89 let mut started = false;
90
91 let mut last_active = Instant::now();
92
93 let mut progress_sofar =
94 <timely::progress::ChangeBatch<_>>::new_from(S::Timestamp::minimum(), 1);
95 let token = Rc::new(ActivateOnDrop::new(
96 (),
97 Rc::clone(&address),
98 scope.activations(),
99 ));
100 let weak_token = Rc::downgrade(&token);
101
102 activator.register(scope, address);
103
104 builder.build(move |progress| {
105 activator.ack();
106 if last_active
107 .checked_add(period)
108 .map_or(false, |next_active| next_active <= Instant::now())
109 || !started
110 {
111 last_active = Instant::now();
112 if period < Duration::MAX {
113 periodic_activator.activate_after(period);
114 }
115 }
116
117 if !started {
118 let len: i64 = event_streams
122 .len()
123 .try_into()
124 .expect("Implausibly large vector");
125 progress.internals[0].update(S::Timestamp::minimum(), len - 1);
126 progress_sofar.update(S::Timestamp::minimum(), len);
127 started = true;
128 }
129
130 if weak_token.upgrade().is_some() {
131 for event_stream in event_streams.iter_mut() {
132 while let Some(event) = event_stream.next() {
133 use Cow::*;
134 match event {
135 Owned(Event::Progress(vec)) => {
136 progress.internals[0].extend(vec.iter().cloned());
137 progress_sofar.extend(vec.into_iter());
138 }
139 Owned(Event::Messages(time, mut data)) => {
140 output.give(time, &mut data);
141 }
142 Borrowed(Event::Progress(vec)) => {
143 progress.internals[0].extend(vec.iter().cloned());
144 progress_sofar.extend(vec.iter().cloned());
145 }
146 Borrowed(Event::Messages(time, data)) => {
147 output.give(time.clone(), &mut data.clone());
148 }
149 }
150 }
151 }
152 } else {
153 progress.internals[0]
155 .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
156 }
157
158 output.done();
159 output
160 .produced()
161 .borrow_mut()
162 .drain_into(&mut progress.produceds[0]);
163
164 false
165 });
166
167 (stream, token)
168 }
169}