1use std::marker::PhantomData;
13
14use differential_dataflow::Hashable;
15use differential_dataflow::difference::Semigroup;
16use differential_dataflow::lattice::Lattice;
17use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
18use differential_dataflow::trace::{Batcher, Builder, Description};
19use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack};
20use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
21use timely::container::PushInto;
22use timely::dataflow::channels::pact::Exchange;
23use timely::dataflow::operators::Operator;
24use timely::dataflow::{Stream, StreamVec};
25use timely::order::TotalOrder;
26use timely::progress::{Antichain, PathSummary, Timestamp};
27use timely::{ContainerBuilder, ExchangeData, PartialOrder};
28
29use crate::typedefs::MzData;
30
31pub trait TemporalBucketing<'scope, T: Timestamp, O> {
35 fn bucket<CB>(
39 self,
40 as_of: Antichain<T>,
41 threshold: T::Summary,
42 ) -> Stream<'scope, T, CB::Container>
43 where
44 CB: ContainerBuilder + PushInto<O>;
45}
46
47impl<'scope, T, D> TemporalBucketing<'scope, T, (D, T, mz_repr::Diff)>
49 for StreamVec<'scope, T, (D, T, mz_repr::Diff)>
50where
51 T: Timestamp + ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice,
52 D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
53{
54 fn bucket<CB>(
55 self,
56 as_of: Antichain<T>,
57 threshold: T::Summary,
58 ) -> Stream<'scope, T, CB::Container>
59 where
60 CB: ContainerBuilder + PushInto<(D, T, mz_repr::Diff)>,
61 {
62 let scope = self.scope();
63 let logger = scope
64 .worker()
65 .logger_for("differential/arrange")
66 .map(Into::into);
67
68 let pact = Exchange::new(|(d, _, _): &(D, T, mz_repr::Diff)| d.hashed().into());
69 self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
70 let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
71 let activator = scope.activator_for(info.address);
72
73 let mut cap = Some(cap);
75
76 let mut buffer = Vec::new();
78
79 move |(input, frontier), output| {
80 let mut upper = Antichain::new();
83 for time1 in &frontier.frontier() {
84 for time2 in as_of.elements() {
85 if let Some(time) = threshold.results_in(&time1.join(time2)) {
87 upper.insert(time);
88 }
89 }
90 }
91
92 input.for_each_time(|time, data| {
93 let mut session = output.session_with_builder(&time);
94 for data in data {
95 let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
97 session.give_iterator(pass_through);
98
99 data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));
103
104 let mut drain = data.drain(..);
105 if let Some((datum, time, diff)) = drain.next() {
106 let mut range = chain.range_of(&time).expect("Must exist");
107 buffer.push((datum, time, diff));
108 for (datum, time, diff) in drain {
109 if !range.contains(&time) {
111 if !buffer.is_empty() {
114 let bucket =
115 chain.find_mut(&range.start).expect("Must exist");
116 bucket.inner.push_container(&mut buffer);
117 buffer.clear();
118 }
119 range = chain.range_of(&time).expect("Must exist");
120 }
121 buffer.push((datum, time, diff));
122 }
123
124 if !buffer.is_empty() {
126 let bucket = chain.find_mut(&range.start).expect("Must exist");
127 bucket.inner.push_container(&mut buffer);
128 buffer.clear();
129 }
130 }
131 }
132 });
133
134 let peeled = chain.peel(upper.borrow());
136 if let Some(cap) = cap.as_ref() {
137 let mut session = output.session_with_builder(cap);
138 for stack in peeled.into_iter().flat_map(|x| x.done()) {
139 session.give_iterator(stack.iter().cloned());
141 }
142 } else {
143 assert_eq!(
145 peeled.into_iter().flat_map(|x| x.done()).next(),
146 None,
147 "Unexpected data revealed without a cap."
148 );
149 }
150
151 if frontier.is_empty() || upper.is_empty() {
153 cap = None;
154 } else if let Some(cap) = cap.as_mut() {
155 cap.downgrade(&upper[0]);
157 }
158
159 let mut fuel = 1_000_000;
161 chain.restore(&mut fuel);
162 if fuel <= 0 {
163 activator.activate();
165 }
166 }
167 })
168 }
169}
170
171struct MergeBatcherWrapper<D, T, R>
173where
174 D: MzData + Ord + Clone,
175 T: MzData + Ord + PartialOrder + Clone,
176 R: MzData + Semigroup + Default,
177{
178 logger: Option<differential_dataflow::logging::Logger>,
179 operator_id: usize,
180 inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColInternalMerger<D, T, R>>,
181}
182
183impl<D, T, R> MergeBatcherWrapper<D, T, R>
184where
185 D: MzData + Ord + Clone,
186 T: MzData + Ord + PartialOrder + Clone + Timestamp,
187 R: MzData + Semigroup + Default,
188{
189 fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
191 Self {
192 logger: logger.clone(),
193 operator_id,
194 inner: MergeBatcher::new(logger, operator_id),
195 }
196 }
197
198 fn done(mut self) -> Vec<ColumnationStack<(D, T, R)>> {
200 self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
201 }
202}
203
204impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
205where
206 D: MzData + Ord + Clone + 'static,
207 T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp,
208 R: MzData + Semigroup + Default,
209{
210 type Timestamp = T;
211
212 fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
213 let upper = Antichain::from_elem(timestamp.clone());
218 let mut lower = Self::new(self.logger.clone(), self.operator_id);
219 let mut buffer = Vec::new();
220 for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
221 *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
222 buffer.extend(chunk.into_iter().cloned());
224 lower.inner.push_container(&mut buffer);
225 buffer.clear();
226 }
227 (lower, self)
228 }
229}
230
/// Marker type used as a type-level argument when sealing a batcher: `D` is the chunk type and
/// `T` the timestamp type.
///
/// NOTE(review): no code visible here constructs a value of this type; it appears to be used
/// only through its associated functions.
struct CapturingBuilder<D, T>(
    /// A value of the chunk type.
    D,
    /// Ties the timestamp type to the builder without storing a timestamp.
    PhantomData<T>,
);
232
233impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
234 type Input = D;
235 type Time = T;
236 type Output = Vec<D>;
237
238 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
239 unimplemented!()
241 }
242
243 fn push(&mut self, _chunk: &mut Self::Input) {
244 unimplemented!()
246 }
247
248 fn done(self, _description: Description<Self::Time>) -> Self::Output {
249 unimplemented!()
251 }
252
253 #[inline]
254 fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
255 std::mem::take(chain)
256 }
257}