1use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25
26use timely::Container;
27use timely::communication::Push;
28use timely::dataflow::channels::Message;
29use timely::dataflow::channels::pushers::Counter as PushCounter;
30use timely::dataflow::operators::capture::Event;
31use timely::dataflow::operators::capture::event::EventIterator;
32use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
33use timely::dataflow::{Scope, StreamCore};
34use timely::progress::Timestamp;
35use timely::scheduling::ActivateOnDrop;
36
37use crate::activator::ActivatorTrait;
38
39pub trait MzReplay<T, C, A>: Sized
41where
42 T: Timestamp,
43 A: ActivatorTrait,
44{
45 fn mz_replay<S: Scope<Timestamp = T>>(
57 self,
58 scope: &mut S,
59 name: &str,
60 period: Duration,
61 activator: A,
62 ) -> (StreamCore<S, C>, Rc<dyn Any>);
63}
64
65impl<T, C, I, A> MzReplay<T, C, A> for I
66where
67 T: Timestamp,
68 C: Container + Clone,
69 I: IntoIterator,
70 I::Item: EventIterator<T, C> + 'static,
71 A: ActivatorTrait + 'static,
72{
73 fn mz_replay<S: Scope<Timestamp = T>>(
74 self,
75 scope: &mut S,
76 name: &str,
77 period: Duration,
78 activator: A,
79 ) -> (StreamCore<S, C>, Rc<dyn Any>) {
80 let name = format!("Replay {}", name);
81 let mut builder = OperatorBuilder::new(name, scope.clone());
82
83 let address = builder.operator_info().address;
84 let periodic_activator = scope.activator_for(Rc::clone(&address));
85
86 let (targets, stream) = builder.new_output();
87
88 let mut output = PushCounter::new(targets);
89 let mut event_streams = self.into_iter().collect::<Vec<_>>();
90 let mut started = false;
91
92 let mut last_active = Instant::now();
93
94 let mut progress_sofar =
95 <timely::progress::ChangeBatch<_>>::new_from(S::Timestamp::minimum(), 1);
96 let token = Rc::new(ActivateOnDrop::new(
97 (),
98 Rc::clone(&address),
99 scope.activations(),
100 ));
101 let weak_token = Rc::downgrade(&token);
102
103 activator.register(scope, address);
104
105 builder.build(move |progress| {
106 activator.ack();
107 if last_active
108 .checked_add(period)
109 .map_or(false, |next_active| next_active <= Instant::now())
110 || !started
111 {
112 last_active = Instant::now();
113 if period < Duration::MAX {
114 periodic_activator.activate_after(period);
115 }
116 }
117
118 if !started {
119 let len: i64 = event_streams
123 .len()
124 .try_into()
125 .expect("Implausibly large vector");
126 progress.internals[0].update(S::Timestamp::minimum(), len - 1);
127 progress_sofar.update(S::Timestamp::minimum(), len);
128 started = true;
129 }
130
131 if weak_token.upgrade().is_some() {
132 for event_stream in event_streams.iter_mut() {
133 while let Some(event) = event_stream.next() {
134 use Cow::*;
135 match event {
136 Owned(Event::Progress(vec)) => {
137 progress.internals[0].extend(vec.iter().cloned());
138 progress_sofar.extend(vec.into_iter());
139 }
140 Owned(Event::Messages(time, mut data)) => {
141 Message::push_at(&mut data, time, &mut output);
142 }
143 Borrowed(Event::Progress(vec)) => {
144 progress.internals[0].extend(vec.iter().cloned());
145 progress_sofar.extend(vec.iter().cloned());
146 }
147 Borrowed(Event::Messages(time, data)) => {
148 Message::push_at(&mut data.clone(), time.clone(), &mut output);
149 }
150 }
151 }
152 }
153 } else {
154 progress.internals[0]
156 .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
157 }
158
159 output.done();
160 output
161 .produced()
162 .borrow_mut()
163 .drain_into(&mut progress.produceds[0]);
164
165 false
166 });
167
168 (stream, token)
169 }
170}