mz_compute/extensions/
arrange.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::rc::Rc;
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable};
20use timely::Container;
21use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
22use timely::dataflow::operators::Operator;
23use timely::dataflow::{Scope, ScopeParent, StreamCore};
24
25use crate::logging::compute::{
26    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
27    ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
28};
29use crate::typedefs::{
30    KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
31};
32
/// Extension trait to arrange data without naming a parallelization contract.
///
/// Builds on [`MzArrangeCore`]: [`MzArrange::mz_arrange`] takes no `pact` argument, leaving the
/// choice of how updates are distributed to the implementation.
pub trait MzArrange: MzArrangeCore
where
    <Self::Scope as ScopeParent>::Timestamp: Lattice,
{
    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`.
    ///
    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
    /// This trace is current for all times marked completed in the output stream, and probing this stream
    /// is the correct way to determine that times in the shared trace are committed.
    ///
    /// * `name`: Name passed along for the arrangement.
    /// * `Ba`: Batcher that consumes `Self::Input` containers.
    /// * `Bu`: Builder that turns the batcher's output into `Tr::Batch`.
    /// * `Tr`: Trace type of the resulting arrangement. The returned `Arranged` has to implement
    ///   [`ArrangementSize`].
    fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<Self::Scope, TraceAgent<Tr>>
    where
        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Bu: Builder<
                Time = <Self::Scope as ScopeParent>::Timestamp,
                Input = Ba::Output,
                Output = Tr::Batch,
            >,
        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Tr::Batch: Batch,
        Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
}
55
/// Extension trait to arrange data under a caller-supplied parallelization contract.
///
/// Implementors name the scope they live in and the container type they feed into the batcher.
pub trait MzArrangeCore
where
    <Self::Scope as ScopeParent>::Timestamp: Lattice,
{
    /// The current scope.
    type Scope: Scope;
    /// The data input container type.
    type Input: Container + Clone + 'static;

    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`. Partitions
    /// the data according to `pact`.
    ///
    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
    /// This trace is current for all times marked completed in the output stream, and probing this stream
    /// is the correct way to determine that times in the shared trace are committed.
    ///
    /// * `pact`: Parallelization contract over `Self::Input` containers.
    /// * `name`: Name passed along for the arrangement.
    /// * `Ba`: Batcher that consumes `Self::Input` containers.
    /// * `Bu`: Builder that turns the batcher's output into `Tr::Batch`.
    /// * `Tr`: Trace type of the resulting arrangement. The returned `Arranged` has to implement
    ///   [`ArrangementSize`].
    fn mz_arrange_core<P, Ba, Bu, Tr>(
        &self,
        pact: P,
        name: &str,
    ) -> Arranged<Self::Scope, TraceAgent<Tr>>
    where
        P: ParallelizationContract<<Self::Scope as ScopeParent>::Timestamp, Self::Input>,
        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Bu: Builder<
                Time = <Self::Scope as ScopeParent>::Timestamp,
                Input = Ba::Output,
                Output = Tr::Batch,
            >,
        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
        Tr::Batch: Batch,
        Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
}
90
91impl<G, C> MzArrangeCore for StreamCore<G, C>
92where
93    G: Scope,
94    G::Timestamp: Lattice,
95    C: Container + Clone + 'static,
96{
97    type Scope = G;
98    type Input = C;
99
100    fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
101    where
102        P: ParallelizationContract<G::Timestamp, Self::Input>,
103        Ba: Batcher<Input = Self::Input, Time = G::Timestamp> + 'static,
104        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
105        Tr: Trace + TraceReader<Time = G::Timestamp> + 'static,
106        Tr::Batch: Batch,
107        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
108    {
109        // Allow access to `arrange_named` because we're within Mz's wrapper.
110        #[allow(clippy::disallowed_methods)]
111        arrange_core::<_, _, Ba, Bu, _>(self, pact, name).log_arrangement_size()
112    }
113}
114
115impl<G, K, V, R> MzArrange for Collection<G, (K, V), R>
116where
117    G: Scope,
118    G::Timestamp: Lattice,
119    K: ExchangeData + Hashable,
120    V: ExchangeData,
121    R: ExchangeData,
122{
123    fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
124    where
125        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
126        Bu: Builder<
127                Time = <Self::Scope as ScopeParent>::Timestamp,
128                Input = Ba::Output,
129                Output = Tr::Batch,
130            >,
131        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
132        Tr::Batch: Batch,
133        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
134    {
135        let exchange =
136            Exchange::new(move |update: &((K, V), G::Timestamp, R)| (update.0).0.hashed().into());
137        self.mz_arrange_core::<_, Ba, Bu, _>(exchange, name)
138    }
139}
140
141impl<G, K, V, R, C> MzArrangeCore for Collection<G, (K, V), R, C>
142where
143    G: Scope,
144    G::Timestamp: Lattice,
145    C: Container + Clone + 'static,
146{
147    type Scope = G;
148    type Input = C;
149
150    fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
151    where
152        P: ParallelizationContract<G::Timestamp, Self::Input>,
153        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
154        Bu: Builder<
155                Time = <Self::Scope as ScopeParent>::Timestamp,
156                Input = Ba::Output,
157                Output = Tr::Batch,
158            >,
159        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
160        Tr::Batch: Batch,
161        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
162    {
163        self.inner.mz_arrange_core::<_, Ba, Bu, _>(pact, name)
164    }
165}
166
/// A specialized collection where data only has a key, but no associated value.
///
/// Wraps a `Collection<G, K, R>`. When arranged, every key is paired with `()` as its value.
///
/// Created by calling `collection.into()`.
pub struct KeyCollection<G: Scope, K, R = usize>(Collection<G, K, R>);
171
172impl<G: Scope, K, R: Semigroup> From<Collection<G, K, R>> for KeyCollection<G, K, R> {
173    fn from(value: Collection<G, K, R>) -> Self {
174        KeyCollection(value)
175    }
176}
177
178impl<G, K, R> MzArrange for KeyCollection<G, K, R>
179where
180    G: Scope,
181    K: ExchangeData + Hashable,
182    G::Timestamp: Lattice,
183    R: ExchangeData,
184{
185    fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
186    where
187        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
188        Bu: Builder<
189                Time = <Self::Scope as ScopeParent>::Timestamp,
190                Input = Ba::Output,
191                Output = Tr::Batch,
192            >,
193        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
194        Tr::Batch: Batch,
195        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
196    {
197        self.0.map(|d| (d, ())).mz_arrange::<Ba, Bu, _>(name)
198    }
199}
200
201impl<G, K, R> MzArrangeCore for KeyCollection<G, K, R>
202where
203    G: Scope,
204    K: Data,
205    G::Timestamp: Lattice,
206    R: Data,
207{
208    type Scope = G;
209    type Input = Vec<((K, ()), G::Timestamp, R)>;
210
211    fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
212    where
213        P: ParallelizationContract<G::Timestamp, Self::Input>,
214        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
215        Bu: Builder<
216                Time = <Self::Scope as ScopeParent>::Timestamp,
217                Input = Ba::Output,
218                Output = Tr::Batch,
219            >,
220        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
221        Tr::Batch: Batch,
222        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
223    {
224        self.0
225            .map(|d| (d, ()))
226            .mz_arrange_core::<_, Ba, Bu, _>(pact, name)
227    }
228}
229
/// A type that can log its heap size.
///
/// The implementations in this file cover `Arranged` values over the `KeyValAgent`, `KeyAgent`,
/// `RowValAgent`, `RowRowAgent` and `RowAgent` trace agents.
pub trait ArrangementSize {
    /// Install a logger to track the heap size of the target.
    ///
    /// Takes `self` by value and returns a value of the same type, so the call can be chained
    /// directly onto the expression that produced the target.
    fn log_arrangement_size(self) -> Self;
}
235
236/// Helper for [`ArrangementSize`] to install a common operator holding on to a trace.
237///
238/// * `arranged`: The arrangement to inspect.
239/// * `logic`: Closure that calculates the heap size/capacity/allocations for a batch. The return
240///    value are size and capacity in bytes, and number of allocations, all in absolute values.
241fn log_arrangement_size_inner<G, B, L>(
242    arranged: Arranged<G, TraceAgent<Spine<Rc<B>>>>,
243    mut logic: L,
244) -> Arranged<G, TraceAgent<Spine<Rc<B>>>>
245where
246    G: Scope<Timestamp: Lattice>,
247    B: Batch + 'static,
248    L: FnMut(&B) -> (usize, usize, usize) + 'static,
249{
250    let scope = arranged.stream.scope();
251    let Some(logger) = scope.logger_for::<ComputeEventBuilder>("materialize/compute") else {
252        return arranged;
253    };
254    let operator_id = arranged.trace.operator().global_id;
255    let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
256
257    let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
258
259    let stream = arranged
260        .stream
261        .unary(Pipeline, "ArrangementSize", |_cap, info| {
262            let address = info.address;
263            logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
264                ArrangementHeapSizeOperator {
265                    operator_id,
266                    address: address.to_vec(),
267                },
268            ));
269
270            // Weak references to batches, so we can observe batches outside the trace.
271            let mut batches = BTreeMap::new();
272
273            move |input, output| {
274                while let Some((time, data)) = input.next() {
275                    batches.extend(
276                        data.iter()
277                            .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))),
278                    );
279                    output.session(&time).give_container(data);
280                }
281                let Some(trace) = trace.upgrade() else {
282                    return;
283                };
284
285                trace.borrow().trace.map_batches(|batch| {
286                    batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch));
287                });
288
289                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
290                batches.retain(|_, weak| {
291                    if let Some(batch) = weak.upgrade() {
292                        let (sz, c, a) = logic(&batch);
293                        (size += sz, capacity += c, allocations += a);
294                        true
295                    } else {
296                        false
297                    }
298                });
299
300                let size = size.try_into().expect("must fit");
301                if size != old_size {
302                    logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
303                        operator_id,
304                        delta_size: size - old_size,
305                    }));
306                }
307
308                let capacity = capacity.try_into().expect("must fit");
309                if capacity != old_capacity {
310                    logger.log(&ComputeEvent::ArrangementHeapCapacity(
311                        ArrangementHeapCapacity {
312                            operator_id,
313                            delta_capacity: capacity - old_capacity,
314                        },
315                    ));
316                }
317
318                let allocations = allocations.try_into().expect("must fit");
319                if allocations != old_allocations {
320                    logger.log(&ComputeEvent::ArrangementHeapAllocations(
321                        ArrangementHeapAllocations {
322                            operator_id,
323                            delta_allocations: allocations - old_allocations,
324                        },
325                    ));
326                }
327
328                old_size = size;
329                old_capacity = capacity;
330                old_allocations = allocations;
331            }
332        });
333    Arranged {
334        trace: arranged.trace,
335        stream,
336    }
337}
338
339impl<G, K, V, R> ArrangementSize for Arranged<G, KeyValAgent<K, V, G::Timestamp, R>>
340where
341    G: Scope,
342    G::Timestamp: MzTimestamp,
343    K: Data + MzData,
344    V: Data + MzData,
345    R: Semigroup + Ord + MzData + 'static,
346{
347    fn log_arrangement_size(self) -> Self {
348        log_arrangement_size_inner(self, |batch| {
349            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
350            let mut callback = |siz, cap| {
351                size += siz;
352                capacity += cap;
353                allocations += usize::from(cap > 0);
354            };
355            batch.storage.keys.heap_size(&mut callback);
356            batch.storage.vals.offs.heap_size(&mut callback);
357            batch.storage.vals.vals.heap_size(&mut callback);
358            batch.storage.upds.offs.heap_size(&mut callback);
359            batch.storage.upds.times.heap_size(&mut callback);
360            batch.storage.upds.diffs.heap_size(&mut callback);
361            (size, capacity, allocations)
362        })
363    }
364}
365
366impl<G, K, R> ArrangementSize for Arranged<G, KeyAgent<K, G::Timestamp, R>>
367where
368    G: Scope,
369    G::Timestamp: MzTimestamp,
370    K: Data + MzArrangeData,
371    R: Semigroup + Ord + MzData + 'static,
372{
373    fn log_arrangement_size(self) -> Self {
374        log_arrangement_size_inner(self, |batch| {
375            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
376            let mut callback = |siz, cap| {
377                size += siz;
378                capacity += cap;
379                allocations += usize::from(cap > 0);
380            };
381            batch.storage.keys.heap_size(&mut callback);
382            batch.storage.upds.offs.heap_size(&mut callback);
383            batch.storage.upds.times.heap_size(&mut callback);
384            batch.storage.upds.diffs.heap_size(&mut callback);
385            (size, capacity, allocations)
386        })
387    }
388}
389
390impl<G, V, R> ArrangementSize for Arranged<G, RowValAgent<V, G::Timestamp, R>>
391where
392    G: Scope,
393    G::Timestamp: MzTimestamp,
394    V: Data + MzArrangeData,
395    R: Semigroup + Ord + MzArrangeData + 'static,
396{
397    fn log_arrangement_size(self) -> Self {
398        log_arrangement_size_inner(self, |batch| {
399            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
400            let mut callback = |siz, cap| {
401                size += siz;
402                capacity += cap;
403                allocations += usize::from(cap > 0);
404            };
405            batch.storage.keys.heap_size(&mut callback);
406            batch.storage.vals.offs.heap_size(&mut callback);
407            batch.storage.vals.vals.heap_size(&mut callback);
408            batch.storage.upds.offs.heap_size(&mut callback);
409            batch.storage.upds.times.heap_size(&mut callback);
410            batch.storage.upds.diffs.heap_size(&mut callback);
411            (size, capacity, allocations)
412        })
413    }
414}
415
416impl<G, R> ArrangementSize for Arranged<G, RowRowAgent<G::Timestamp, R>>
417where
418    G: Scope,
419    G::Timestamp: MzTimestamp,
420    R: Semigroup + Ord + MzArrangeData + 'static,
421{
422    fn log_arrangement_size(self) -> Self {
423        log_arrangement_size_inner(self, |batch| {
424            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
425            let mut callback = |siz, cap| {
426                size += siz;
427                capacity += cap;
428                allocations += usize::from(cap > 0);
429            };
430            batch.storage.keys.heap_size(&mut callback);
431            batch.storage.vals.offs.heap_size(&mut callback);
432            batch.storage.vals.vals.heap_size(&mut callback);
433            batch.storage.upds.offs.heap_size(&mut callback);
434            batch.storage.upds.times.heap_size(&mut callback);
435            batch.storage.upds.diffs.heap_size(&mut callback);
436            (size, capacity, allocations)
437        })
438    }
439}
440
441impl<G, R> ArrangementSize for Arranged<G, RowAgent<G::Timestamp, R>>
442where
443    G: Scope,
444    G::Timestamp: MzTimestamp,
445    R: Semigroup + Ord + MzArrangeData + 'static,
446{
447    fn log_arrangement_size(self) -> Self {
448        log_arrangement_size_inner(self, |batch| {
449            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
450            let mut callback = |siz, cap| {
451                size += siz;
452                capacity += cap;
453                allocations += usize::from(cap > 0);
454            };
455            batch.storage.keys.heap_size(&mut callback);
456            batch.storage.upds.offs.heap_size(&mut callback);
457            batch.storage.upds.times.heap_size(&mut callback);
458            batch.storage.upds.diffs.heap_size(&mut callback);
459            (size, capacity, allocations)
460        })
461    }
462}