1use std::marker::PhantomData;
13
14use differential_dataflow::containers::TimelyStack;
15use differential_dataflow::difference::Semigroup;
16use differential_dataflow::lattice::Lattice;
17use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
18use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
19use differential_dataflow::trace::{Batcher, Builder, Description};
20use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
21use timely::container::{ContainerBuilder, PushInto};
22use timely::dataflow::channels::pact::Pipeline;
23use timely::dataflow::operators::Operator;
24use timely::dataflow::{Scope, StreamCore};
25use timely::order::TotalOrder;
26use timely::progress::{Antichain, PathSummary, Timestamp};
27use timely::{Data, PartialOrder};
28
29use crate::typedefs::MzData;
30
31pub trait TemporalBucketing<G: Scope, O> {
35 fn bucket<CB>(
39 &self,
40 as_of: Antichain<G::Timestamp>,
41 threshold: <G::Timestamp as Timestamp>::Summary,
42 ) -> StreamCore<G, CB::Container>
43 where
44 CB: ContainerBuilder + PushInto<O>;
45}
46
47impl<G, D> TemporalBucketing<G, (D, G::Timestamp, mz_repr::Diff)>
49 for StreamCore<G, Vec<(D, G::Timestamp, mz_repr::Diff)>>
50where
51 G: Scope,
52 G::Timestamp: Data + MzData + BucketTimestamp + TotalOrder + Lattice,
53 D: Data + MzData + Ord + std::fmt::Debug,
54{
55 fn bucket<CB>(
56 &self,
57 as_of: Antichain<G::Timestamp>,
58 threshold: <G::Timestamp as Timestamp>::Summary,
59 ) -> StreamCore<G, CB::Container>
60 where
61 CB: ContainerBuilder + PushInto<(D, G::Timestamp, mz_repr::Diff)>,
62 {
63 let scope = self.scope();
64 let logger = scope.logger_for("differential/arrange").map(Into::into);
65
66 self.unary_frontier::<CB, _, _, _>(Pipeline, "Temporal delay", |cap, info| {
67 let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
68 let activator = scope.activator_for(info.address);
69
70 let mut cap = Some(cap);
72
73 let mut buffer = Vec::new();
75
76 move |input, output| {
77 let mut upper = Antichain::new();
80 for time1 in &input.frontier().frontier() {
81 for time2 in as_of.elements() {
82 if let Some(time) = threshold.results_in(&time1.join(time2)) {
84 upper.insert(time);
85 }
86 }
87 }
88
89 while let Some((time, data)) = input.next() {
90 let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
92 output
93 .session_with_builder(&time)
94 .give_iterator(pass_through);
95
96 data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));
100
101 let mut drain = data.drain(..);
102 if let Some((datum, time, diff)) = drain.next() {
103 let mut range = chain.range_of(&time).expect("Must exist");
104 buffer.push((datum, time, diff));
105 for (datum, time, diff) in drain {
106 if !range.contains(&time) {
108 if !buffer.is_empty() {
111 let bucket = chain.find_mut(&range.start).expect("Must exist");
112 bucket.inner.push_container(&mut buffer);
113 buffer.clear();
114 }
115 range = chain.range_of(&time).expect("Must exist");
116 }
117 buffer.push((datum, time, diff));
118 }
119
120 if !buffer.is_empty() {
122 let bucket = chain.find_mut(&range.start).expect("Must exist");
123 bucket.inner.push_container(&mut buffer);
124 buffer.clear();
125 }
126 }
127 }
128
129 let peeled = chain.peel(upper.borrow());
131 if let Some(cap) = cap.as_ref() {
132 let mut session = output.session_with_builder(cap);
133 for stack in peeled.into_iter().flat_map(|x| x.done()) {
134 session.give_iterator(stack.iter().cloned());
136 }
137 } else {
138 assert_eq!(
140 peeled.into_iter().flat_map(|x| x.done()).next(),
141 None,
142 "Unexpected data revealed without a cap."
143 );
144 }
145
146 if input.frontier().is_empty() || upper.is_empty() {
148 cap = None;
149 } else if let Some(cap) = cap.as_mut() {
150 cap.downgrade(&upper[0]);
152 }
153
154 let mut fuel = 1_000_000;
156 chain.restore(&mut fuel);
157 if fuel <= 0 {
158 activator.activate();
160 }
161 }
162 })
163 }
164}
165
166struct MergeBatcherWrapper<D, T, R>
168where
169 D: MzData + Ord,
170 T: MzData + Ord + PartialOrder + Clone,
171 R: MzData + Semigroup + Default,
172{
173 logger: Option<differential_dataflow::logging::Logger>,
174 operator_id: usize,
175 inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColMerger<D, T, R>>,
176}
177
178impl<D, T, R> MergeBatcherWrapper<D, T, R>
179where
180 D: MzData + Ord + Clone,
181 T: MzData + Ord + PartialOrder + Clone + Timestamp,
182 R: MzData + Semigroup + Default,
183{
184 fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
186 Self {
187 logger: logger.clone(),
188 operator_id,
189 inner: MergeBatcher::new(logger, operator_id),
190 }
191 }
192
193 fn done(mut self) -> Vec<TimelyStack<(D, T, R)>> {
195 self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
196 }
197}
198
199impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
200where
201 D: MzData + Ord + Data,
202 T: MzData + Ord + PartialOrder + Data + BucketTimestamp,
203 R: MzData + Semigroup + Default,
204{
205 type Timestamp = T;
206
207 fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
208 let upper = Antichain::from_elem(timestamp.clone());
213 let mut lower = Self::new(self.logger.clone(), self.operator_id);
214 let mut buffer = Vec::new();
215 for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
216 *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
217 buffer.extend(chunk.into_iter().cloned());
219 lower.inner.push_container(&mut buffer);
220 buffer.clear();
221 }
222 (lower, self)
223 }
224}
225
226struct CapturingBuilder<D, T>(D, PhantomData<T>);
227
228impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
229 type Input = D;
230 type Time = T;
231 type Output = Vec<D>;
232
233 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
234 unimplemented!()
236 }
237
238 fn push(&mut self, _chunk: &mut Self::Input) {
239 unimplemented!()
241 }
242
243 fn done(self, _description: Description<Self::Time>) -> Self::Output {
244 unimplemented!()
246 }
247
248 #[inline]
249 fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
250 std::mem::take(chain)
251 }
252}