1use std::collections::BTreeMap;
11use std::rc::Rc;
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable};
20use timely::Container;
21use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
22use timely::dataflow::operators::Operator;
23use timely::dataflow::{Scope, ScopeParent, StreamCore};
24
25use crate::logging::compute::{
26 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
27 ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
28};
29use crate::typedefs::{
30 KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
31};
32
/// Extension trait for arranging data into an [`Arranged`] backed by a [`TraceAgent`].
///
/// Unlike [`MzArrangeCore::mz_arrange_core`], callers do not supply a parallelization
/// contract here; each implementation decides how to reach `mz_arrange_core`. The
/// `where` clauses require that the resulting arrangement implements [`ArrangementSize`].
pub trait MzArrange: MzArrangeCore
where
    <Self::Scope as ScopeParent>::Timestamp: Lattice,
{
    /// Arranges `self` and returns the resulting arrangement.
    ///
    /// Type parameters:
    /// * `Ba`: batcher that consumes `Self::Input` containers at the scope's timestamp.
    /// * `Bu`: builder that turns the batcher's output into `Tr::Batch`.
    /// * `Tr`: trace type of the arrangement; `Arranged<Self::Scope, TraceAgent<Tr>>`
    ///   must implement [`ArrangementSize`].
    ///
    /// `name` is handed on to `mz_arrange_core` by the implementations in this file.
    fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<Self::Scope, TraceAgent<Tr>>
    where
        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Bu: Builder<
                Time = <Self::Scope as ScopeParent>::Timestamp,
                Input = Ba::Output,
                Output = Tr::Batch,
            >,
        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Tr::Batch: Batch,
        Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
}
55
/// Extension trait for arranging data with a caller-supplied parallelization contract.
///
/// This is the lower-level counterpart of [`MzArrange`]: the caller chooses how input
/// containers are distributed (`pact`) in addition to the batcher, builder, and trace types.
pub trait MzArrangeCore
where
    <Self::Scope as ScopeParent>::Timestamp: Lattice,
{
    /// The dataflow scope of the data being arranged; its timestamp must be a [`Lattice`].
    type Scope: Scope;
    /// The container type that is passed through `pact` and consumed by the batcher.
    type Input: Container + Clone + 'static;

    /// Arranges `self` using `pact` and returns the resulting arrangement.
    ///
    /// Type parameters:
    /// * `P`: parallelization contract over `Self::Input` at the scope's timestamp.
    /// * `Ba`: batcher that consumes `Self::Input` containers.
    /// * `Bu`: builder that turns the batcher's output into `Tr::Batch`.
    /// * `Tr`: trace type of the arrangement; `Arranged<Self::Scope, TraceAgent<Tr>>`
    ///   must implement [`ArrangementSize`].
    ///
    /// `name` is the name given to the arrangement; the `StreamCore` implementation in this
    /// file passes it to `arrange_core`.
    fn mz_arrange_core<P, Ba, Bu, Tr>(
        &self,
        pact: P,
        name: &str,
    ) -> Arranged<Self::Scope, TraceAgent<Tr>>
    where
        P: ParallelizationContract<<Self::Scope as ScopeParent>::Timestamp, Self::Input>,
        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Bu: Builder<
                Time = <Self::Scope as ScopeParent>::Timestamp,
                Input = Ba::Output,
                Output = Tr::Batch,
            >,
        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Tr::Batch: Batch,
        Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
}
90
91impl<G, C> MzArrangeCore for StreamCore<G, C>
92where
93 G: Scope,
94 G::Timestamp: Lattice,
95 C: Container + Clone + 'static,
96{
97 type Scope = G;
98 type Input = C;
99
100 fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
101 where
102 P: ParallelizationContract<G::Timestamp, Self::Input>,
103 Ba: Batcher<Input = Self::Input, Time = G::Timestamp> + 'static,
104 Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
105 Tr: Trace + TraceReader<Time = G::Timestamp> + 'static,
106 Tr::Batch: Batch,
107 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
108 {
109 #[allow(clippy::disallowed_methods)]
111 arrange_core::<_, _, Ba, Bu, _>(self, pact, name).log_arrangement_size()
112 }
113}
114
115impl<G, K, V, R> MzArrange for Collection<G, (K, V), R>
116where
117 G: Scope,
118 G::Timestamp: Lattice,
119 K: ExchangeData + Hashable,
120 V: ExchangeData,
121 R: ExchangeData,
122{
123 fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
124 where
125 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
126 Bu: Builder<
127 Time = <Self::Scope as ScopeParent>::Timestamp,
128 Input = Ba::Output,
129 Output = Tr::Batch,
130 >,
131 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
132 Tr::Batch: Batch,
133 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
134 {
135 let exchange =
136 Exchange::new(move |update: &((K, V), G::Timestamp, R)| (update.0).0.hashed().into());
137 self.mz_arrange_core::<_, Ba, Bu, _>(exchange, name)
138 }
139}
140
141impl<G, K, V, R, C> MzArrangeCore for Collection<G, (K, V), R, C>
142where
143 G: Scope,
144 G::Timestamp: Lattice,
145 C: Container + Clone + 'static,
146{
147 type Scope = G;
148 type Input = C;
149
150 fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
151 where
152 P: ParallelizationContract<G::Timestamp, Self::Input>,
153 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
154 Bu: Builder<
155 Time = <Self::Scope as ScopeParent>::Timestamp,
156 Input = Ba::Output,
157 Output = Tr::Batch,
158 >,
159 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
160 Tr::Batch: Batch,
161 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
162 {
163 self.inner.mz_arrange_core::<_, Ba, Bu, _>(pact, name)
164 }
165}
166
/// A wrapper around a [`Collection`] whose records are keys without an associated value.
///
/// When arranged, each record `k` is paired with the unit value as `(k, ())`. The
/// difference type `R` defaults to `usize`. Instances can be created from a `Collection`
/// through the `From` implementation.
pub struct KeyCollection<G: Scope, K, R = usize>(Collection<G, K, R>);
171
172impl<G: Scope, K, R: Semigroup> From<Collection<G, K, R>> for KeyCollection<G, K, R> {
173 fn from(value: Collection<G, K, R>) -> Self {
174 KeyCollection(value)
175 }
176}
177
178impl<G, K, R> MzArrange for KeyCollection<G, K, R>
179where
180 G: Scope,
181 K: ExchangeData + Hashable,
182 G::Timestamp: Lattice,
183 R: ExchangeData,
184{
185 fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
186 where
187 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
188 Bu: Builder<
189 Time = <Self::Scope as ScopeParent>::Timestamp,
190 Input = Ba::Output,
191 Output = Tr::Batch,
192 >,
193 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
194 Tr::Batch: Batch,
195 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
196 {
197 self.0.map(|d| (d, ())).mz_arrange::<Ba, Bu, _>(name)
198 }
199}
200
201impl<G, K, R> MzArrangeCore for KeyCollection<G, K, R>
202where
203 G: Scope,
204 K: Data,
205 G::Timestamp: Lattice,
206 R: Data,
207{
208 type Scope = G;
209 type Input = Vec<((K, ()), G::Timestamp, R)>;
210
211 fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
212 where
213 P: ParallelizationContract<G::Timestamp, Self::Input>,
214 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
215 Bu: Builder<
216 Time = <Self::Scope as ScopeParent>::Timestamp,
217 Input = Ba::Output,
218 Output = Tr::Batch,
219 >,
220 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
221 Tr::Batch: Batch,
222 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
223 {
224 self.0
225 .map(|d| (d, ()))
226 .mz_arrange_core::<_, Ba, Bu, _>(pact, name)
227 }
228}
229
/// Implemented by arrangements that can report their heap footprint to the compute logger.
pub trait ArrangementSize {
    /// Consumes the arrangement and returns it with size logging installed.
    ///
    /// The implementations in this file delegate to `log_arrangement_size_inner`, supplying
    /// a closure that measures a single batch.
    fn log_arrangement_size(self) -> Self;
}
235
236fn log_arrangement_size_inner<G, B, L>(
242 arranged: Arranged<G, TraceAgent<Spine<Rc<B>>>>,
243 mut logic: L,
244) -> Arranged<G, TraceAgent<Spine<Rc<B>>>>
245where
246 G: Scope<Timestamp: Lattice>,
247 B: Batch + 'static,
248 L: FnMut(&B) -> (usize, usize, usize) + 'static,
249{
250 let scope = arranged.stream.scope();
251 let Some(logger) = scope.logger_for::<ComputeEventBuilder>("materialize/compute") else {
252 return arranged;
253 };
254 let operator_id = arranged.trace.operator().global_id;
255 let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
256
257 let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
258
259 let stream = arranged
260 .stream
261 .unary(Pipeline, "ArrangementSize", |_cap, info| {
262 let address = info.address;
263 logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
264 ArrangementHeapSizeOperator {
265 operator_id,
266 address: address.to_vec(),
267 },
268 ));
269
270 let mut batches = BTreeMap::new();
272
273 move |input, output| {
274 while let Some((time, data)) = input.next() {
275 batches.extend(
276 data.iter()
277 .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))),
278 );
279 output.session(&time).give_container(data);
280 }
281 let Some(trace) = trace.upgrade() else {
282 return;
283 };
284
285 trace.borrow().trace.map_batches(|batch| {
286 batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch));
287 });
288
289 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
290 batches.retain(|_, weak| {
291 if let Some(batch) = weak.upgrade() {
292 let (sz, c, a) = logic(&batch);
293 (size += sz, capacity += c, allocations += a);
294 true
295 } else {
296 false
297 }
298 });
299
300 let size = size.try_into().expect("must fit");
301 if size != old_size {
302 logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
303 operator_id,
304 delta_size: size - old_size,
305 }));
306 }
307
308 let capacity = capacity.try_into().expect("must fit");
309 if capacity != old_capacity {
310 logger.log(&ComputeEvent::ArrangementHeapCapacity(
311 ArrangementHeapCapacity {
312 operator_id,
313 delta_capacity: capacity - old_capacity,
314 },
315 ));
316 }
317
318 let allocations = allocations.try_into().expect("must fit");
319 if allocations != old_allocations {
320 logger.log(&ComputeEvent::ArrangementHeapAllocations(
321 ArrangementHeapAllocations {
322 operator_id,
323 delta_allocations: allocations - old_allocations,
324 },
325 ));
326 }
327
328 old_size = size;
329 old_capacity = capacity;
330 old_allocations = allocations;
331 }
332 });
333 Arranged {
334 trace: arranged.trace,
335 stream,
336 }
337}
338
339impl<G, K, V, R> ArrangementSize for Arranged<G, KeyValAgent<K, V, G::Timestamp, R>>
340where
341 G: Scope,
342 G::Timestamp: MzTimestamp,
343 K: Data + MzData,
344 V: Data + MzData,
345 R: Semigroup + Ord + MzData + 'static,
346{
347 fn log_arrangement_size(self) -> Self {
348 log_arrangement_size_inner(self, |batch| {
349 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
350 let mut callback = |siz, cap| {
351 size += siz;
352 capacity += cap;
353 allocations += usize::from(cap > 0);
354 };
355 batch.storage.keys.heap_size(&mut callback);
356 batch.storage.vals.offs.heap_size(&mut callback);
357 batch.storage.vals.vals.heap_size(&mut callback);
358 batch.storage.upds.offs.heap_size(&mut callback);
359 batch.storage.upds.times.heap_size(&mut callback);
360 batch.storage.upds.diffs.heap_size(&mut callback);
361 (size, capacity, allocations)
362 })
363 }
364}
365
366impl<G, K, R> ArrangementSize for Arranged<G, KeyAgent<K, G::Timestamp, R>>
367where
368 G: Scope,
369 G::Timestamp: MzTimestamp,
370 K: Data + MzArrangeData,
371 R: Semigroup + Ord + MzData + 'static,
372{
373 fn log_arrangement_size(self) -> Self {
374 log_arrangement_size_inner(self, |batch| {
375 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
376 let mut callback = |siz, cap| {
377 size += siz;
378 capacity += cap;
379 allocations += usize::from(cap > 0);
380 };
381 batch.storage.keys.heap_size(&mut callback);
382 batch.storage.upds.offs.heap_size(&mut callback);
383 batch.storage.upds.times.heap_size(&mut callback);
384 batch.storage.upds.diffs.heap_size(&mut callback);
385 (size, capacity, allocations)
386 })
387 }
388}
389
390impl<G, V, R> ArrangementSize for Arranged<G, RowValAgent<V, G::Timestamp, R>>
391where
392 G: Scope,
393 G::Timestamp: MzTimestamp,
394 V: Data + MzArrangeData,
395 R: Semigroup + Ord + MzArrangeData + 'static,
396{
397 fn log_arrangement_size(self) -> Self {
398 log_arrangement_size_inner(self, |batch| {
399 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
400 let mut callback = |siz, cap| {
401 size += siz;
402 capacity += cap;
403 allocations += usize::from(cap > 0);
404 };
405 batch.storage.keys.heap_size(&mut callback);
406 batch.storage.vals.offs.heap_size(&mut callback);
407 batch.storage.vals.vals.heap_size(&mut callback);
408 batch.storage.upds.offs.heap_size(&mut callback);
409 batch.storage.upds.times.heap_size(&mut callback);
410 batch.storage.upds.diffs.heap_size(&mut callback);
411 (size, capacity, allocations)
412 })
413 }
414}
415
416impl<G, R> ArrangementSize for Arranged<G, RowRowAgent<G::Timestamp, R>>
417where
418 G: Scope,
419 G::Timestamp: MzTimestamp,
420 R: Semigroup + Ord + MzArrangeData + 'static,
421{
422 fn log_arrangement_size(self) -> Self {
423 log_arrangement_size_inner(self, |batch| {
424 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
425 let mut callback = |siz, cap| {
426 size += siz;
427 capacity += cap;
428 allocations += usize::from(cap > 0);
429 };
430 batch.storage.keys.heap_size(&mut callback);
431 batch.storage.vals.offs.heap_size(&mut callback);
432 batch.storage.vals.vals.heap_size(&mut callback);
433 batch.storage.upds.offs.heap_size(&mut callback);
434 batch.storage.upds.times.heap_size(&mut callback);
435 batch.storage.upds.diffs.heap_size(&mut callback);
436 (size, capacity, allocations)
437 })
438 }
439}
440
441impl<G, R> ArrangementSize for Arranged<G, RowAgent<G::Timestamp, R>>
442where
443 G: Scope,
444 G::Timestamp: MzTimestamp,
445 R: Semigroup + Ord + MzArrangeData + 'static,
446{
447 fn log_arrangement_size(self) -> Self {
448 log_arrangement_size_inner(self, |batch| {
449 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
450 let mut callback = |siz, cap| {
451 size += siz;
452 capacity += cap;
453 allocations += usize::from(cap > 0);
454 };
455 batch.storage.keys.heap_size(&mut callback);
456 batch.storage.upds.offs.heap_size(&mut callback);
457 batch.storage.upds.times.heap_size(&mut callback);
458 batch.storage.upds.diffs.heap_size(&mut callback);
459 (size, capacity, allocations)
460 })
461 }
462}