Skip to main content

mz_compute/extensions/
arrange.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::rc::{Rc, Weak};
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection};
20use timely::Container;
21use timely::container::{ContainerBuilder, PushInto};
22use timely::dataflow::Stream;
23use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
24use timely::dataflow::operators::Operator;
25use timely::progress::Timestamp;
26
27use crate::logging::compute::{
28    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
29    ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
30};
31use crate::typedefs::{
32    KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
33};
34
35/// Extension trait to arrange data.
36pub trait MzArrange<'scope>: MzArrangeCore<'scope> {
37    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`.
38    ///
39    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
40    /// This trace is current for all times marked completed in the output stream, and probing this stream
41    /// is the correct way to determine that times in the shared trace are committed.
42    fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
43    where
44        Ba: Batcher<Time = Self::Timestamp> + 'static,
45        Chu: ContainerBuilder<Container = Ba::Output>
46            + for<'a> PushInto<&'a mut Self::Input>
47            + 'static,
48        Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
49        Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
50        Tr::Batch: Batch,
51        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
52}
53
54/// Extension trait to arrange data.
55pub trait MzArrangeCore<'scope> {
56    /// The current scope.
57    type Timestamp: Timestamp + Lattice;
58    /// The data input container type.
59    type Input: Container + Clone + 'static;
60
61    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`. Partitions
62    /// the data according to `pact`.
63    ///
64    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
65    /// This trace is current for all times marked completed in the output stream, and probing this stream
66    /// is the correct way to determine that times in the shared trace are committed.
67    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
68        self,
69        pact: P,
70        name: &str,
71    ) -> Arranged<'scope, TraceAgent<Tr>>
72    where
73        P: ParallelizationContract<Self::Timestamp, Self::Input>,
74        Ba: Batcher<Time = Self::Timestamp> + 'static,
75        Chu: ContainerBuilder<Container = Ba::Output>
76            + for<'a> PushInto<&'a mut Self::Input>
77            + 'static,
78        Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
79        Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
80        Tr::Batch: Batch,
81        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
82}
83
84impl<'scope, T, C> MzArrangeCore<'scope> for Stream<'scope, T, C>
85where
86    T: Timestamp + Lattice,
87    C: Container + Clone + 'static,
88{
89    type Timestamp = T;
90    type Input = C;
91
92    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
93        self,
94        pact: P,
95        name: &str,
96    ) -> Arranged<'scope, TraceAgent<Tr>>
97    where
98        P: ParallelizationContract<T, Self::Input>,
99        Ba: Batcher<Time = T> + 'static,
100        Chu: ContainerBuilder<Container = Ba::Output>
101            + for<'a> PushInto<&'a mut Self::Input>
102            + 'static,
103        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
104        Tr: Trace + TraceReader<Time = T> + 'static,
105        Tr::Batch: Batch,
106        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
107    {
108        // Allow access to `arrange_named` because we're within Mz's wrapper.
109        #[allow(clippy::disallowed_methods)]
110        arrange_core::<_, _, Chu, Ba, Bu, _>(self, pact, name).log_arrangement_size()
111    }
112}
113
114impl<'scope, T, K, V, R> MzArrange<'scope> for VecCollection<'scope, T, (K, V), R>
115where
116    T: Timestamp + Lattice,
117    K: ExchangeData + Hashable,
118    V: ExchangeData,
119    R: ExchangeData,
120{
121    fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
122    where
123        Ba: Batcher<Time = T> + 'static,
124        Chu: ContainerBuilder<Container = Ba::Output>
125            + for<'a> PushInto<&'a mut Self::Input>
126            + 'static,
127        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
128        Tr: Trace + TraceReader<Time = T> + 'static,
129        Tr::Batch: Batch,
130        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
131    {
132        let exchange = Exchange::new(move |update: &((K, V), T, R)| (update.0).0.hashed().into());
133        self.mz_arrange_core::<_, Chu, Ba, Bu, _>(exchange, name)
134    }
135}
136
137impl<'scope, T, C> MzArrangeCore<'scope> for Collection<'scope, T, C>
138where
139    T: Timestamp + Lattice,
140    C: Container + Clone + 'static,
141{
142    type Timestamp = T;
143    type Input = C;
144
145    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
146        self,
147        pact: P,
148        name: &str,
149    ) -> Arranged<'scope, TraceAgent<Tr>>
150    where
151        P: ParallelizationContract<T, Self::Input>,
152        Ba: Batcher<Time = T> + 'static,
153        Chu: ContainerBuilder<Container = Ba::Output>
154            + for<'a> PushInto<&'a mut Self::Input>
155            + 'static,
156        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
157        Tr: Trace + TraceReader<Time = T> + 'static,
158        Tr::Batch: Batch,
159        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
160    {
161        self.inner.mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
162    }
163}
164
165/// A specialized collection where data only has a key, but no associated value.
166///
167/// Created by calling `collection.into()`.
168pub struct KeyCollection<'scope, T: Timestamp, K: 'static, R: 'static = usize>(
169    VecCollection<'scope, T, K, R>,
170);
171
172impl<'scope, T: Timestamp, K, R: Semigroup> From<VecCollection<'scope, T, K, R>>
173    for KeyCollection<'scope, T, K, R>
174{
175    fn from(value: VecCollection<'scope, T, K, R>) -> Self {
176        KeyCollection(value)
177    }
178}
179
180impl<'scope, T, K, R> MzArrange<'scope> for KeyCollection<'scope, T, K, R>
181where
182    T: Timestamp + Lattice,
183    K: ExchangeData + Hashable,
184    R: ExchangeData,
185{
186    fn mz_arrange<Chu, Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
187    where
188        Ba: Batcher<Time = T> + 'static,
189        Chu: ContainerBuilder<Container = Ba::Output>
190            + for<'a> PushInto<&'a mut Self::Input>
191            + 'static,
192        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
193        Tr: Trace + TraceReader<Time = T> + 'static,
194        Tr::Batch: Batch,
195        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
196    {
197        self.0.map(|d| (d, ())).mz_arrange::<Chu, Ba, Bu, _>(name)
198    }
199}
200
201impl<'scope, T, K, R> MzArrangeCore<'scope> for KeyCollection<'scope, T, K, R>
202where
203    T: Timestamp + Lattice,
204    K: Clone + 'static,
205    R: Clone + 'static,
206{
207    type Timestamp = T;
208    type Input = Vec<((K, ()), T, R)>;
209
210    fn mz_arrange_core<P, Chu, Ba, Bu, Tr>(
211        self,
212        pact: P,
213        name: &str,
214    ) -> Arranged<'scope, TraceAgent<Tr>>
215    where
216        P: ParallelizationContract<T, Self::Input>,
217        Ba: Batcher<Time = T> + 'static,
218        Chu: ContainerBuilder<Container = Ba::Output>
219            + for<'a> PushInto<&'a mut Self::Input>
220            + 'static,
221        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
222        Tr: Trace + TraceReader<Time = T> + 'static,
223        Tr::Batch: Batch,
224        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
225    {
226        self.0
227            .map(|d| (d, ()))
228            .mz_arrange_core::<_, Chu, Ba, Bu, _>(pact, name)
229    }
230}
231
/// A type that can log its heap size.
///
/// In this file the trait is implemented for arrangements over several trace agent types,
/// and it is required of every arrangement produced by [`MzArrangeCore::mz_arrange_core`].
pub trait ArrangementSize {
    /// Install a logger to track the heap size of the target.
    ///
    /// Consumes `self` and returns a value of the same type, so the call can be chained
    /// directly onto the expression that creates the target.
    fn log_arrangement_size(self) -> Self;
}
237
238/// Helper for [`ArrangementSize`] to install a common operator holding on to a trace.
239///
240/// * `arranged`: The arrangement to inspect.
241/// * `logic`: Closure that calculates the heap size/capacity/allocations for a batch. The return
242///    value are size and capacity in bytes, and number of allocations, all in absolute values.
243fn log_arrangement_size_inner<'scope, B, L>(
244    arranged: Arranged<'scope, TraceAgent<Spine<Rc<B>>>>,
245    mut logic: L,
246) -> Arranged<'scope, TraceAgent<Spine<Rc<B>>>>
247where
248    B: Batch + 'static,
249    L: FnMut(&B) -> (usize, usize, usize) + 'static,
250{
251    let scope = arranged.stream.scope();
252    let Some(logger) = scope
253        .worker()
254        .logger_for::<ComputeEventBuilder>("materialize/compute")
255    else {
256        return arranged;
257    };
258    let operator_id = arranged.trace.operator().global_id;
259    let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
260
261    let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
262
263    let stream = arranged
264        .stream
265        .unary(Pipeline, "ArrangementSize", |_cap, info| {
266            let address = info.address;
267            logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
268                ArrangementHeapSizeOperator {
269                    operator_id,
270                    address: address.to_vec(),
271                },
272            ));
273
274            // Weak references to batches, so we can observe batches outside the trace.
275            // Batches are immutable once sealed, so we compute their size exactly
276            // once (when first observed) and cache it alongside the weak reference.
277            // Subsequent activations only sum the cached values for live batches,
278            // avoiding a repeated walk of every batch's backing regions.
279            let mut batches: BTreeMap<*const B, (Weak<B>, (usize, usize, usize))> = BTreeMap::new();
280
281            move |input, output| {
282                input.for_each(|time, data| {
283                    for batch in data.iter() {
284                        batches
285                            .entry(Rc::as_ptr(batch))
286                            .or_insert_with(|| (Rc::downgrade(batch), logic(batch)));
287                    }
288                    output.session(&time).give_container(data);
289                });
290                let Some(trace) = trace.upgrade() else {
291                    return;
292                };
293
294                trace.borrow().trace().map_batches(|batch| {
295                    batches
296                        .entry(Rc::as_ptr(batch))
297                        .or_insert_with(|| (Rc::downgrade(batch), logic(batch)));
298                });
299
300                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
301                batches.retain(|_, (weak, cached)| {
302                    if weak.strong_count() > 0 {
303                        let (sz, c, a) = *cached;
304                        (size += sz, capacity += c, allocations += a);
305                        true
306                    } else {
307                        false
308                    }
309                });
310
311                let size = size.try_into().expect("must fit");
312                if size != old_size {
313                    logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
314                        operator_id,
315                        delta_size: size - old_size,
316                    }));
317                }
318
319                let capacity = capacity.try_into().expect("must fit");
320                if capacity != old_capacity {
321                    logger.log(&ComputeEvent::ArrangementHeapCapacity(
322                        ArrangementHeapCapacity {
323                            operator_id,
324                            delta_capacity: capacity - old_capacity,
325                        },
326                    ));
327                }
328
329                let allocations = allocations.try_into().expect("must fit");
330                if allocations != old_allocations {
331                    logger.log(&ComputeEvent::ArrangementHeapAllocations(
332                        ArrangementHeapAllocations {
333                            operator_id,
334                            delta_allocations: allocations - old_allocations,
335                        },
336                    ));
337                }
338
339                old_size = size;
340                old_capacity = capacity;
341                old_allocations = allocations;
342            }
343        });
344    Arranged {
345        trace: arranged.trace,
346        stream,
347    }
348}
349
350impl<'scope, T, K, V, R> ArrangementSize for Arranged<'scope, KeyValAgent<K, V, T, R>>
351where
352    T: MzTimestamp,
353    K: Data + MzData,
354    V: Data + MzData,
355    R: Semigroup + Ord + MzData + 'static,
356{
357    fn log_arrangement_size(self) -> Self {
358        log_arrangement_size_inner(self, |batch| {
359            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
360            let mut callback = |siz, cap| {
361                size += siz;
362                capacity += cap;
363                allocations += usize::from(cap > 0);
364            };
365            batch.storage.keys.heap_size(&mut callback);
366            batch.storage.vals.offs.heap_size(&mut callback);
367            batch.storage.vals.vals.heap_size(&mut callback);
368            batch.storage.upds.offs.heap_size(&mut callback);
369            batch.storage.upds.times.heap_size(&mut callback);
370            batch.storage.upds.diffs.heap_size(&mut callback);
371            (size, capacity, allocations)
372        })
373    }
374}
375
376impl<'scope, T, K, R> ArrangementSize for Arranged<'scope, KeyAgent<K, T, R>>
377where
378    T: MzTimestamp,
379    K: Data + MzArrangeData,
380    R: Semigroup + Ord + MzData + 'static,
381{
382    fn log_arrangement_size(self) -> Self {
383        log_arrangement_size_inner(self, |batch| {
384            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
385            let mut callback = |siz, cap| {
386                size += siz;
387                capacity += cap;
388                allocations += usize::from(cap > 0);
389            };
390            batch.storage.keys.heap_size(&mut callback);
391            batch.storage.upds.offs.heap_size(&mut callback);
392            batch.storage.upds.times.heap_size(&mut callback);
393            batch.storage.upds.diffs.heap_size(&mut callback);
394            (size, capacity, allocations)
395        })
396    }
397}
398
399impl<'scope, T, V, R> ArrangementSize for Arranged<'scope, RowValAgent<V, T, R>>
400where
401    T: MzTimestamp,
402    V: Data + MzArrangeData,
403    R: Semigroup + Ord + MzArrangeData + 'static,
404{
405    fn log_arrangement_size(self) -> Self {
406        log_arrangement_size_inner(self, |batch| {
407            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
408            let mut callback = |siz, cap| {
409                size += siz;
410                capacity += cap;
411                allocations += usize::from(cap > 0);
412            };
413            batch.storage.keys.heap_size(&mut callback);
414            batch.storage.vals.offs.heap_size(&mut callback);
415            batch.storage.vals.vals.heap_size(&mut callback);
416            batch.storage.upds.offs.heap_size(&mut callback);
417            batch.storage.upds.times.heap_size(&mut callback);
418            batch.storage.upds.diffs.heap_size(&mut callback);
419            (size, capacity, allocations)
420        })
421    }
422}
423
424impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowRowAgent<T, R>>
425where
426    T: MzTimestamp,
427    R: Semigroup + Ord + MzArrangeData + 'static,
428{
429    fn log_arrangement_size(self) -> Self {
430        log_arrangement_size_inner(self, |batch| {
431            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
432            let mut callback = |siz, cap| {
433                size += siz;
434                capacity += cap;
435                allocations += usize::from(cap > 0);
436            };
437            batch.storage.keys.heap_size(&mut callback);
438            batch.storage.vals.offs.heap_size(&mut callback);
439            batch.storage.vals.vals.heap_size(&mut callback);
440            batch.storage.upds.offs.heap_size(&mut callback);
441            batch.storage.upds.times.heap_size(&mut callback);
442            batch.storage.upds.diffs.heap_size(&mut callback);
443            (size, capacity, allocations)
444        })
445    }
446}
447
448impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowAgent<T, R>>
449where
450    T: MzTimestamp,
451    R: Semigroup + Ord + MzArrangeData + 'static,
452{
453    fn log_arrangement_size(self) -> Self {
454        log_arrangement_size_inner(self, |batch| {
455            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
456            let mut callback = |siz, cap| {
457                size += siz;
458                capacity += cap;
459                allocations += usize::from(cap > 0);
460            };
461            batch.storage.keys.heap_size(&mut callback);
462            batch.storage.upds.offs.heap_size(&mut callback);
463            batch.storage.upds.times.heap_size(&mut callback);
464            batch.storage.upds.diffs.heap_size(&mut callback);
465            (size, capacity, allocations)
466        })
467    }
468}