// mz_compute/extensions/arrange.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::rc::Rc;
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection};
20use timely::Container;
21use timely::container::{ContainerBuilder, PushInto};
22use timely::dataflow::Stream;
23use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
24use timely::dataflow::operators::Operator;
25use timely::progress::Timestamp;
26
27use crate::logging::compute::{
28    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
29    ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
30};
31use crate::typedefs::{
32    KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
33};
34
35/// Extension trait to arrange data.
36pub trait MzArrange<'scope>: MzArrangeCore<'scope> {
37    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`.
38    ///
39    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
40    /// This trace is current for all times marked completed in the output stream, and probing this stream
41    /// is the correct way to determine that times in the shared trace are committed.
42    fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
43    where
44        Ba: Batcher<Time = Self::Timestamp> + 'static,
45        Chu: ContainerBuilder<Container = Ba::Output>
46            + for<'a> PushInto<&'a mut Self::Input>
47            + 'static,
48        Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
49        Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
50        Tr::Batch: Batch,
51        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
52}
53
54/// Extension trait to arrange data.
55pub trait MzArrangeCore<'scope> {
56    /// The current scope.
57    type Timestamp: Timestamp + Lattice;
58    /// The data input container type.
59    type Input: Container + Clone + 'static;
60
61    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`. Partitions
62    /// the data according to `pact`.
63    ///
64    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
65    /// This trace is current for all times marked completed in the output stream, and probing this stream
66    /// is the correct way to determine that times in the shared trace are committed.
67    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
68        self,
69        pact: P,
70        name: &str,
71    ) -> Arranged<'scope, TraceAgent<Tr>>
72    where
73        P: ParallelizationContract<Self::Timestamp, Self::Input>,
74        Ba: Batcher<Time = Self::Timestamp> + 'static,
75        Chu: ContainerBuilder<Container = Ba::Output>
76            + for<'a> PushInto<&'a mut Self::Input>
77            + 'static,
78        Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
79        Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
80        Tr::Batch: Batch,
81        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
82}
83
84impl<'scope, T, C> MzArrangeCore<'scope> for Stream<'scope, T, C>
85where
86    T: Timestamp + Lattice,
87    C: Container + Clone + 'static,
88{
89    type Timestamp = T;
90    type Input = C;
91
92    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
93        self,
94        pact: P,
95        name: &str,
96    ) -> Arranged<'scope, TraceAgent<Tr>>
97    where
98        P: ParallelizationContract<T, Self::Input>,
99        Ba: Batcher<Time = T> + 'static,
100        Chu: ContainerBuilder<Container = Ba::Output>
101            + for<'a> PushInto<&'a mut Self::Input>
102            + 'static,
103        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
104        Tr: Trace + TraceReader<Time = T> + 'static,
105        Tr::Batch: Batch,
106        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
107    {
108        // Allow access to `arrange_named` because we're within Mz's wrapper.
109        #[allow(clippy::disallowed_methods)]
110        arrange_core::<_, _, Chu, Ba, Bu, _>(self, pact, name).log_arrangement_size()
111    }
112}
113
114impl<'scope, T, K, V, R> MzArrange<'scope> for VecCollection<'scope, T, (K, V), R>
115where
116    T: Timestamp + Lattice,
117    K: ExchangeData + Hashable,
118    V: ExchangeData,
119    R: ExchangeData,
120{
121    fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
122    where
123        Ba: Batcher<Time = T> + 'static,
124        Chu: ContainerBuilder<Container = Ba::Output>
125            + for<'a> PushInto<&'a mut Self::Input>
126            + 'static,
127        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
128        Tr: Trace + TraceReader<Time = T> + 'static,
129        Tr::Batch: Batch,
130        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
131    {
132        let exchange = Exchange::new(move |update: &((K, V), T, R)| (update.0).0.hashed().into());
133        self.mz_arrange_core::<_, Chu, Ba, Bu, _>(exchange, name)
134    }
135}
136
137impl<'scope, T, C> MzArrangeCore<'scope> for Collection<'scope, T, C>
138where
139    T: Timestamp + Lattice,
140    C: Container + Clone + 'static,
141{
142    type Timestamp = T;
143    type Input = C;
144
145    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
146        self,
147        pact: P,
148        name: &str,
149    ) -> Arranged<'scope, TraceAgent<Tr>>
150    where
151        P: ParallelizationContract<T, Self::Input>,
152        Ba: Batcher<Time = T> + 'static,
153        Chu: ContainerBuilder<Container = Ba::Output>
154            + for<'a> PushInto<&'a mut Self::Input>
155            + 'static,
156        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
157        Tr: Trace + TraceReader<Time = T> + 'static,
158        Tr::Batch: Batch,
159        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
160    {
161        self.inner.mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
162    }
163}
164
165/// A specialized collection where data only has a key, but no associated value.
166///
167/// Created by calling `collection.into()`.
168pub struct KeyCollection<'scope, T: Timestamp, K: 'static, R: 'static = usize>(
169    VecCollection<'scope, T, K, R>,
170);
171
172impl<'scope, T: Timestamp, K, R: Semigroup> From<VecCollection<'scope, T, K, R>>
173    for KeyCollection<'scope, T, K, R>
174{
175    fn from(value: VecCollection<'scope, T, K, R>) -> Self {
176        KeyCollection(value)
177    }
178}
179
180impl<'scope, T, K, R> MzArrange<'scope> for KeyCollection<'scope, T, K, R>
181where
182    T: Timestamp + Lattice,
183    K: ExchangeData + Hashable,
184    R: ExchangeData,
185{
186    fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
187    where
188        Ba: Batcher<Time = T> + 'static,
189        Chu: ContainerBuilder<Container = Ba::Output>
190            + for<'a> PushInto<&'a mut Self::Input>
191            + 'static,
192        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
193        Tr: Trace + TraceReader<Time = T> + 'static,
194        Tr::Batch: Batch,
195        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
196    {
197        self.0.map(|d| (d, ())).mz_arrange::<Chu, Ba, Bu, _>(name)
198    }
199}
200
201impl<'scope, T, K, R> MzArrangeCore<'scope> for KeyCollection<'scope, T, K, R>
202where
203    T: Timestamp + Lattice,
204    K: Clone + 'static,
205    R: Clone + 'static,
206{
207    type Timestamp = T;
208    type Input = Vec<((K, ()), T, R)>;
209
210    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
211        self,
212        pact: P,
213        name: &str,
214    ) -> Arranged<'scope, TraceAgent<Tr>>
215    where
216        P: ParallelizationContract<T, Self::Input>,
217        Ba: Batcher<Time = T> + 'static,
218        Chu: ContainerBuilder<Container = Ba::Output>
219            + for<'a> PushInto<&'a mut Self::Input>
220            + 'static,
221        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
222        Tr: Trace + TraceReader<Time = T> + 'static,
223        Tr::Batch: Batch,
224        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
225    {
226        self.0
227            .map(|d| (d, ()))
228            .mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
229    }
230}
231
/// Something whose heap footprint can be logged.
pub trait ArrangementSize {
    /// Installs logging of the heap size of `self` and returns the (possibly rewired) value.
    fn log_arrangement_size(self) -> Self;
}
237
238/// Helper for [`ArrangementSize`] to install a common operator holding on to a trace.
239///
240/// * `arranged`: The arrangement to inspect.
241/// * `logic`: Closure that calculates the heap size/capacity/allocations for a batch. The return
242///    value are size and capacity in bytes, and number of allocations, all in absolute values.
243fn log_arrangement_size_inner<'scope, B, L>(
244    arranged: Arranged<'scope, TraceAgent<Spine<Rc<B>>>>,
245    mut logic: L,
246) -> Arranged<'scope, TraceAgent<Spine<Rc<B>>>>
247where
248    B: Batch + 'static,
249    L: FnMut(&B) -> (usize, usize, usize) + 'static,
250{
251    let scope = arranged.stream.scope();
252    let Some(logger) = scope
253        .worker()
254        .logger_for::<ComputeEventBuilder>("materialize/compute")
255    else {
256        return arranged;
257    };
258    let operator_id = arranged.trace.operator().global_id;
259    let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
260
261    let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
262
263    let stream = arranged
264        .stream
265        .unary(Pipeline, "ArrangementSize", |_cap, info| {
266            let address = info.address;
267            logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
268                ArrangementHeapSizeOperator {
269                    operator_id,
270                    address: address.to_vec(),
271                },
272            ));
273
274            // Weak references to batches, so we can observe batches outside the trace.
275            let mut batches = BTreeMap::new();
276
277            move |input, output| {
278                input.for_each(|time, data| {
279                    batches.extend(
280                        data.iter()
281                            .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))),
282                    );
283                    output.session(&time).give_container(data);
284                });
285                let Some(trace) = trace.upgrade() else {
286                    return;
287                };
288
289                trace.borrow().trace().map_batches(|batch| {
290                    batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch));
291                });
292
293                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
294                batches.retain(|_, weak| {
295                    if let Some(batch) = weak.upgrade() {
296                        let (sz, c, a) = logic(&batch);
297                        (size += sz, capacity += c, allocations += a);
298                        true
299                    } else {
300                        false
301                    }
302                });
303
304                let size = size.try_into().expect("must fit");
305                if size != old_size {
306                    logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
307                        operator_id,
308                        delta_size: size - old_size,
309                    }));
310                }
311
312                let capacity = capacity.try_into().expect("must fit");
313                if capacity != old_capacity {
314                    logger.log(&ComputeEvent::ArrangementHeapCapacity(
315                        ArrangementHeapCapacity {
316                            operator_id,
317                            delta_capacity: capacity - old_capacity,
318                        },
319                    ));
320                }
321
322                let allocations = allocations.try_into().expect("must fit");
323                if allocations != old_allocations {
324                    logger.log(&ComputeEvent::ArrangementHeapAllocations(
325                        ArrangementHeapAllocations {
326                            operator_id,
327                            delta_allocations: allocations - old_allocations,
328                        },
329                    ));
330                }
331
332                old_size = size;
333                old_capacity = capacity;
334                old_allocations = allocations;
335            }
336        });
337    Arranged {
338        trace: arranged.trace,
339        stream,
340    }
341}
342
343impl<'scope, T, K, V, R> ArrangementSize for Arranged<'scope, KeyValAgent<K, V, T, R>>
344where
345    T: MzTimestamp,
346    K: Data + MzData,
347    V: Data + MzData,
348    R: Semigroup + Ord + MzData + 'static,
349{
350    fn log_arrangement_size(self) -> Self {
351        log_arrangement_size_inner(self, |batch| {
352            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
353            let mut callback = |siz, cap| {
354                size += siz;
355                capacity += cap;
356                allocations += usize::from(cap > 0);
357            };
358            batch.storage.keys.heap_size(&mut callback);
359            batch.storage.vals.offs.heap_size(&mut callback);
360            batch.storage.vals.vals.heap_size(&mut callback);
361            batch.storage.upds.offs.heap_size(&mut callback);
362            batch.storage.upds.times.heap_size(&mut callback);
363            batch.storage.upds.diffs.heap_size(&mut callback);
364            (size, capacity, allocations)
365        })
366    }
367}
368
369impl<'scope, T, K, R> ArrangementSize for Arranged<'scope, KeyAgent<K, T, R>>
370where
371    T: MzTimestamp,
372    K: Data + MzArrangeData,
373    R: Semigroup + Ord + MzData + 'static,
374{
375    fn log_arrangement_size(self) -> Self {
376        log_arrangement_size_inner(self, |batch| {
377            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
378            let mut callback = |siz, cap| {
379                size += siz;
380                capacity += cap;
381                allocations += usize::from(cap > 0);
382            };
383            batch.storage.keys.heap_size(&mut callback);
384            batch.storage.upds.offs.heap_size(&mut callback);
385            batch.storage.upds.times.heap_size(&mut callback);
386            batch.storage.upds.diffs.heap_size(&mut callback);
387            (size, capacity, allocations)
388        })
389    }
390}
391
392impl<'scope, T, V, R> ArrangementSize for Arranged<'scope, RowValAgent<V, T, R>>
393where
394    T: MzTimestamp,
395    V: Data + MzArrangeData,
396    R: Semigroup + Ord + MzArrangeData + 'static,
397{
398    fn log_arrangement_size(self) -> Self {
399        log_arrangement_size_inner(self, |batch| {
400            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
401            let mut callback = |siz, cap| {
402                size += siz;
403                capacity += cap;
404                allocations += usize::from(cap > 0);
405            };
406            batch.storage.keys.heap_size(&mut callback);
407            batch.storage.vals.offs.heap_size(&mut callback);
408            batch.storage.vals.vals.heap_size(&mut callback);
409            batch.storage.upds.offs.heap_size(&mut callback);
410            batch.storage.upds.times.heap_size(&mut callback);
411            batch.storage.upds.diffs.heap_size(&mut callback);
412            (size, capacity, allocations)
413        })
414    }
415}
416
417impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowRowAgent<T, R>>
418where
419    T: MzTimestamp,
420    R: Semigroup + Ord + MzArrangeData + 'static,
421{
422    fn log_arrangement_size(self) -> Self {
423        log_arrangement_size_inner(self, |batch| {
424            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
425            let mut callback = |siz, cap| {
426                size += siz;
427                capacity += cap;
428                allocations += usize::from(cap > 0);
429            };
430            batch.storage.keys.heap_size(&mut callback);
431            batch.storage.vals.offs.heap_size(&mut callback);
432            batch.storage.vals.vals.heap_size(&mut callback);
433            batch.storage.upds.offs.heap_size(&mut callback);
434            batch.storage.upds.times.heap_size(&mut callback);
435            batch.storage.upds.diffs.heap_size(&mut callback);
436            (size, capacity, allocations)
437        })
438    }
439}
440
441impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowAgent<T, R>>
442where
443    T: MzTimestamp,
444    R: Semigroup + Ord + MzArrangeData + 'static,
445{
446    fn log_arrangement_size(self) -> Self {
447        log_arrangement_size_inner(self, |batch| {
448            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
449            let mut callback = |siz, cap| {
450                size += siz;
451                capacity += cap;
452                allocations += usize::from(cap > 0);
453            };
454            batch.storage.keys.heap_size(&mut callback);
455            batch.storage.upds.offs.heap_size(&mut callback);
456            batch.storage.upds.times.heap_size(&mut callback);
457            batch.storage.upds.diffs.heap_size(&mut callback);
458            (size, capacity, allocations)
459        })
460    }
461}