1use std::marker::PhantomData;
13
14use differential_dataflow::Hashable;
15use differential_dataflow::containers::TimelyStack;
16use differential_dataflow::difference::Semigroup;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
19use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
20use differential_dataflow::trace::{Batcher, Builder, Description};
21use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
22use timely::container::PushInto;
23use timely::dataflow::channels::pact::Exchange;
24use timely::dataflow::operators::Operator;
25use timely::dataflow::{Scope, StreamCore};
26use timely::order::TotalOrder;
27use timely::progress::{Antichain, PathSummary, Timestamp};
28use timely::{ContainerBuilder, Data, ExchangeData, PartialOrder};
29
30use crate::typedefs::MzData;
31
32pub trait TemporalBucketing<G: Scope, O> {
36 fn bucket<CB>(
40 &self,
41 as_of: Antichain<G::Timestamp>,
42 threshold: <G::Timestamp as Timestamp>::Summary,
43 ) -> StreamCore<G, CB::Container>
44 where
45 CB: ContainerBuilder + PushInto<O>;
46}
47
/// Bucketing for streams of `(data, time, diff)` updates in scopes with totally ordered
/// timestamps.
///
/// Updates are exchanged by the hash of their data. Updates whose time is not in advance of
/// `upper` (computed per activation below) are forwarded immediately. All other updates are
/// stored in a [`BucketChain`] of merge batchers and emitted later, once `peel` returns them.
impl<G, D> TemporalBucketing<G, (D, G::Timestamp, mz_repr::Diff)>
    for StreamCore<G, Vec<(D, G::Timestamp, mz_repr::Diff)>>
