1use std::rc::Rc;
11
12use differential_dataflow::containers::Columnation;
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
18use differential_dataflow::{Collection, Data, ExchangeData, Hashable};
19use timely::Container;
20use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
21use timely::dataflow::operators::Operator;
22use timely::dataflow::{Scope, ScopeParent, StreamCore};
23use timely::progress::Timestamp;
24
25use crate::logging::compute::{
26 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
27 ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
28};
29use crate::typedefs::{KeyAgent, KeyValAgent, RowAgent, RowRowAgent, RowValAgent};
30
31pub trait MzArrange: MzArrangeCore
33where
34 <Self::Scope as ScopeParent>::Timestamp: Lattice,
35{
36 fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<Self::Scope, TraceAgent<Tr>>
42 where
43 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
44 Bu: Builder<
45 Time = <Self::Scope as ScopeParent>::Timestamp,
46 Input = Ba::Output,
47 Output = Tr::Batch,
48 >,
49 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
50 Tr::Batch: Batch,
51 Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
52}
53
54pub trait MzArrangeCore
56where
57 <Self::Scope as ScopeParent>::Timestamp: Lattice,
58{
59 type Scope: Scope;
61 type Input: Container + Clone + 'static;
63
64 fn mz_arrange_core<P, Ba, Bu, Tr>(
71 &self,
72 pact: P,
73 name: &str,
74 ) -> Arranged<Self::Scope, TraceAgent<Tr>>
75 where
76 P: ParallelizationContract<<Self::Scope as ScopeParent>::Timestamp, Self::Input>,
77 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
78 Bu: Builder<
80 Time = <Self::Scope as ScopeParent>::Timestamp,
81 Input = Ba::Output,
82 Output = Tr::Batch,
83 >,
84 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
85 Tr::Batch: Batch,
86 Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
87}
88
89impl<G, C> MzArrangeCore for StreamCore<G, C>
90where
91 G: Scope,
92 G::Timestamp: Lattice,
93 C: Container + Clone + 'static,
94{
95 type Scope = G;
96 type Input = C;
97
98 fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
99 where
100 P: ParallelizationContract<G::Timestamp, Self::Input>,
101 Ba: Batcher<Input = Self::Input, Time = G::Timestamp> + 'static,
102 Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
103 Tr: Trace + TraceReader<Time = G::Timestamp> + 'static,
104 Tr::Batch: Batch,
105 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
106 {
107 #[allow(clippy::disallowed_methods)]
109 arrange_core::<_, _, Ba, Bu, _>(self, pact, name).log_arrangement_size()
110 }
111}
112
113impl<G, K, V, R> MzArrange for Collection<G, (K, V), R>
114where
115 G: Scope,
116 G::Timestamp: Lattice,
117 K: ExchangeData + Hashable,
118 V: ExchangeData,
119 R: ExchangeData,
120{
121 fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
122 where
123 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
124 Bu: Builder<
125 Time = <Self::Scope as ScopeParent>::Timestamp,
126 Input = Ba::Output,
127 Output = Tr::Batch,
128 >,
129 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
130 Tr::Batch: Batch,
131 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
132 {
133 let exchange =
134 Exchange::new(move |update: &((K, V), G::Timestamp, R)| (update.0).0.hashed().into());
135 self.mz_arrange_core::<_, Ba, Bu, _>(exchange, name)
136 }
137}
138
139impl<G, K, V, R, C> MzArrangeCore for Collection<G, (K, V), R, C>
140where
141 G: Scope,
142 G::Timestamp: Lattice,
143 C: Container + Clone + 'static,
144{
145 type Scope = G;
146 type Input = C;
147
148 fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
149 where
150 P: ParallelizationContract<G::Timestamp, Self::Input>,
151 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
152 Bu: Builder<
153 Time = <Self::Scope as ScopeParent>::Timestamp,
154 Input = Ba::Output,
155 Output = Tr::Batch,
156 >,
157 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
158 Tr::Batch: Batch,
159 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
160 {
161 self.inner.mz_arrange_core::<_, Ba, Bu, _>(pact, name)
162 }
163}
164
165pub struct KeyCollection<G: Scope, K, R = usize>(Collection<G, K, R>);
169
170impl<G: Scope, K, R: Semigroup> From<Collection<G, K, R>> for KeyCollection<G, K, R> {
171 fn from(value: Collection<G, K, R>) -> Self {
172 KeyCollection(value)
173 }
174}
175
176impl<G, K, R> MzArrange for KeyCollection<G, K, R>
177where
178 G: Scope,
179 K: ExchangeData + Hashable,
180 G::Timestamp: Lattice,
181 R: ExchangeData,
182{
183 fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
184 where
185 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
186 Bu: Builder<
187 Time = <Self::Scope as ScopeParent>::Timestamp,
188 Input = Ba::Output,
189 Output = Tr::Batch,
190 >,
191 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
192 Tr::Batch: Batch,
193 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
194 {
195 self.0.map(|d| (d, ())).mz_arrange::<Ba, Bu, _>(name)
196 }
197}
198
199impl<G, K, R> MzArrangeCore for KeyCollection<G, K, R>
200where
201 G: Scope,
202 K: Data,
203 G::Timestamp: Lattice,
204 R: Data,
205{
206 type Scope = G;
207 type Input = Vec<((K, ()), G::Timestamp, R)>;
208
209 fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
210 where
211 P: ParallelizationContract<G::Timestamp, Self::Input>,
212 Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
213 Bu: Builder<
214 Time = <Self::Scope as ScopeParent>::Timestamp,
215 Input = Ba::Output,
216 Output = Tr::Batch,
217 >,
218 Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
219 Tr::Batch: Batch,
220 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
221 {
222 self.0
223 .map(|d| (d, ()))
224 .mz_arrange_core::<_, Ba, Bu, _>(pact, name)
225 }
226}
227
/// Hook for attaching heap size reporting to an arrangement.
///
/// The implementations in this file walk the batches of the arrangement's trace and
/// report size, capacity, and allocation counts through the compute logger.
pub trait ArrangementSize {
    /// Consumes the arrangement and returns it with size logging installed.
    fn log_arrangement_size(self) -> Self;
}
233
234fn log_arrangement_size_inner<G, Tr, L>(
240 arranged: Arranged<G, TraceAgent<Tr>>,
241 mut logic: L,
242) -> Arranged<G, TraceAgent<Tr>>
243where
244 G: Scope,
245 G::Timestamp: Timestamp + Lattice + Ord,
246 Tr: TraceReader + 'static,
247 Tr::Time: Timestamp + Lattice + Ord + Clone + 'static,
248 L: FnMut(&Tr) -> (usize, usize, usize) + 'static,
249{
250 let scope = arranged.stream.scope();
251 let Some(logger) = scope
252 .log_register()
253 .get::<ComputeEventBuilder>("materialize/compute")
254 else {
255 return arranged;
256 };
257 let operator_id = arranged.trace.operator().global_id;
258 let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
259
260 let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
261
262 let stream = arranged
263 .stream
264 .unary(Pipeline, "ArrangementSize", |_cap, info| {
265 let address = info.address;
266 logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
267 ArrangementHeapSizeOperator {
268 operator_id,
269 address: address.to_vec(),
270 },
271 ));
272 move |input, output| {
273 while let Some((time, data)) = input.next() {
274 output.session(&time).give_container(data);
275 }
276 let Some(trace) = trace.upgrade() else {
277 return;
278 };
279
280 let (size, capacity, allocations) = logic(&trace.borrow().trace);
281
282 let size = size.try_into().expect("must fit");
283 if size != old_size {
284 logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
285 operator_id,
286 delta_size: size - old_size,
287 }));
288 }
289
290 let capacity = capacity.try_into().expect("must fit");
291 if capacity != old_capacity {
292 logger.log(&ComputeEvent::ArrangementHeapCapacity(
293 ArrangementHeapCapacity {
294 operator_id,
295 delta_capacity: capacity - old_capacity,
296 },
297 ));
298 }
299
300 let allocations = allocations.try_into().expect("must fit");
301 if allocations != old_allocations {
302 logger.log(&ComputeEvent::ArrangementHeapAllocations(
303 ArrangementHeapAllocations {
304 operator_id,
305 delta_allocations: allocations - old_allocations,
306 },
307 ));
308 }
309
310 old_size = size;
311 old_capacity = capacity;
312 old_allocations = allocations;
313 }
314 });
315 Arranged {
316 trace: arranged.trace,
317 stream,
318 }
319}
320
321impl<G, K, V, T, R> ArrangementSize for Arranged<G, KeyValAgent<K, V, T, R>>
322where
323 G: Scope<Timestamp = T>,
324 G::Timestamp: Lattice + Ord + Columnation,
325 K: Data + Columnation,
326 V: Data + Columnation,
327 T: Lattice + Timestamp,
328 R: Semigroup + Ord + Columnation + 'static,
329{
330 fn log_arrangement_size(self) -> Self {
331 log_arrangement_size_inner(self, |trace| {
332 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
333 let mut callback = |siz, cap| {
334 size += siz;
335 capacity += cap;
336 allocations += usize::from(cap > 0);
337 };
338 trace.map_batches(|batch| {
339 batch.storage.keys.heap_size(&mut callback);
340 batch.storage.keys_offs.heap_size(&mut callback);
341 batch.storage.vals.heap_size(&mut callback);
342 batch.storage.vals_offs.heap_size(&mut callback);
343 batch.storage.times.heap_size(&mut callback);
344 batch.storage.diffs.heap_size(&mut callback);
345 });
346 (size, capacity, allocations)
347 })
348 }
349}
350
351impl<G, K, T, R> ArrangementSize for Arranged<G, KeyAgent<K, T, R>>
352where
353 G: Scope<Timestamp = T>,
354 G::Timestamp: Lattice + Ord,
355 K: Data + Columnation,
356 T: Lattice + Timestamp + Columnation,
357 R: Semigroup + Ord + Columnation + 'static,
358{
359 fn log_arrangement_size(self) -> Self {
360 log_arrangement_size_inner(self, |trace| {
361 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
362 let mut callback = |siz, cap| {
363 size += siz;
364 capacity += cap;
365 allocations += usize::from(cap > 0);
366 };
367 trace.map_batches(|batch| {
368 batch.storage.keys.heap_size(&mut callback);
369 batch.storage.keys_offs.heap_size(&mut callback);
370 batch.storage.times.heap_size(&mut callback);
371 batch.storage.diffs.heap_size(&mut callback);
372 });
373 (size, capacity, allocations)
374 })
375 }
376}
377
378impl<G, V, T, R> ArrangementSize for Arranged<G, RowValAgent<V, T, R>>
379where
380 G: Scope<Timestamp = T>,
381 G::Timestamp: Lattice + Ord + Columnation,
382 V: Data + Columnation,
383 T: Lattice + Timestamp,
384 R: Semigroup + Ord + Columnation + 'static,
385{
386 fn log_arrangement_size(self) -> Self {
387 log_arrangement_size_inner(self, |trace| {
388 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
389 let mut callback = |siz, cap| {
390 size += siz;
391 capacity += cap;
392 allocations += usize::from(cap > 0);
393 };
394 trace.map_batches(|batch| {
395 batch.storage.keys.heap_size(&mut callback);
396 batch.storage.keys_offs.heap_size(&mut callback);
397 batch.storage.vals.heap_size(&mut callback);
398 batch.storage.vals_offs.heap_size(&mut callback);
399 batch.storage.times.heap_size(&mut callback);
400 batch.storage.diffs.heap_size(&mut callback);
401 });
402 (size, capacity, allocations)
403 })
404 }
405}
406
407impl<G, T, R> ArrangementSize for Arranged<G, RowRowAgent<T, R>>
408where
409 G: Scope<Timestamp = T>,
410 G::Timestamp: Lattice + Ord + Columnation,
411 T: Lattice + Timestamp,
412 R: Semigroup + Ord + Columnation + 'static,
413{
414 fn log_arrangement_size(self) -> Self {
415 log_arrangement_size_inner(self, |trace| {
416 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
417 let mut callback = |siz, cap| {
418 size += siz;
419 capacity += cap;
420 allocations += usize::from(cap > 0);
421 };
422 trace.map_batches(|batch| {
423 batch.storage.keys.heap_size(&mut callback);
424 batch.storage.keys_offs.heap_size(&mut callback);
425 batch.storage.vals.heap_size(&mut callback);
426 batch.storage.vals_offs.heap_size(&mut callback);
427 batch.storage.times.heap_size(&mut callback);
428 batch.storage.diffs.heap_size(&mut callback);
429 });
430 (size, capacity, allocations)
431 })
432 }
433}
434
435impl<G, T, R> ArrangementSize for Arranged<G, RowAgent<T, R>>
436where
437 G: Scope<Timestamp = T>,
438 G::Timestamp: Lattice + Ord + Columnation,
439 T: Lattice + Timestamp,
440 R: Semigroup + Ord + Columnation + 'static,
441{
442 fn log_arrangement_size(self) -> Self {
443 log_arrangement_size_inner(self, |trace| {
444 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
445 let mut callback = |siz, cap| {
446 size += siz;
447 capacity += cap;
448 allocations += usize::from(cap > 0);
449 };
450 trace.map_batches(|batch| {
451 batch.storage.keys.heap_size(&mut callback);
452 batch.storage.keys_offs.heap_size(&mut callback);
453 batch.storage.times.heap_size(&mut callback);
454 batch.storage.diffs.heap_size(&mut callback);
455 });
456 (size, capacity, allocations)
457 })
458 }
459}