1use std::any::Any;
22use std::borrow::Cow;
23use std::rc::Rc;
24use std::time::{Duration, Instant};
25use timely::container::ContainerBuilder;
26
27use timely::Container;
28use timely::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, Session};
29use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee};
30use timely::dataflow::operators::capture::Event;
31use timely::dataflow::operators::capture::event::EventIterator;
32use timely::dataflow::operators::generic::builder_raw::OperatorBuilder;
33use timely::dataflow::{Scope, StreamCore};
34use timely::progress::Timestamp;
35use timely::scheduling::ActivateOnDrop;
36
37use crate::activator::ActivatorTrait;
38
39pub trait MzReplay<T, C, A>: Sized
41where
42 T: Timestamp,
43 A: ActivatorTrait,
44{
45 fn mz_replay<S: Scope<Timestamp = T>, CB, L>(
57 self,
58 scope: &mut S,
59 name: &str,
60 period: Duration,
61 activator: A,
62 logic: L,
63 ) -> (StreamCore<S, CB::Container>, Rc<dyn Any>)
64 where
65 CB: ContainerBuilder,
66 L: FnMut(Session<T, CB, PushCounter<T, CB::Container, Tee<T, CB::Container>>>, Cow<C>)
67 + 'static;
68}
69
70impl<T, C, I, A> MzReplay<T, C, A> for I
71where
72 T: Timestamp,
73 C: Container + Clone,
74 I: IntoIterator,
75 I::Item: EventIterator<T, C> + 'static,
76 A: ActivatorTrait + 'static,
77{
78 fn mz_replay<S: Scope<Timestamp = T>, CB, L>(
79 self,
80 scope: &mut S,
81 name: &str,
82 period: Duration,
83 activator: A,
84 mut logic: L,
85 ) -> (StreamCore<S, CB::Container>, Rc<dyn Any>)
86 where
87 for<'a> CB: ContainerBuilder,
88 L: FnMut(Session<T, CB, PushCounter<T, CB::Container, Tee<T, CB::Container>>>, Cow<C>)
89 + 'static,
90 {
91 let name = format!("Replay {}", name);
92 let mut builder = OperatorBuilder::new(name, scope.clone());
93
94 let address = builder.operator_info().address;
95 let periodic_activator = scope.activator_for(Rc::clone(&address));
96
97 let (targets, stream) = builder.new_output();
98
99 let mut output: PushBuffer<_, CB, _> = PushBuffer::new(PushCounter::new(targets));
100 let mut event_streams = self.into_iter().collect::<Vec<_>>();
101 let mut started = false;
102
103 let mut last_active = Instant::now();
104
105 let mut progress_sofar =
106 <timely::progress::ChangeBatch<_>>::new_from(S::Timestamp::minimum(), 1);
107 let token = Rc::new(ActivateOnDrop::new(
108 (),
109 Rc::clone(&address),
110 scope.activations(),
111 ));
112 let weak_token = Rc::downgrade(&token);
113
114 activator.register(scope, address);
115
116 builder.build(move |progress| {
117 activator.ack();
118 if last_active
119 .checked_add(period)
120 .map_or(false, |next_active| next_active <= Instant::now())
121 || !started
122 {
123 last_active = Instant::now();
124 if period < Duration::MAX {
125 periodic_activator.activate_after(period);
126 }
127 }
128
129 if !started {
130 let len: i64 = event_streams
134 .len()
135 .try_into()
136 .expect("Implausibly large vector");
137 progress.internals[0].update(S::Timestamp::minimum(), len - 1);
138 progress_sofar.update(S::Timestamp::minimum(), len);
139 started = true;
140 }
141
142 if weak_token.upgrade().is_some() {
143 for event_stream in event_streams.iter_mut() {
144 while let Some(event) = event_stream.next() {
145 use Cow::*;
146 match event {
147 Owned(Event::Progress(vec)) => {
148 progress.internals[0].extend(vec.iter().cloned());
149 progress_sofar.extend(vec.into_iter());
150 }
151 Owned(Event::Messages(time, data)) => {
152 logic(output.session_with_builder(&time), Owned(data));
153 }
154 Borrowed(Event::Progress(vec)) => {
155 progress.internals[0].extend(vec.iter().cloned());
156 progress_sofar.extend(vec.iter().cloned());
157 }
158 Borrowed(Event::Messages(time, data)) => {
159 logic(output.session_with_builder(time), Borrowed(data));
160 }
161 }
162 }
163 }
164 } else {
165 progress.internals[0]
167 .extend(progress_sofar.drain().map(|(time, diff)| (time, -diff)));
168 }
169
170 output.cease();
171 output
172 .inner()
173 .produced()
174 .borrow_mut()
175 .drain_into(&mut progress.produceds[0]);
176
177 false
178 });
179
180 (stream, token)
181 }
182}