1use differential_dataflow::Hashable;
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::trace::Batcher;
16use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
17use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack};
18use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
19use timely::container::PushInto;
20use timely::dataflow::channels::pact::Exchange;
21use timely::dataflow::operators::Operator;
22use timely::dataflow::{Stream, StreamVec};
23use timely::order::TotalOrder;
24use timely::progress::{Antichain, PathSummary, Timestamp};
25use timely::{ContainerBuilder, ExchangeData, PartialOrder};
26
27use crate::typedefs::MzData;
28
29pub trait TemporalBucketing<'scope, T: Timestamp, O> {
33 fn bucket<CB>(
37 self,
38 as_of: Antichain<T>,
39 threshold: T::Summary,
40 ) -> Stream<'scope, T, CB::Container>
41 where
42 CB: ContainerBuilder + PushInto<O>;
43}
44
45impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)>
47 for StreamVec<'scope, T, (D, T, mz_repr::Diff)>
48where
49 T: Timestamp + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice,
50 D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
51{
52 fn bucket<CB>(
53 self,
54 as_of: Antichain<T>,
55 threshold: T::Summary,
56 ) -> Stream<'scope, T, CB::Container>
57 where
58 CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>,
59 {
60 let scope = self.scope();
61 let logger = scope
62 .worker()
63 .logger_for("differential/arrange")
64 .map(Into::into);
65
66 let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into());
67 self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
68 let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
69 let activator = scope.activator_for(info.address);
70
71 let mut cap = Some(cap);
73
74 let mut buffer = Vec::new();
76
77 move |(input, frontier), output| {
78 let mut upper = Antichain::new();
81 for time1 in &frontier.frontier() {
82 for time2 in as_of.elements() {
83 if let Some(time) = threshold.results_in(&time1.join(time2)) {
85 upper.insert(time);
86 }
87 }
88 }
89
90 input.for_each_time(|time, data| {
91 let mut session = output.session_with_builder(&time);
92 for data in data {
93 let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
95 session.give_iterator(pass_through);
96
97 data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));
101
102 let mut drain = data.drain(..);
103 if let Some((datum, time, diff)) = drain.next() {
104 let mut range = chain.range_of(&time).expect("Must exist");
105 buffer.push((datum, time, diff));
106 for (datum, time, diff) in drain {
107 if !range.contains(&time) {
109 if !buffer.is_empty() {
112 let bucket =
113 chain.find_mut(&range.start).expect("Must exist");
114 bucket.push_container(&mut buffer);
115 buffer.clear();
116 }
117 range = chain.range_of(&time).expect("Must exist");
118 }
119 buffer.push((datum, time, diff));
120 }
121
122 if !buffer.is_empty() {
124 let bucket = chain.find_mut(&range.start).expect("Must exist");
125 bucket.push_container(&mut buffer);
126 buffer.clear();
127 }
128 }
129 }
130 });
131
132 let peeled = chain.peel(upper.borrow());
134 if let Some(cap) = cap.as_ref() {
135 let mut session = output.session_with_builder(cap);
136 for stack in peeled.into_iter().flat_map(|x| x.done()) {
137 session.give_iterator(stack.iter().cloned());
139 }
140 } else {
141 assert_eq!(
143 peeled.into_iter().flat_map(|x| x.done()).next(),
144 None,
145 "Unexpected data revealed without a cap."
146 );
147 }
148
149 if frontier.is_empty() || upper.is_empty() {
151 cap = None;
152 } else if let Some(cap) = cap.as_mut() {
153 cap.downgrade(&upper[0]);
155 }
156
157 let mut fuel = 1_000_000;
159 chain.restore(&mut fuel);
160 if fuel <= 0 {
161 activator.activate();
163 }
164 }
165 })
166 }
167}
168
169struct MergeBatcherWrapper<D, T, R>
175where
176 D: MzData + Ord + Clone,
177 T: MzData + Ord + PartialOrder + Clone,
178 R: MzData + Semigroup + Default,
179{
180 logger: Option<differential_dataflow::logging::Logger>,
181 operator_id: usize,
182 chunker: ColumnationChunker<(D, T, R)>,
183 inner: MergeBatcher<ColInternalMerger<D, T, R>>,
184}
185
186impl<D, T, R> MergeBatcherWrapper<D, T, R>
187where
188 D: MzData + Ord + Clone + 'static,
189 T: MzData + Ord + PartialOrder + Clone + Timestamp,
190 R: MzData + Semigroup + Default + 'static,
191{
192 fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
194 Self {
195 logger: logger.clone(),
196 operator_id,
197 chunker: ColumnationChunker::default(),
198 inner: MergeBatcher::new(logger, operator_id),
199 }
200 }
201
202 fn push_container(&mut self, buffer: &mut Vec<(D, T, R)>) {
205 use timely::container::ContainerBuilder as _;
206 self.chunker.push_into(buffer);
207 while let Some(chunk) = self.chunker.extract() {
208 self.inner.push_into(std::mem::take(chunk));
209 }
210 }
211
212 fn flush(&mut self) {
214 use timely::container::ContainerBuilder as _;
215 while let Some(chunk) = self.chunker.finish() {
216 self.inner.push_into(std::mem::take(chunk));
217 }
218 }
219
220 fn done(mut self) -> Vec<ColumnationStack<(D, T, R)>> {
222 self.flush();
223 let (chain, _description) = self.inner.seal(Antichain::new());
224 chain
225 }
226}
227
228impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
229where
230 D: MzData + Ord + Clone + 'static,
231 T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp,
232 R: MzData + Semigroup + Default + 'static,
233{
234 type Timestamp = T;
235
236 fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
237 self.flush();
242 let upper = Antichain::from_elem(timestamp.clone());
243 let mut lower = Self::new(self.logger.clone(), self.operator_id);
244 let mut buffer = Vec::new();
245 let (chain, _description) = self.inner.seal(upper);
246 for chunk in chain {
247 *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
248 buffer.extend(chunk.into_iter().cloned());
250 lower.push_container(&mut buffer);
251 buffer.clear();
252 }
253 (lower, self)
254 }
255}