1use std::collections::BTreeMap;
11use std::rc::Rc;
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection};
20use timely::Container;
21use timely::dataflow::Stream;
22use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
23use timely::dataflow::operators::Operator;
24use timely::progress::Timestamp;
25
26use crate::logging::compute::{
27 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
28 ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
29};
30use crate::typedefs::{
31 KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
32};
33
34pub trait MzArrange<'scope>: MzArrangeCore<'scope> {
36 fn mz_arrange<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
42 where
43 Ba: Batcher<Input = Self::Input, Time = Self::Timestamp> + 'static,
44 Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
45 Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
46 Tr::Batch: Batch,
47 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
48}
49
50pub trait MzArrangeCore<'scope> {
52 type Timestamp: Timestamp + Lattice;
54 type Input: Container + Clone + 'static;
56
57 fn mz_arrange_core<P, Ba, Bu, Tr>(
64 self,
65 pact: P,
66 name: &str,
67 ) -> Arranged<'scope, TraceAgent<Tr>>
68 where
69 P: ParallelizationContract<Self::Timestamp, Self::Input>,
70 Ba: Batcher<Input = Self::Input, Time = Self::Timestamp> + 'static,
71 Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
73 Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
74 Tr::Batch: Batch,
75 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
76}
77
78impl<'scope, T, C> MzArrangeCore<'scope> for Stream<'scope, T, C>
79where
80 T: Timestamp + Lattice,
81 C: Container + Clone + 'static,
82{
83 type Timestamp = T;
84 type Input = C;
85
86 fn mz_arrange_core<P, Ba, Bu, Tr>(self, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
87 where
88 P: ParallelizationContract<T, Self::Input>,
89 Ba: Batcher<Input = Self::Input, Time = T> + 'static,
90 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
91 Tr: Trace + TraceReader<Time = T> + 'static,
92 Tr::Batch: Batch,
93 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
94 {
95 #[allow(clippy::disallowed_methods)]
97 arrange_core::<_, Ba, Bu, _>(self, pact, name).log_arrangement_size()
98 }
99}
100
101impl<'scope, T, K, V, R> MzArrange<'scope> for VecCollection<'scope, T, (K, V), R>
102where
103 T: Timestamp + Lattice,
104 K: ExchangeData + Hashable,
105 V: ExchangeData,
106 R: ExchangeData,
107{
108 fn mz_arrange<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
109 where
110 Ba: Batcher<Input = Self::Input, Time = T> + 'static,
111 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
112 Tr: Trace + TraceReader<Time = T> + 'static,
113 Tr::Batch: Batch,
114 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
115 {
116 let exchange = Exchange::new(move |update: &((K, V), T, R)| (update.0).0.hashed().into());
117 self.mz_arrange_core::<_, Ba, Bu, _>(exchange, name)
118 }
119}
120
121impl<'scope, T, C> MzArrangeCore<'scope> for Collection<'scope, T, C>
122where
123 T: Timestamp + Lattice,
124 C: Container + Clone + 'static,
125{
126 type Timestamp = T;
127 type Input = C;
128
129 fn mz_arrange_core<P, Ba, Bu, Tr>(self, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
130 where
131 P: ParallelizationContract<T, Self::Input>,
132 Ba: Batcher<Input = Self::Input, Time = T> + 'static,
133 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
134 Tr: Trace + TraceReader<Time = T> + 'static,
135 Tr::Batch: Batch,
136 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
137 {
138 self.inner.mz_arrange_core::<_, Ba, Bu, _>(pact, name)
139 }
140}
141
142pub struct KeyCollection<'scope, T: Timestamp, K: 'static, R: 'static = usize>(
146 VecCollection<'scope, T, K, R>,
147);
148
149impl<'scope, T: Timestamp, K, R: Semigroup> From<VecCollection<'scope, T, K, R>>
150 for KeyCollection<'scope, T, K, R>
151{
152 fn from(value: VecCollection<'scope, T, K, R>) -> Self {
153 KeyCollection(value)
154 }
155}
156
157impl<'scope, T, K, R> MzArrange<'scope> for KeyCollection<'scope, T, K, R>
158where
159 T: Timestamp + Lattice,
160 K: ExchangeData + Hashable,
161 R: ExchangeData,
162{
163 fn mz_arrange<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
164 where
165 Ba: Batcher<Input = Self::Input, Time = T> + 'static,
166 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
167 Tr: Trace + TraceReader<Time = T> + 'static,
168 Tr::Batch: Batch,
169 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
170 {
171 self.0.map(|d| (d, ())).mz_arrange::<Ba, Bu, _>(name)
172 }
173}
174
175impl<'scope, T, K, R> MzArrangeCore<'scope> for KeyCollection<'scope, T, K, R>
176where
177 T: Timestamp + Lattice,
178 K: Clone + 'static,
179 R: Clone + 'static,
180{
181 type Timestamp = T;
182 type Input = Vec<((K, ()), T, R)>;
183
184 fn mz_arrange_core<P, Ba, Bu, Tr>(self, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
185 where
186 P: ParallelizationContract<T, Self::Input>,
187 Ba: Batcher<Input = Self::Input, Time = T> + 'static,
188 Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
189 Tr: Trace + TraceReader<Time = T> + 'static,
190 Tr::Batch: Batch,
191 Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
192 {
193 self.0
194 .map(|d| (d, ()))
195 .mz_arrange_core::<_, Ba, Bu, _>(pact, name)
196 }
197}
198
/// Implemented by arrangement types that can report their heap usage to the compute logger.
pub trait ArrangementSize {
    /// Installs heap-size logging on the arrangement and hands the arrangement back.
    ///
    /// The implementations in this file return `self` untouched when no compute logger is
    /// registered on the worker.
    fn log_arrangement_size(self) -> Self;
}
204
205fn log_arrangement_size_inner<'scope, B, L>(
211 arranged: Arranged<'scope, TraceAgent<Spine<Rc<B>>>>,
212 mut logic: L,
213) -> Arranged<'scope, TraceAgent<Spine<Rc<B>>>>
214where
215 B: Batch + 'static,
216 L: FnMut(&B) -> (usize, usize, usize) + 'static,
217{
218 let scope = arranged.stream.scope();
219 let Some(logger) = scope
220 .worker()
221 .logger_for::<ComputeEventBuilder>("materialize/compute")
222 else {
223 return arranged;
224 };
225 let operator_id = arranged.trace.operator().global_id;
226 let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
227
228 let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
229
230 let stream = arranged
231 .stream
232 .unary(Pipeline, "ArrangementSize", |_cap, info| {
233 let address = info.address;
234 logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
235 ArrangementHeapSizeOperator {
236 operator_id,
237 address: address.to_vec(),
238 },
239 ));
240
241 let mut batches = BTreeMap::new();
243
244 move |input, output| {
245 input.for_each(|time, data| {
246 batches.extend(
247 data.iter()
248 .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))),
249 );
250 output.session(&time).give_container(data);
251 });
252 let Some(trace) = trace.upgrade() else {
253 return;
254 };
255
256 trace.borrow().trace.map_batches(|batch| {
257 batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch));
258 });
259
260 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
261 batches.retain(|_, weak| {
262 if let Some(batch) = weak.upgrade() {
263 let (sz, c, a) = logic(&batch);
264 (size += sz, capacity += c, allocations += a);
265 true
266 } else {
267 false
268 }
269 });
270
271 let size = size.try_into().expect("must fit");
272 if size != old_size {
273 logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
274 operator_id,
275 delta_size: size - old_size,
276 }));
277 }
278
279 let capacity = capacity.try_into().expect("must fit");
280 if capacity != old_capacity {
281 logger.log(&ComputeEvent::ArrangementHeapCapacity(
282 ArrangementHeapCapacity {
283 operator_id,
284 delta_capacity: capacity - old_capacity,
285 },
286 ));
287 }
288
289 let allocations = allocations.try_into().expect("must fit");
290 if allocations != old_allocations {
291 logger.log(&ComputeEvent::ArrangementHeapAllocations(
292 ArrangementHeapAllocations {
293 operator_id,
294 delta_allocations: allocations - old_allocations,
295 },
296 ));
297 }
298
299 old_size = size;
300 old_capacity = capacity;
301 old_allocations = allocations;
302 }
303 });
304 Arranged {
305 trace: arranged.trace,
306 stream,
307 }
308}
309
310impl<'scope, T, K, V, R> ArrangementSize for Arranged<'scope, KeyValAgent<K, V, T, R>>
311where
312 T: MzTimestamp,
313 K: Data + MzData,
314 V: Data + MzData,
315 R: Semigroup + Ord + MzData + 'static,
316{
317 fn log_arrangement_size(self) -> Self {
318 log_arrangement_size_inner(self, |batch| {
319 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
320 let mut callback = |siz, cap| {
321 size += siz;
322 capacity += cap;
323 allocations += usize::from(cap > 0);
324 };
325 batch.storage.keys.heap_size(&mut callback);
326 batch.storage.vals.offs.heap_size(&mut callback);
327 batch.storage.vals.vals.heap_size(&mut callback);
328 batch.storage.upds.offs.heap_size(&mut callback);
329 batch.storage.upds.times.heap_size(&mut callback);
330 batch.storage.upds.diffs.heap_size(&mut callback);
331 (size, capacity, allocations)
332 })
333 }
334}
335
336impl<'scope, T, K, R> ArrangementSize for Arranged<'scope, KeyAgent<K, T, R>>
337where
338 T: MzTimestamp,
339 K: Data + MzArrangeData,
340 R: Semigroup + Ord + MzData + 'static,
341{
342 fn log_arrangement_size(self) -> Self {
343 log_arrangement_size_inner(self, |batch| {
344 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
345 let mut callback = |siz, cap| {
346 size += siz;
347 capacity += cap;
348 allocations += usize::from(cap > 0);
349 };
350 batch.storage.keys.heap_size(&mut callback);
351 batch.storage.upds.offs.heap_size(&mut callback);
352 batch.storage.upds.times.heap_size(&mut callback);
353 batch.storage.upds.diffs.heap_size(&mut callback);
354 (size, capacity, allocations)
355 })
356 }
357}
358
359impl<'scope, T, V, R> ArrangementSize for Arranged<'scope, RowValAgent<V, T, R>>
360where
361 T: MzTimestamp,
362 V: Data + MzArrangeData,
363 R: Semigroup + Ord + MzArrangeData + 'static,
364{
365 fn log_arrangement_size(self) -> Self {
366 log_arrangement_size_inner(self, |batch| {
367 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
368 let mut callback = |siz, cap| {
369 size += siz;
370 capacity += cap;
371 allocations += usize::from(cap > 0);
372 };
373 batch.storage.keys.heap_size(&mut callback);
374 batch.storage.vals.offs.heap_size(&mut callback);
375 batch.storage.vals.vals.heap_size(&mut callback);
376 batch.storage.upds.offs.heap_size(&mut callback);
377 batch.storage.upds.times.heap_size(&mut callback);
378 batch.storage.upds.diffs.heap_size(&mut callback);
379 (size, capacity, allocations)
380 })
381 }
382}
383
384impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowRowAgent<T, R>>
385where
386 T: MzTimestamp,
387 R: Semigroup + Ord + MzArrangeData + 'static,
388{
389 fn log_arrangement_size(self) -> Self {
390 log_arrangement_size_inner(self, |batch| {
391 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
392 let mut callback = |siz, cap| {
393 size += siz;
394 capacity += cap;
395 allocations += usize::from(cap > 0);
396 };
397 batch.storage.keys.heap_size(&mut callback);
398 batch.storage.vals.offs.heap_size(&mut callback);
399 batch.storage.vals.vals.heap_size(&mut callback);
400 batch.storage.upds.offs.heap_size(&mut callback);
401 batch.storage.upds.times.heap_size(&mut callback);
402 batch.storage.upds.diffs.heap_size(&mut callback);
403 (size, capacity, allocations)
404 })
405 }
406}
407
408impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowAgent<T, R>>
409where
410 T: MzTimestamp,
411 R: Semigroup + Ord + MzArrangeData + 'static,
412{
413 fn log_arrangement_size(self) -> Self {
414 log_arrangement_size_inner(self, |batch| {
415 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
416 let mut callback = |siz, cap| {
417 size += siz;
418 capacity += cap;
419 allocations += usize::from(cap > 0);
420 };
421 batch.storage.keys.heap_size(&mut callback);
422 batch.storage.upds.offs.heap_size(&mut callback);
423 batch.storage.upds.times.heap_size(&mut callback);
424 batch.storage.upds.diffs.heap_size(&mut callback);
425 (size, capacity, allocations)
426 })
427 }
428}