where
    G: Scope<Timestamp: ExchangeData + MzData + BucketTimestamp + TotalOrder + Lattice>,
    D: ExchangeData + MzData + Ord + std::fmt::Debug + Hashable,
{
    fn bucket<CB>(
        &self,
        as_of: Antichain<G::Timestamp>,
        threshold: <G::Timestamp as Timestamp>::Summary,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder + PushInto<(D, G::Timestamp, mz_repr::Diff)>,
    {
        let scope = self.scope();
        // The batchers inside the chain are constructed with this differential logger.
        let logger = scope.logger_for("differential/arrange").map(Into::into);

        // Route each update to a worker by the hash of its data.
        let pact = Exchange::new(|(d, _, _): &(D, G::Timestamp, mz_repr::Diff)| d.hashed().into());
        self.unary_frontier::<CB, _, _, _>(pact, "Temporal delay", |cap, info| {
            let mut chain = BucketChain::new(MergeBatcherWrapper::new(logger, info.global_id));
            let activator = scope.activator_for(info.address);

            // Capability used to emit updates released from the chain. It is downgraded to
            // `upper` after each activation and dropped once `upper` (or the input frontier)
            // becomes empty.
            let mut cap = Some(cap);

            // Scratch buffer holding a run of updates destined for a single bucket; reused
            // across activations.
            let mut buffer = Vec::new();

            move |(input, frontier), output| {
                // `upper`: for every pair of (input frontier element, `as_of` element), their
                // join advanced by `threshold`. Pairs where `results_in` returns `None` add
                // nothing, so `upper` may end up empty.
                let mut upper = Antichain::new();
                for time1 in &frontier.frontier() {
                    for time2 in as_of.elements() {
                        if let Some(time) = threshold.results_in(&time1.join(time2)) {
                            upper.insert(time);
                        }
                    }
                }

                input.for_each_time(|time, data| {
                    let mut session = output.session_with_builder(&time);
                    for data in data {
                        // Forward right away every update whose time is not in advance of
                        // `upper`; these are removed from `data`.
                        let pass_through = data.extract_if(.., |(_, t, _)| !upper.less_equal(t));
                        session.give_iterator(pass_through);

                        // Sort the remaining updates by time so that consecutive runs falling
                        // into the same bucket can be pushed with one `push_container` call.
                        data.sort_unstable_by(|(_, t, _), (_, t2, _)| t.cmp(t2));

                        let mut drain = data.drain(..);
                        if let Some((datum, time, diff)) = drain.next() {
                            let mut range = chain.range_of(&time).expect("Must exist");
                            buffer.push((datum, time, diff));
                            for (datum, time, diff) in drain {
                                if !range.contains(&time) {
                                    // `time` left the current bucket's range: flush the buffered
                                    // run into that bucket, then look up the bucket for `time`.
                                    // (`buffer` always holds at least one update at this point.)
                                    if !buffer.is_empty() {
                                        let bucket =
                                            chain.find_mut(&range.start).expect("Must exist");
                                        bucket.inner.push_container(&mut buffer);
                                        buffer.clear();
                                    }
                                    range = chain.range_of(&time).expect("Must exist");
                                }
                                buffer.push((datum, time, diff));
                            }

                            // Flush the final run.
                            if !buffer.is_empty() {
                                let bucket = chain.find_mut(&range.start).expect("Must exist");
                                bucket.inner.push_container(&mut buffer);
                                buffer.clear();
                            }
                        }
                    }
                });

                // Take out of the chain whatever `peel` yields for `upper` and emit it at `cap`.
                // NOTE(review): presumably this is every stored update not in advance of
                // `upper`; confirm against `BucketChain::peel`.
                let peeled = chain.peel(upper.borrow());
                if let Some(cap) = cap.as_ref() {
                    let mut session = output.session_with_builder(cap);
                    for stack in peeled.into_iter().flat_map(|x| x.done()) {
                        // `done` yields columnar `TimelyStack`s; each update is cloned out.
                        session.give_iterator(stack.iter().cloned());
                    }
                } else {
                    // Without a capability nothing can be emitted, so nothing may be released.
                    assert_eq!(
                        peeled.into_iter().flat_map(|x| x.done()).next(),
                        None,
                        "Unexpected data revealed without a cap."
                    );
                }

                if frontier.is_empty() || upper.is_empty() {
                    cap = None;
                } else if let Some(cap) = cap.as_mut() {
                    // With `TotalOrder` timestamps a non-empty antichain has exactly one
                    // element, so `upper[0]` is the whole frontier.
                    cap.downgrade(&upper[0]);
                }

                // Spend a bounded budget maintaining the chain. If the budget runs out,
                // schedule another activation to continue the work.
                let mut fuel = 1_000_000;
                chain.restore(&mut fuel);
                if fuel <= 0 {
                    activator.activate();
                }
            }
        })
    }
}
168
169struct MergeBatcherWrapper<D, T, R>
171where
172 D: MzData + Ord,
173 T: MzData + Ord + PartialOrder + Clone,
174 R: MzData + Semigroup + Default,
175{
176 logger: Option<differential_dataflow::logging::Logger>,
177 operator_id: usize,
178 inner: MergeBatcher<Vec<(D, T, R)>, ColumnationChunker<(D, T, R)>, ColMerger<D, T, R>>,
179}
180
181impl<D, T, R> MergeBatcherWrapper<D, T, R>
182where
183 D: MzData + Ord + Clone,
184 T: MzData + Ord + PartialOrder + Clone + Timestamp,
185 R: MzData + Semigroup + Default,
186{
187 fn new(logger: Option<differential_dataflow::logging::Logger>, operator_id: usize) -> Self {
189 Self {
190 logger: logger.clone(),
191 operator_id,
192 inner: MergeBatcher::new(logger, operator_id),
193 }
194 }
195
196 fn done(mut self) -> Vec<TimelyStack<(D, T, R)>> {
198 self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
199 }
200}
201
202impl<D, T, R> Bucket for MergeBatcherWrapper<D, T, R>
203where
204 D: MzData + Ord + Data,
205 T: MzData + Ord + PartialOrder + Data + BucketTimestamp,
206 R: MzData + Semigroup + Default,
207{
208 type Timestamp = T;
209
210 fn split(mut self, timestamp: &Self::Timestamp, fuel: &mut i64) -> (Self, Self) {
211 let upper = Antichain::from_elem(timestamp.clone());
216 let mut lower = Self::new(self.logger.clone(), self.operator_id);
217 let mut buffer = Vec::new();
218 for chunk in self.inner.seal::<CapturingBuilder<_, _>>(upper) {
219 *fuel = fuel.saturating_sub(chunk.len().try_into().expect("must fit"));
220 buffer.extend(chunk.into_iter().cloned());
222 lower.inner.push_container(&mut buffer);
223 buffer.clear();
224 }
225 (lower, self)
226 }
227}
228
/// Builder type passed to `MergeBatcher::seal` so that the sealed chain of chunks is handed
/// back as-is instead of being turned into a batch.
///
/// Only used as a type argument. The `Builder` methods that would construct a value of this
/// type panic, so its fields are never populated.
struct CapturingBuilder<D, T>(D, PhantomData<T>);
230
231impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
232 type Input = D;
233 type Time = T;
234 type Output = Vec<D>;
235
236 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
237 unimplemented!()
239 }
240
241 fn push(&mut self, _chunk: &mut Self::Input) {
242 unimplemented!()
244 }
245
246 fn done(self, _description: Description<Self::Time>) -> Self::Output {
247 unimplemented!()
249 }
250
251 #[inline]
252 fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
253 std::mem::take(chain)
254 }
255}