1use columnar::{Columnar, Index, Len};
13use differential_dataflow::Hashable;
14use differential_dataflow::difference::Semigroup;
15use differential_dataflow::lattice::Lattice;
16use differential_dataflow::trace::Batcher;
17use mz_timely_util::columnar::Column;
18use mz_timely_util::columnar::batcher::ColumnChunker;
19use mz_timely_util::columnar::merge_batcher::ColumnMergeBatcher;
20use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
21use timely::Accountable;
22use timely::container::PushInto;
23use timely::dataflow::channels::pact::Exchange;
24use timely::dataflow::operators::Operator;
25use timely::dataflow::{Stream, StreamVec};
26use timely::order::TotalOrder;
27use timely::progress::{Antichain, PathSummary, Timestamp};
28use timely::{ContainerBuilder, ExchangeData, PartialOrder};
29
30use crate::typedefs::MzData;
31
32pub trait TemporalBucketing<'scope, T: Timestamp, O> {
36 fn bucket<CB>(
40 self,
41 as_of: Antichain<T>,
42 threshold: T::Summary,
43 ) -> Stream<'scope, T, CB::Container>
44 where
45 CB: ContainerBuilder + PushInto<O>;
46}
47
48impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)>
50 for StreamVec<'scope, T, (D, T, mz_repr::Diff)>
51where
52 T: Timestamp + Default + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice,
53 D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
54{
55 fn bucket<CB>(
56 self,
57 as_of: Antichain<T>,
58 threshold: T::Summary,
59 ) -> Stream<'scope, T, CB::Container>
60 where
61 CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>,
62 {
63 let scope = self.scope();
64 let logger = scope
65 .worker()
66 .logger_for("differential/arrange")
67 .map(Into::into);
68
69 let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into());
70 self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
71 let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
72 let activator = scope.activator_for(info.address);
73
74 let mut cap = Some(cap);
76
77 let mut buffer = Vec::new();
79
80 move |(input, frontier), output| {
81 let mut upper = Antichain::new();
84 for time1 in &frontier.frontier() {
85 for time2 in as_of.elements() {
86 if let Some(time) = threshold.results_in(&time1.join(time2)) {
88 upper.insert(time);
89 }
90 }
91 }
92
93 input.for_each_time(|time, data| {
94 let mut session = output.session_with_builder(&time);
95 for data in data {
96 let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
98 session.give_iterator(pass_through);
99
100 data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));
104
105 let mut drain = data.drain(..);
106 if let Some((datum, time, diff)) = drain.next() {
107 let mut range = chain.range_of(&time).expect("Must exist");
108 buffer.push((datum, time, diff));
109 for (datum, time, diff) in drain {
110 if !range.contains(&time) {
112 if !buffer.is_empty() {
115 let bucket =
116 chain.find_mut(&range.start).expect("Must exist");
117 bucket.push_container(&mut buffer);
118 buffer.clear();
119 }
120 range = chain.range_of(&time).expect("Must exist");
121 }
122 buffer.push((datum, time, diff));
123 }
124
125 if !buffer.is_empty() {
127 let bucket = chain.find_mut(&range.start).expect("Must exist");
128 bucket.push_container(&mut buffer);
129 buffer.clear();
130 }
131 }
132 }
133 });
134
135 let peeled = chain.peel(upper.borrow());
137 if let Some(cap) = cap.as_ref() {
138 let mut session = output.session_with_builder(cap);
139 for chunk in peeled.into_iter().flat_map(|x| x.done()) {
140 session.give_iterator(
144 chunk
145 .borrow()
146 .into_index_iter()
147 .map(<(D, T, mz_repr::Diff)>::into_owned),
148 );
149 }
150 } else {
151 assert!(
153 peeled
154 .into_iter()
155 .flat_map(|x| x.done())
156 .all(|chunk| chunk.record_count() == 0),
157 "Unexpected data revealed without a cap."
158 );
159 }
160
161 if frontier.is_empty() || upper.is_empty() {
163 cap = None;
164 } else if let Some(cap) = cap.as_mut() {
165 cap.downgrade(&upper[0]);
167 }
168
169 let mut fuel = 1_000_000;
171 chain.restore(&mut fuel);
172 if fuel <= 0 {
173 activator.activate();
175 }
176 }
177 })
178 }
179}
180
181struct MergeBatcherWrapper<D, T, R>
189where
190 D: MzData + Ord + Clone,
191 T: MzData + Ord + PartialOrder + Clone,
192 R: MzData + Semigroup + Default,
193{
194 logger: Option<differential_dataflow::logging::Logger>,
195 operator_id: usize,
196 chunker: ColumnChunker<(D, T, R)>,
197 inner: ColumnMergeBatcher<D, T, R>,
198}
199
200impl<D, T, R> MergeBatcherWrapper<D, T, R>
201where
202 D: MzData + Ord + Clone + 'static,
203 T: MzData + Ord + PartialOrder + Clone + Default + Timestamp,
204 R: MzData + Semigroup + Default + 'static + for<'a> Semigroup<columnar::Ref<'a, R>>,
205 for<'a> columnar::Ref<'a, R>: Ord,
206 for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
207 for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
208 for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
209 for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
210{
211 fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
213 Self {
214 logger: logger.clone(),
215 operator_id,
216 chunker: ColumnChunker::default(),
217 inner: ColumnMergeBatcher::new(logger, operator_id),
218 }
219 }
220
221 fn push_container(&mut self, buffer: &mut Vec<(D, T, R)>) {
224 use timely::container::{ContainerBuilder as _, PushInto as _};
225 if buffer.is_empty() {
226 return;
227 }
228 let mut raw: Column<(D, T, R)> = Default::default();
231 for update in buffer.drain(..) {
232 raw.push_into(&update);
233 }
234 self.chunker.push_into(&mut raw);
235 while let Some(chunk) = self.chunker.extract() {
236 self.inner.push_into(std::mem::take(chunk));
237 }
238 }
239
240 fn flush(&mut self) {
242 use timely::container::ContainerBuilder as _;
243 while let Some(chunk) = self.chunker.finish() {
244 self.inner.push_into(std::mem::take(chunk));
245 }
246 }
247
248 fn done(mut self) -> Vec<Column<(D, T, R)>> {
250 self.flush();
251 let (chain, _description) = self.inner.seal(Antichain::new());
252 chain
253 }
254}
255
256impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
257where
258 D: MzData + Ord + Clone + 'static,
259 T: MzData + Ord + PartialOrder + Clone + Default + 'static + BucketTimestamp,
260 R: MzData + Semigroup + Default + 'static + for<'a> Semigroup<columnar::Ref<'a, R>>,
261 for<'a> columnar::Ref<'a, R>: Ord,
262 for<'a> <D as Columnar>::Container: columnar::Push<columnar::Ref<'a, D>>,
263 for<'a> <T as Columnar>::Container: columnar::Push<columnar::Ref<'a, T>>,
264 for<'a> <R as Columnar>::Container: columnar::Push<&'a R>,
265 for<'a> <(D, T, R) as Columnar>::Container: columnar::Push<&'a (D, T, R)>,
266{
267 type Timestamp = T;
268
269 fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
270 self.flush();
274 let upper = Antichain::from_elem(timestamp.clone());
275 let mut lower = Self::new(self.logger.clone(), self.operator_id);
276 let mut buffer = Vec::new();
277 let (chain, _description) = self.inner.seal(upper);
278 for chunk in chain {
279 *fuel = fuel.saturating_sub(chunk.record_count());
280 buffer.extend(
282 chunk
283 .borrow()
284 .into_index_iter()
285 .map(<(D, T, R)>::into_owned),
286 );
287 lower.push_container(&mut buffer);
288 buffer.clear();
289 }
290 (lower, self)
291 }
292}