1use std::collections::BTreeMap;
11use std::rc::Rc;
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection};
20use timely::Container;
21use timely::container::{ContainerBuilder, PushInto};
22use timely::dataflow::Stream;
23use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
24use timely::dataflow::operators::Operator;
25use timely::progress::Timestamp;
26
27use crate::logging::compute::{
28 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
29 ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
30};
31use crate::typedefs::{
32 KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
33};
34
35pub trait MzArrange<'scope>: MzArrangeCore<'scope> {
37 fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
43 where
44 Ba: Batcher<Time = Self::Timestamp> + 'static,
45 Chu: ContainerBuilder<Container = Ba::Output>
46 + for<'a> PushInto<&'a mut Self::Input>
47 + 'static,
48 Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
49 Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
50 Tr::Batch: Batch,
51 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
52}
53
54pub trait MzArrangeCore<'scope> {
56 type Timestamp: Timestamp + Lattice;
58 type Input: Container + Clone + 'static;
60
61 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
68 self,
69 pact: P,
70 name: &str,
71 ) -> Arranged<'scope, TraceAgent<Tr>>
72 where
73 P: ParallelizationContract<Self::Timestamp, Self::Input>,
74 Ba: Batcher<Time = Self::Timestamp> + 'static,
75 Chu: ContainerBuilder<Container = Ba::Output>
76 + for<'a> PushInto<&'a mut Self::Input>
77 + 'static,
78 Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
79 Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
80 Tr::Batch: Batch,
81 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
82}
83
84impl<'scope, T, C> MzArrangeCore<'scope> for Stream<'scope, T, C>
85where
86 T: Timestamp + Lattice,
87 C: Container + Clone + 'static,
88{
89 type Timestamp = T;
90 type Input = C;
91
92 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
93 self,
94 pact: P,
95 name: &str,
96 ) -> Arranged<'scope, TraceAgent<Tr>>
97 where
98 P: ParallelizationContract<T, Self::Input>,
99 Ba: Batcher<Time = T> + 'static,
100 Chu: ContainerBuilder<Container = Ba::Output>
101 + for<'a> PushInto<&'a mut Self::Input>
102 + 'static,
103 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
104 Tr: Trace + TraceReader<Time = T> + 'static,
105 Tr::Batch: Batch,
106 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
107 {
108 #[allow(clippy::disallowed_methods)]
110 arrange_core::<_, _, Chu, Ba, Bu, _>(self, pact, name).log_arrangement_size()
111 }
112}
113
114impl<'scope, T, K, V, R> MzArrange<'scope> for VecCollection<'scope, T, (K, V), R>
115where
116 T: Timestamp + Lattice,
117 K: ExchangeData + Hashable,
118 V: ExchangeData,
119 R: ExchangeData,
120{
121 fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
122 where
123 Ba: Batcher<Time = T> + 'static,
124 Chu: ContainerBuilder<Container = Ba::Output>
125 + for<'a> PushInto<&'a mut Self::Input>
126 + 'static,
127 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
128 Tr: Trace + TraceReader<Time = T> + 'static,
129 Tr::Batch: Batch,
130 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
131 {
132 let exchange = Exchange::new(move |update: &((K, V), T, R)| (update.0).0.hashed().into());
133 self.mz_arrange_core::<_, Chu, Ba, Bu, _>(exchange, name)
134 }
135}
136
137impl<'scope, T, C> MzArrangeCore<'scope> for Collection<'scope, T, C>
138where
139 T: Timestamp + Lattice,
140 C: Container + Clone + 'static,
141{
142 type Timestamp = T;
143 type Input = C;
144
145 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
146 self,
147 pact: P,
148 name: &str,
149 ) -> Arranged<'scope, TraceAgent<Tr>>
150 where
151 P: ParallelizationContract<T, Self::Input>,
152 Ba: Batcher<Time = T> + 'static,
153 Chu: ContainerBuilder<Container = Ba::Output>
154 + for<'a> PushInto<&'a mut Self::Input>
155 + 'static,
156 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
157 Tr: Trace + TraceReader<Time = T> + 'static,
158 Tr::Batch: Batch,
159 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
160 {
161 self.inner.mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
162 }
163}
164
165pub struct KeyCollection<'scope, T: Timestamp, K: 'static, R: 'static = usize>(
169 VecCollection<'scope, T, K, R>,
170);
171
172impl<'scope, T: Timestamp, K, R: Semigroup> From<VecCollection<'scope, T, K, R>>
173 for KeyCollection<'scope, T, K, R>
174{
175 fn from(value: VecCollection<'scope, T, K, R>) -> Self {
176 KeyCollection(value)
177 }
178}
179
180impl<'scope, T, K, R> MzArrange<'scope> for KeyCollection<'scope, T, K, R>
181where
182 T: Timestamp + Lattice,
183 K: ExchangeData + Hashable,
184 R: ExchangeData,
185{
186 fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
187 where
188 Ba: Batcher<Time = T> + 'static,
189 Chu: ContainerBuilder<Container = Ba::Output>
190 + for<'a> PushInto<&'a mut Self::Input>
191 + 'static,
192 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
193 Tr: Trace + TraceReader<Time = T> + 'static,
194 Tr::Batch: Batch,
195 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
196 {
197 self.0.map(|d| (d, ())).mz_arrange::<Chu, Ba, Bu, _>(name)
198 }
199}
200
201impl<'scope, T, K, R> MzArrangeCore<'scope> for KeyCollection<'scope, T, K, R>
202where
203 T: Timestamp + Lattice,
204 K: Clone + 'static,
205 R: Clone + 'static,
206{
207 type Timestamp = T;
208 type Input = Vec<((K, ()), T, R)>;
209
210 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
211 self,
212 pact: P,
213 name: &str,
214 ) -> Arranged<'scope, TraceAgent<Tr>>
215 where
216 P: ParallelizationContract<T, Self::Input>,
217 Ba: Batcher<Time = T> + 'static,
218 Chu: ContainerBuilder<Container = Ba::Output>
219 + for<'a> PushInto<&'a mut Self::Input>
220 + 'static,
221 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
222 Tr: Trace + TraceReader<Time = T> + 'static,
223 Tr::Batch: Batch,
224 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
225 {
226 self.0
227 .map(|d| (d, ()))
228 .mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
229 }
230}
231
/// Hook for attaching heap-size logging to an arrangement.
pub trait ArrangementSize {
    /// Consumes the arrangement and returns one of the same type.
    ///
    /// The implementations in this file install an operator that logs changes in
    /// the arrangement's heap size, capacity and allocation count.
    fn log_arrangement_size(self) -> Self;
}
237
238fn log_arrangement_size_inner<'scope, B, L>(
244 arranged: Arranged<'scope, TraceAgent<Spine<Rc<B>>>>,
245 mut logic: L,
246) -> Arranged<'scope, TraceAgent<Spine<Rc<B>>>>
247where
248 B: Batch + 'static,
249 L: FnMut(&B) -> (usize, usize, usize) + 'static,
250{
251 let scope = arranged.stream.scope();
252 let Some(logger) = scope
253 .worker()
254 .logger_for::<ComputeEventBuilder>("materialize/compute")
255 else {
256 return arranged;
257 };
258 let operator_id = arranged.trace.operator().global_id;
259 let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
260
261 let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
262
263 let stream = arranged
264 .stream
265 .unary(Pipeline, "ArrangementSize", |_cap, info| {
266 let address = info.address;
267 logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
268 ArrangementHeapSizeOperator {
269 operator_id,
270 address: address.to_vec(),
271 },
272 ));
273
274 let mut batches = BTreeMap::new();
276
277 move |input, output| {
278 input.for_each(|time, data| {
279 batches.extend(
280 data.iter()
281 .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))),
282 );
283 output.session(&time).give_container(data);
284 });
285 let Some(trace) = trace.upgrade() else {
286 return;
287 };
288
289 trace.borrow().trace().map_batches(|batch| {
290 batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch));
291 });
292
293 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
294 batches.retain(|_, weak| {
295 if let Some(batch) = weak.upgrade() {
296 let (sz, c, a) = logic(&batch);
297 (size += sz, capacity += c, allocations += a);
298 true
299 } else {
300 false
301 }
302 });
303
304 let size = size.try_into().expect("must fit");
305 if size != old_size {
306 logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
307 operator_id,
308 delta_size: size - old_size,
309 }));
310 }
311
312 let capacity = capacity.try_into().expect("must fit");
313 if capacity != old_capacity {
314 logger.log(&ComputeEvent::ArrangementHeapCapacity(
315 ArrangementHeapCapacity {
316 operator_id,
317 delta_capacity: capacity - old_capacity,
318 },
319 ));
320 }
321
322 let allocations = allocations.try_into().expect("must fit");
323 if allocations != old_allocations {
324 logger.log(&ComputeEvent::ArrangementHeapAllocations(
325 ArrangementHeapAllocations {
326 operator_id,
327 delta_allocations: allocations - old_allocations,
328 },
329 ));
330 }
331
332 old_size = size;
333 old_capacity = capacity;
334 old_allocations = allocations;
335 }
336 });
337 Arranged {
338 trace: arranged.trace,
339 stream,
340 }
341}
342
343impl<'scope, T, K, V, R> ArrangementSize for Arranged<'scope, KeyValAgent<K, V, T, R>>
344where
345 T: MzTimestamp,
346 K: Data + MzData,
347 V: Data + MzData,
348 R: Semigroup + Ord + MzData + 'static,
349{
350 fn log_arrangement_size(self) -> Self {
351 log_arrangement_size_inner(self, |batch| {
352 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
353 let mut callback = |siz, cap| {
354 size += siz;
355 capacity += cap;
356 allocations += usize::from(cap > 0);
357 };
358 batch.storage.keys.heap_size(&mut callback);
359 batch.storage.vals.offs.heap_size(&mut callback);
360 batch.storage.vals.vals.heap_size(&mut callback);
361 batch.storage.upds.offs.heap_size(&mut callback);
362 batch.storage.upds.times.heap_size(&mut callback);
363 batch.storage.upds.diffs.heap_size(&mut callback);
364 (size, capacity, allocations)
365 })
366 }
367}
368
369impl<'scope, T, K, R> ArrangementSize for Arranged<'scope, KeyAgent<K, T, R>>
370where
371 T: MzTimestamp,
372 K: Data + MzArrangeData,
373 R: Semigroup + Ord + MzData + 'static,
374{
375 fn log_arrangement_size(self) -> Self {
376 log_arrangement_size_inner(self, |batch| {
377 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
378 let mut callback = |siz, cap| {
379 size += siz;
380 capacity += cap;
381 allocations += usize::from(cap > 0);
382 };
383 batch.storage.keys.heap_size(&mut callback);
384 batch.storage.upds.offs.heap_size(&mut callback);
385 batch.storage.upds.times.heap_size(&mut callback);
386 batch.storage.upds.diffs.heap_size(&mut callback);
387 (size, capacity, allocations)
388 })
389 }
390}
391
392impl<'scope, T, V, R> ArrangementSize for Arranged<'scope, RowValAgent<V, T, R>>
393where
394 T: MzTimestamp,
395 V: Data + MzArrangeData,
396 R: Semigroup + Ord + MzArrangeData + 'static,
397{
398 fn log_arrangement_size(self) -> Self {
399 log_arrangement_size_inner(self, |batch| {
400 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
401 let mut callback = |siz, cap| {
402 size += siz;
403 capacity += cap;
404 allocations += usize::from(cap > 0);
405 };
406 batch.storage.keys.heap_size(&mut callback);
407 batch.storage.vals.offs.heap_size(&mut callback);
408 batch.storage.vals.vals.heap_size(&mut callback);
409 batch.storage.upds.offs.heap_size(&mut callback);
410 batch.storage.upds.times.heap_size(&mut callback);
411 batch.storage.upds.diffs.heap_size(&mut callback);
412 (size, capacity, allocations)
413 })
414 }
415}
416
417impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowRowAgent<T, R>>
418where
419 T: MzTimestamp,
420 R: Semigroup + Ord + MzArrangeData + 'static,
421{
422 fn log_arrangement_size(self) -> Self {
423 log_arrangement_size_inner(self, |batch| {
424 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
425 let mut callback = |siz, cap| {
426 size += siz;
427 capacity += cap;
428 allocations += usize::from(cap > 0);
429 };
430 batch.storage.keys.heap_size(&mut callback);
431 batch.storage.vals.offs.heap_size(&mut callback);
432 batch.storage.vals.vals.heap_size(&mut callback);
433 batch.storage.upds.offs.heap_size(&mut callback);
434 batch.storage.upds.times.heap_size(&mut callback);
435 batch.storage.upds.diffs.heap_size(&mut callback);
436 (size, capacity, allocations)
437 })
438 }
439}
440
441impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowAgent<T, R>>
442where
443 T: MzTimestamp,
444 R: Semigroup + Ord + MzArrangeData + 'static,
445{
446 fn log_arrangement_size(self) -> Self {
447 log_arrangement_size_inner(self, |batch| {
448 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
449 let mut callback = |siz, cap| {
450 size += siz;
451 capacity += cap;
452 allocations += usize::from(cap > 0);
453 };
454 batch.storage.keys.heap_size(&mut callback);
455 batch.storage.upds.offs.heap_size(&mut callback);
456 batch.storage.upds.times.heap_size(&mut callback);
457 batch.storage.upds.diffs.heap_size(&mut callback);
458 (size, capacity, allocations)
459 })
460 }
461}