1use std::collections::BTreeMap;
11use std::rc::{Rc, Weak};
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection};
20use timely::Container;
21use timely::container::{ContainerBuilder, PushInto};
22use timely::dataflow::Stream;
23use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
24use timely::dataflow::operators::Operator;
25use timely::progress::Timestamp;
26
27use crate::logging::compute::{
28 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
29 ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
30};
31use crate::typedefs::{
32 KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
33};
34
35pub trait MzArrange<'scope>: MzArrangeCore<'scope> {
37 fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
43 where
44 Ba: Batcher<Time = Self::Timestamp> + 'static,
45 Chu: ContainerBuilder<Container = Ba::Output>
46 + for<'a> PushInto<&'a mut Self::Input>
47 + 'static,
48 Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
49 Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
50 Tr::Batch: Batch,
51 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
52}
53
54pub trait MzArrangeCore<'scope> {
56 type Timestamp: Timestamp + Lattice;
58 type Input: Container + Clone + 'static;
60
61 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
68 self,
69 pact: P,
70 name: &str,
71 ) -> Arranged<'scope, TraceAgent<Tr>>
72 where
73 P: ParallelizationContract<Self::Timestamp, Self::Input>,
74 Ba: Batcher<Time = Self::Timestamp> + 'static,
75 Chu: ContainerBuilder<Container = Ba::Output>
76 + for<'a> PushInto<&'a mut Self::Input>
77 + 'static,
78 Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
79 Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
80 Tr::Batch: Batch,
81 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
82}
83
84impl<'scope, T, C> MzArrangeCore<'scope> for Stream<'scope, T, C>
85where
86 T: Timestamp + Lattice,
87 C: Container + Clone + 'static,
88{
89 type Timestamp = T;
90 type Input = C;
91
92 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
93 self,
94 pact: P,
95 name: &str,
96 ) -> Arranged<'scope, TraceAgent<Tr>>
97 where
98 P: ParallelizationContract<T, Self::Input>,
99 Ba: Batcher<Time = T> + 'static,
100 Chu: ContainerBuilder<Container = Ba::Output>
101 + for<'a> PushInto<&'a mut Self::Input>
102 + 'static,
103 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
104 Tr: Trace + TraceReader<Time = T> + 'static,
105 Tr::Batch: Batch,
106 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
107 {
108 #[allow(clippy::disallowed_methods)]
110 arrange_core::<_, _, Chu, Ba, Bu, _>(self, pact, name).log_arrangement_size()
111 }
112}
113
114impl<'scope, T, K, V, R> MzArrange<'scope> for VecCollection<'scope, T, (K, V), R>
115where
116 T: Timestamp + Lattice,
117 K: ExchangeData + Hashable,
118 V: ExchangeData,
119 R: ExchangeData,
120{
121 fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
122 where
123 Ba: Batcher<Time = T> + 'static,
124 Chu: ContainerBuilder<Container = Ba::Output>
125 + for<'a> PushInto<&'a mut Self::Input>
126 + 'static,
127 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
128 Tr: Trace + TraceReader<Time = T> + 'static,
129 Tr::Batch: Batch,
130 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
131 {
132 let exchange = Exchange::new(move |update: &((K, V), T, R)| (update.0).0.hashed().into());
133 self.mz_arrange_core::<_, Chu, Ba, Bu, _>(exchange, name)
134 }
135}
136
137impl<'scope, T, C> MzArrangeCore<'scope> for Collection<'scope, T, C>
138where
139 T: Timestamp + Lattice,
140 C: Container + Clone + 'static,
141{
142 type Timestamp = T;
143 type Input = C;
144
145 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
146 self,
147 pact: P,
148 name: &str,
149 ) -> Arranged<'scope, TraceAgent<Tr>>
150 where
151 P: ParallelizationContract<T, Self::Input>,
152 Ba: Batcher<Time = T> + 'static,
153 Chu: ContainerBuilder<Container = Ba::Output>
154 + for<'a> PushInto<&'a mut Self::Input>
155 + 'static,
156 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
157 Tr: Trace + TraceReader<Time = T> + 'static,
158 Tr::Batch: Batch,
159 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
160 {
161 self.inner.mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
162 }
163}
164
165pub struct KeyCollection<'scope, T: Timestamp, K: 'static, R: 'static = usize>(
169 VecCollection<'scope, T, K, R>,
170);
171
172impl<'scope, T: Timestamp, K, R: Semigroup> From<VecCollection<'scope, T, K, R>>
173 for KeyCollection<'scope, T, K, R>
174{
175 fn from(value: VecCollection<'scope, T, K, R>) -> Self {
176 KeyCollection(value)
177 }
178}
179
180impl<'scope, T, K, R> MzArrange<'scope> for KeyCollection<'scope, T, K, R>
181where
182 T: Timestamp + Lattice,
183 K: ExchangeData + Hashable,
184 R: ExchangeData,
185{
186 fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
187 where
188 Ba: Batcher<Time = T> + 'static,
189 Chu: ContainerBuilder<Container = Ba::Output>
190 + for<'a> PushInto<&'a mut Self::Input>
191 + 'static,
192 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
193 Tr: Trace + TraceReader<Time = T> + 'static,
194 Tr::Batch: Batch,
195 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
196 {
197 self.0.map(|d| (d, ())).mz_arrange::<Chu, Ba, Bu, _>(name)
198 }
199}
200
201impl<'scope, T, K, R> MzArrangeCore<'scope> for KeyCollection<'scope, T, K, R>
202where
203 T: Timestamp + Lattice,
204 K: Clone + 'static,
205 R: Clone + 'static,
206{
207 type Timestamp = T;
208 type Input = Vec<((K, ()), T, R)>;
209
210 fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
211 self,
212 pact: P,
213 name: &str,
214 ) -> Arranged<'scope, TraceAgent<Tr>>
215 where
216 P: ParallelizationContract<T, Self::Input>,
217 Ba: Batcher<Time = T> + 'static,
218 Chu: ContainerBuilder<Container = Ba::Output>
219 + for<'a> PushInto<&'a mut Self::Input>
220 + 'static,
221 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
222 Tr: Trace + TraceReader<Time = T> + 'static,
223 Tr::Batch: Batch,
224 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
225 {
226 self.0
227 .map(|d| (d, ()))
228 .mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
229 }
230}
231
/// Implemented by arrangements that can have size logging attached to them.
pub trait ArrangementSize {
    /// Consumes the arrangement and returns an arrangement of the same type with size logging
    /// installed.
    fn log_arrangement_size(self) -> Self;
}
237
238fn log_arrangement_size_inner<'scope, B, L>(
244 arranged: Arranged<'scope, TraceAgent<Spine<Rc<B>>>>,
245 mut logic: L,
246) -> Arranged<'scope, TraceAgent<Spine<Rc<B>>>>
247where
248 B: Batch + 'static,
249 L: FnMut(&B) -> (usize, usize, usize) + 'static,
250{
251 let scope = arranged.stream.scope();
252 let Some(logger) = scope
253 .worker()
254 .logger_for::<ComputeEventBuilder>("materialize/compute")
255 else {
256 return arranged;
257 };
258 let operator_id = arranged.trace.operator().global_id;
259 let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
260
261 let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
262
263 let stream = arranged
264 .stream
265 .unary(Pipeline, "ArrangementSize", |_cap, info| {
266 let address = info.address;
267 logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
268 ArrangementHeapSizeOperator {
269 operator_id,
270 address: address.to_vec(),
271 },
272 ));
273
274 let mut batches: BTreeMap<*const B, (Weak<B>, (usize, usize, usize))> = BTreeMap::new();
280
281 move |input, output| {
282 input.for_each(|time, data| {
283 for batch in data.iter() {
284 batches
285 .entry(Rc::as_ptr(batch))
286 .or_insert_with(|| (Rc::downgrade(batch), logic(batch)));
287 }
288 output.session(&time).give_container(data);
289 });
290 let Some(trace) = trace.upgrade() else {
291 return;
292 };
293
294 trace.borrow().trace().map_batches(|batch| {
295 batches
296 .entry(Rc::as_ptr(batch))
297 .or_insert_with(|| (Rc::downgrade(batch), logic(batch)));
298 });
299
300 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
301 batches.retain(|_, (weak, cached)| {
302 if weak.strong_count() > 0 {
303 let (sz, c, a) = *cached;
304 (size += sz, capacity += c, allocations += a);
305 true
306 } else {
307 false
308 }
309 });
310
311 let size = size.try_into().expect("must fit");
312 if size != old_size {
313 logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
314 operator_id,
315 delta_size: size - old_size,
316 }));
317 }
318
319 let capacity = capacity.try_into().expect("must fit");
320 if capacity != old_capacity {
321 logger.log(&ComputeEvent::ArrangementHeapCapacity(
322 ArrangementHeapCapacity {
323 operator_id,
324 delta_capacity: capacity - old_capacity,
325 },
326 ));
327 }
328
329 let allocations = allocations.try_into().expect("must fit");
330 if allocations != old_allocations {
331 logger.log(&ComputeEvent::ArrangementHeapAllocations(
332 ArrangementHeapAllocations {
333 operator_id,
334 delta_allocations: allocations - old_allocations,
335 },
336 ));
337 }
338
339 old_size = size;
340 old_capacity = capacity;
341 old_allocations = allocations;
342 }
343 });
344 Arranged {
345 trace: arranged.trace,
346 stream,
347 }
348}
349
350impl<'scope, T, K, V, R> ArrangementSize for Arranged<'scope, KeyValAgent<K, V, T, R>>
351where
352 T: MzTimestamp,
353 K: Data + MzData,
354 V: Data + MzData,
355 R: Semigroup + Ord + MzData + 'static,
356{
357 fn log_arrangement_size(self) -> Self {
358 log_arrangement_size_inner(self, |batch| {
359 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
360 let mut callback = |siz, cap| {
361 size += siz;
362 capacity += cap;
363 allocations += usize::from(cap > 0);
364 };
365 batch.storage.keys.heap_size(&mut callback);
366 batch.storage.vals.offs.heap_size(&mut callback);
367 batch.storage.vals.vals.heap_size(&mut callback);
368 batch.storage.upds.offs.heap_size(&mut callback);
369 batch.storage.upds.times.heap_size(&mut callback);
370 batch.storage.upds.diffs.heap_size(&mut callback);
371 (size, capacity, allocations)
372 })
373 }
374}
375
376impl<'scope, T, K, R> ArrangementSize for Arranged<'scope, KeyAgent<K, T, R>>
377where
378 T: MzTimestamp,
379 K: Data + MzArrangeData,
380 R: Semigroup + Ord + MzData + 'static,
381{
382 fn log_arrangement_size(self) -> Self {
383 log_arrangement_size_inner(self, |batch| {
384 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
385 let mut callback = |siz, cap| {
386 size += siz;
387 capacity += cap;
388 allocations += usize::from(cap > 0);
389 };
390 batch.storage.keys.heap_size(&mut callback);
391 batch.storage.upds.offs.heap_size(&mut callback);
392 batch.storage.upds.times.heap_size(&mut callback);
393 batch.storage.upds.diffs.heap_size(&mut callback);
394 (size, capacity, allocations)
395 })
396 }
397}
398
399impl<'scope, T, V, R> ArrangementSize for Arranged<'scope, RowValAgent<V, T, R>>
400where
401 T: MzTimestamp,
402 V: Data + MzArrangeData,
403 R: Semigroup + Ord + MzArrangeData + 'static,
404{
405 fn log_arrangement_size(self) -> Self {
406 log_arrangement_size_inner(self, |batch| {
407 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
408 let mut callback = |siz, cap| {
409 size += siz;
410 capacity += cap;
411 allocations += usize::from(cap > 0);
412 };
413 batch.storage.keys.heap_size(&mut callback);
414 batch.storage.vals.offs.heap_size(&mut callback);
415 batch.storage.vals.vals.heap_size(&mut callback);
416 batch.storage.upds.offs.heap_size(&mut callback);
417 batch.storage.upds.times.heap_size(&mut callback);
418 batch.storage.upds.diffs.heap_size(&mut callback);
419 (size, capacity, allocations)
420 })
421 }
422}
423
424impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowRowAgent<T, R>>
425where
426 T: MzTimestamp,
427 R: Semigroup + Ord + MzArrangeData + 'static,
428{
429 fn log_arrangement_size(self) -> Self {
430 log_arrangement_size_inner(self, |batch| {
431 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
432 let mut callback = |siz, cap| {
433 size += siz;
434 capacity += cap;
435 allocations += usize::from(cap > 0);
436 };
437 batch.storage.keys.heap_size(&mut callback);
438 batch.storage.vals.offs.heap_size(&mut callback);
439 batch.storage.vals.vals.heap_size(&mut callback);
440 batch.storage.upds.offs.heap_size(&mut callback);
441 batch.storage.upds.times.heap_size(&mut callback);
442 batch.storage.upds.diffs.heap_size(&mut callback);
443 (size, capacity, allocations)
444 })
445 }
446}
447
448impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowAgent<T, R>>
449where
450 T: MzTimestamp,
451 R: Semigroup + Ord + MzArrangeData + 'static,
452{
453 fn log_arrangement_size(self) -> Self {
454 log_arrangement_size_inner(self, |batch| {
455 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
456 let mut callback = |siz, cap| {
457 size += siz;
458 capacity += cap;
459 allocations += usize::from(cap > 0);
460 };
461 batch.storage.keys.heap_size(&mut callback);
462 batch.storage.upds.offs.heap_size(&mut callback);
463 batch.storage.upds.times.heap_size(&mut callback);
464 batch.storage.upds.diffs.heap_size(&mut callback);
465 (size, capacity, allocations)
466 })
467 }
468}