1use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25
26use timely::Container;
27use timely::communication::Push;
28use timely::dataflow::channels::Message;
29use timely::dataflow::channels::pushers::Counter as PushCounter;
30use timely::dataflow::operators::capture::Event;
31use timely::dataflow::operators::capture::event::EventIterator;
32use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
33use timely::dataflow::{Scope, Stream};
34use timely::progress::Timestamp;
35use timely::scheduling::ActivateOnDrop;
36
37use crate::activator::ActivatorTrait;
38
39pub trait MzReplay<T, C, A>: Sized
41where
42 T: Timestamp,
43 A: ActivatorTrait,
44{
45 fn mz_replay<'scope>(
57 self,
58 scope: Scope<'scope, T>,
59 name: &str,
60 period: Duration,
61 activator: A,
62 ) -> (Stream<'scope, T, C>, Rc<dyn Any>);
63}
64
65impl<T, C, I, A> MzReplay<T, C, A> for I
66where
67 T: Timestamp,
68 C: Container + Clone,
69 I: IntoIterator,
70 I::Item: EventIterator<T, C> + 'static,
71 A: ActivatorTrait + 'static,
72{
73 fn mz_replay<'scope>(
74 self,
75 scope: Scope<'scope, T>,
76 name: &str,
77 period: Duration,
78 activator: A,
79 ) -> (Stream<'scope, T, C>, Rc<dyn Any>) {
80 let name = format!("Replay {}", name);
81 let mut builder = OperatorBuilder::new(name, scope.clone());
82
83 let address = builder.operator_info().address;
84 let periodic_activator = scope.activator_for(Rc::clone(&address));
85
86 let (targets, stream) = builder.new_output();
87
88 let mut output = PushCounter::new(targets);
89 let mut event_streams = self.into_iter().collect::<Vec<_>>();
90 let mut started = false;
91
92 let mut last_active = Instant::now();
93
94 let mut progress_sofar = <timely::progress::ChangeBatch<_>>::new_from(T::minimum(), 1);
95 let token = Rc::new(ActivateOnDrop::new(
96 (),
97 Rc::clone(&address),
98 scope.activations(),
99 ));
100 let weak_token = Rc::downgrade(&token);
101
102 activator.register(scope, address);
103
104 builder.build(move |progress| {
105 activator.ack();
106 if last_active
107 .checked_add(period)
108 .map_or(false, |next_active| next_active <= Instant::now())
109 || !started
110 {
111 last_active = Instant::now();
112 if period < Duration::MAX {
113 periodic_activator.activate_after(period);
114 }
115 }
116
117 if !started {
118 let len: i64 = event_streams
122 .len()
123 .try_into()
124 .expect("Implausibly large vector");
125 progress.internals[0].update(T::minimum(), len - 1);
126 progress_sofar.update(T::minimum(), len);
127 started = true;
128 }
129
130 if weak_token.upgrade().is_some() {
131 for event_stream in event_streams.iter_mut() {
132 while let Some(event) = event_stream.next() {
133 use Cow::*;
134 match event {
135 Owned(Event::Progress(vec)) => {
136 progress.internals[0].extend(vec.iter().cloned());
137 progress_sofar.extend(vec.into_iter());
138 }
139 Owned(Event::Messages(time, mut data)) => {
140 Message::push_at(&mut data, time, &mut output);
141 }
142 Borrowed(Event::Progress(vec)) => {
143 progress.internals[0].extend(vec.iter().cloned());
144 progress_sofar.extend(vec.iter().cloned());
145 }
146 Borrowed(Event::Messages(time, data)) => {
147 Message::push_at(&mut data.clone(), time.clone(), &mut output);
148 }
149 }
150 }
151 }
152 } else {
153 progress.internals[0]
155 .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
156 }
157
158 output.done();
159 output
160 .produced()
161 .borrow_mut()
162 .drain_into(&mut progress.produceds[0]);
163
164 false
165 });
166
167 (stream, token)
168 }
169}