1use std::marker::PhantomData;
13
14use differential_dataflow::Hashable;
15use differential_dataflow::containers::TimelyStack;
16use differential_dataflow::difference::Semigroup;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
19use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
20use differential_dataflow::trace::implementations::merge_batcher::container::ColInternalMerger;
21use differential_dataflow::trace::{Batcher, Builder, Description};
22use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
23use timely::container::PushInto;
24use timely::dataflow::channels::pact::Exchange;
25use timely::dataflow::operators::Operator;
26use timely::dataflow::{Scope, Stream, StreamVec};
27use timely::order::TotalOrder;
28use timely::progress::{Antichain, PathSummary, Timestamp};
29use timely::{ContainerBuilder, ExchangeData, PartialOrder};
30
31use crate::typedefs::MzData;
32
/// Extension trait for streams of `(data, time, diff)` updates that delays
/// ("buckets") updates whose times lie beyond a computed frontier, releasing
/// them later in batches. See the implementation below for the concrete
/// semantics; this trait only fixes the entry point.
pub trait TemporalBucketing<G: Scope, O> {
    /// Bucket the stream's updates.
    ///
    /// * `as_of` - an antichain joined with the input frontier when computing
    ///   the release frontier (NOTE(review): presumably the dataflow's as-of;
    ///   confirm against callers).
    /// * `threshold` - a timestamp summary applied to the joined times; only
    ///   updates not beyond the resulting frontier are emitted immediately.
    ///
    /// Returns a stream of containers produced by the container builder `CB`,
    /// which must accept the update tuples `O`.
    fn bucket<CB>(
        self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> Stream<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<O>;
}
48
impl<G, D> TemporalBucketing<G, (D, G::Timestamp, mz_repr::Diff)>
    for StreamVec<G, (D, G::Timestamp, mz_repr::Diff)>
where
    G: Scope<Timestamp: ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice>,
    D: ExchangeData + MzData + Ord + Clone + std::fmt::Debug + Hashable,
{
    fn bucket<CB>(
        self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> Stream<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<(D, G::Timestamp, mz_repr::Diff)>,
    {
        let scope = self.scope();
        // Logger for the inner merge batchers; reuses the arrangement logging
        // channel so batcher activity shows up alongside arrangements.
        let logger = scope.logger_for("differential/arrange").map(Into::into);

        // Distribute updates by the hash of their data so each worker owns a
        // disjoint part of the key space.
        let pact = Exchange::new(|(d, _, _): &(D, G::Timestamp, mz_repr::Diff)| d.hashed().into());
        self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
            // Chain of time-ranged buckets; each bucket is a merge batcher
            // holding the updates whose times fall into its range.
            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
            // Used to reschedule ourselves when `restore` runs out of fuel.
            let activator = scope.activator_for(info.address);

            // Capability used to emit delayed data; dropped once the input is
            // exhausted or no release frontier can be computed.
            let mut cap = Some(cap);

            // Reusable staging buffer for consecutive updates that fall into
            // the same bucket range.
            let mut buffer = Vec::new();

            move |(input, frontier), output| {
                // Compute the release frontier `upper`: for every element of
                // the input frontier joined with every element of `as_of`,
                // advance by `threshold`. Updates not beyond `upper` may be
                // emitted; updates beyond it stay bucketed.
                let mut upper = Antichain::new();
                for time1 in &frontier.frontier() {
                    for time2 in as_of.elements() {
                        // `results_in` returns `None` if the summary
                        // overflows; such times contribute nothing to `upper`.
                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
                            upper.insert(time);
                        }
                    }
                }

                input.for_each_time(|time, data| {
                    let mut session = output.session_with_builder(&time);
                    for data in data {
                        // Updates whose time is NOT beyond `upper` need no
                        // delay: forward them immediately at the input time.
                        let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
                        session.give_iterator(pass_through);

                        // Sort the remainder by time so that updates for the
                        // same bucket range are contiguous and can be pushed
                        // as whole runs.
                        data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));

                        let mut drain = data.drain(..);
                        if let Some((datum, time, diff)) = drain.next() {
                            // Range of the bucket responsible for the first
                            // time; `expect` encodes the invariant that the
                            // chain covers all remaining times.
                            let mut range = chain.range_of(&time).expect("Must exist");
                            buffer.push((datum, time, diff));
                            for (datum, time, diff) in drain {
                                if !range.contains(&time) {
                                    // Crossed into a new bucket range: flush
                                    // the accumulated run into the previous
                                    // bucket before switching.
                                    if !buffer.is_empty() {
                                        let bucket =
                                            chain.find_mut(&range.start).expect("Must exist");
                                        bucket.inner.push_container(&mut buffer);
                                        buffer.clear();
                                    }
                                    range = chain.range_of(&time).expect("Must exist");
                                }
                                buffer.push((datum, time, diff));
                            }

                            // Flush the final run.
                            if !buffer.is_empty() {
                                let bucket = chain.find_mut(&range.start).expect("Must exist");
                                bucket.inner.push_container(&mut buffer);
                                buffer.clear();
                            }
                        }
                    }
                });

                // Remove from the chain all buckets whose contents are now
                // not beyond `upper`, and emit their updates at `cap`.
                let peeled = chain.peel(upper.borrow());
                if let Some(cap) = cap.as_ref() {
                    let mut session = output.session_with_builder(cap);
                    for stack in peeled.into_iter().flat_map(|x| x.done()) {
                        session.give_iterator(stack.iter().cloned());
                    }
                } else {
                    // Without a capability we cannot emit; peeling anything
                    // non-empty here would mean data loss, so assert.
                    assert_eq!(
                        peeled.into_iter().flat_map(|x| x.done()).next(),
                        None,
                        "Unexpected data revealed without a cap."
                    );
                }

                if frontier.is_empty() || upper.is_empty() {
                    // Input complete (or no future release possible): drop the
                    // capability so downstream frontiers can advance.
                    cap = None;
                } else if let Some(cap) = cap.as_mut() {
                    // Timestamps are totally ordered, so `upper` has a single
                    // element; hold the capability exactly at it.
                    cap.downgrade(&upper[0]);
                }

                // Perform bounded maintenance on the bucket chain; if the
                // fuel is exhausted, reschedule to continue later.
                let mut fuel = 1_000_000;
                chain.restore(&mut fuel);
                if fuel <= 0 {
                    activator.activate();
                }
            }
        })
    }
}
169
/// A bucket payload wrapping a columnation-backed [`MergeBatcher`].
///
/// Carries the logger and operator id alongside the batcher so that fresh
/// batchers can be constructed when a bucket is split (see the `Bucket` impl).
struct MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Clone,
    T: MzData + Ord + PartialOrder + Clone,
    R: MzData + Semigroup + Default,
{
    // Retained so `split` can create new batchers with the same logging setup.
    logger: Option<differential_dataflow::logging::Logger>,
    // Operator id reported to the logger; also needed when splitting.
    operator_id: usize,
    // The actual store of updates, maintained in sorted, consolidated runs.
    inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColInternalMerger<D, T, R>>,
}
181
182impl<D, T, R> MergeBatcherWrapper<D, T, R>
183where
184 D: MzData + Ord + Clone,
185 T: MzData + Ord + PartialOrder + Clone + Timestamp,
186 R: MzData + Semigroup + Default,
187{
188 fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
190 Self {
191 logger: logger.clone(),
192 operator_id,
193 inner: MergeBatcher::new(logger, operator_id),
194 }
195 }
196
197 fn done(mut self) -> Vec<TimelyStack<(D, T, R)>> {
199 self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
200 }
201}
202
impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
where
    D: MzData + Ord + Clone + 'static,
    T: MzData + Ord + PartialOrder + Clone + 'static + BucketTimestamp,
    R: MzData + Semigroup + Default,
{
    type Timestamp = T;

    /// Split this bucket at `timestamp`, charging the moved updates to `fuel`.
    ///
    /// Seals the inner batcher up to `timestamp`, which extracts all updates
    /// with times not beyond that frontier, and pushes them into a freshly
    /// constructed batcher. Returns `(lower, upper)` where `lower` holds the
    /// extracted (earlier) updates and `upper` is `self` with the remainder.
    fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
        let upper = Antichain::from_elem(timestamp.clone());
        let mut lower = Self::new(self.logger.clone(), self.operator_id);
        // Reusable staging buffer; `push_container` drains it, so it can be
        // reused across chunks.
        let mut buffer = Vec::new();
        for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
            // Account one unit of fuel per moved update so the caller can
            // bound the work done in a single invocation.
            *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
            // NOTE(review): clones each update out of the columnated chunk
            // before re-pushing — presumably unavoidable since the chunk owns
            // region-allocated data; confirm there is no move-based API.
            buffer.extend(chunk.into_iter().cloned());
            lower.inner.push_container(&mut buffer);
            buffer.clear();
        }
        (lower, self)
    }
}
229
/// A no-op [`Builder`] used only for its associated `seal` function, which
/// hands back the sealed chain unchanged instead of building batches.
/// Instances are never constructed (see the `unimplemented!()` methods below).
struct CapturingBuilder<D, T>(D, PhantomData<T>);
231
impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
    type Input = D;
    type Time = T;
    // The "built" output is simply the chain's containers, untouched.
    type Output = Vec<D>;

    // Never called: `seal` below is overridden and does not construct `Self`.
    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        unimplemented!()
    }

    // Never called: only the associated `seal` path is used.
    fn push(&mut self, _chunk: &mut Self::Input) {
        unimplemented!()
    }

    // Never called: only the associated `seal` path is used.
    fn done(self, _description: Description<Self::Time>) -> Self::Output {
        unimplemented!()
    }

    /// Capture the sealed chain as-is, leaving an empty vector behind; the
    /// description is irrelevant since no batch metadata is produced.
    #[inline]
    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}
256}