mz_compute/extensions/
arrange.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::rc::Rc;
11
12use differential_dataflow::containers::Columnation;
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
18use differential_dataflow::{Collection, Data, ExchangeData, Hashable};
19use timely::Container;
20use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
21use timely::dataflow::operators::Operator;
22use timely::dataflow::{Scope, ScopeParent, StreamCore};
23use timely::progress::Timestamp;
24
25use crate::logging::compute::{
26    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
27    ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
28};
29use crate::typedefs::{KeyAgent, KeyValAgent, RowAgent, RowRowAgent, RowValAgent};
30
31/// Extension trait to arrange data.
32pub trait MzArrange: MzArrangeCore
33where
34    <Self::Scope as ScopeParent>::Timestamp: Lattice,
35{
36    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`.
37    ///
38    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
39    /// This trace is current for all times marked completed in the output stream, and probing this stream
40    /// is the correct way to determine that times in the shared trace are committed.
41    fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<Self::Scope, TraceAgent<Tr>>
42    where
43        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
44        Bu: Builder<
45                Time = <Self::Scope as ScopeParent>::Timestamp,
46                Input = Ba::Output,
47                Output = Tr::Batch,
48            >,
49        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
50        Tr::Batch: Batch,
51        Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
52}
53
54/// Extension trait to arrange data.
55pub trait MzArrangeCore
56where
57    <Self::Scope as ScopeParent>::Timestamp: Lattice,
58{
59    /// The current scope.
60    type Scope: Scope;
61    /// The data input container type.
62    type Input: Container + Clone + 'static;
63
64    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`. Partitions
65    /// the data according to `pact`.
66    ///
67    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
68    /// This trace is current for all times marked completed in the output stream, and probing this stream
69    /// is the correct way to determine that times in the shared trace are committed.
70    fn mz_arrange_core<P, Ba, Bu, Tr>(
71        &self,
72        pact: P,
73        name: &str,
74    ) -> Arranged<Self::Scope, TraceAgent<Tr>>
75    where
76        P: ParallelizationContract<<Self::Scope as ScopeParent>::Timestamp, Self::Input>,
77        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
78        // Ba::Input: Container + Clone + 'static,
79        Bu: Builder<
80                Time = <Self::Scope as ScopeParent>::Timestamp,
81                Input = Ba::Output,
82                Output = Tr::Batch,
83            >,
84        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
85        Tr::Batch: Batch,
86        Arranged<Self::Scope, TraceAgent<Tr>>: ArrangementSize;
87}
88
89impl<G, C> MzArrangeCore for StreamCore<G, C>
90where
91    G: Scope,
92    G::Timestamp: Lattice,
93    C: Container + Clone + 'static,
94{
95    type Scope = G;
96    type Input = C;
97
98    fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
99    where
100        P: ParallelizationContract<G::Timestamp, Self::Input>,
101        Ba: Batcher<Input = Self::Input, Time = G::Timestamp> + 'static,
102        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
103        Tr: Trace + TraceReader<Time = G::Timestamp> + 'static,
104        Tr::Batch: Batch,
105        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
106    {
107        // Allow access to `arrange_named` because we're within Mz's wrapper.
108        #[allow(clippy::disallowed_methods)]
109        arrange_core::<_, _, Ba, Bu, _>(self, pact, name).log_arrangement_size()
110    }
111}
112
113impl<G, K, V, R> MzArrange for Collection<G, (K, V), R>
114where
115    G: Scope,
116    G::Timestamp: Lattice,
117    K: ExchangeData + Hashable,
118    V: ExchangeData,
119    R: ExchangeData,
120{
121    fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
122    where
123        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
124        Bu: Builder<
125                Time = <Self::Scope as ScopeParent>::Timestamp,
126                Input = Ba::Output,
127                Output = Tr::Batch,
128            >,
129        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
130        Tr::Batch: Batch,
131        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
132    {
133        let exchange =
134            Exchange::new(move |update: &((K, V), G::Timestamp, R)| (update.0).0.hashed().into());
135        self.mz_arrange_core::<_, Ba, Bu, _>(exchange, name)
136    }
137}
138
139impl<G, K, V, R, C> MzArrangeCore for Collection<G, (K, V), R, C>
140where
141    G: Scope,
142    G::Timestamp: Lattice,
143    C: Container + Clone + 'static,
144{
145    type Scope = G;
146    type Input = C;
147
148    fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
149    where
150        P: ParallelizationContract<G::Timestamp, Self::Input>,
151        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
152        Bu: Builder<
153                Time = <Self::Scope as ScopeParent>::Timestamp,
154                Input = Ba::Output,
155                Output = Tr::Batch,
156            >,
157        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
158        Tr::Batch: Batch,
159        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
160    {
161        self.inner.mz_arrange_core::<_, Ba, Bu, _>(pact, name)
162    }
163}
164
165/// A specialized collection where data only has a key, but no associated value.
166///
167/// Created by calling `collection.into()`.
168pub struct KeyCollection<G: Scope, K, R = usize>(Collection<G, K, R>);
169
170impl<G: Scope, K, R: Semigroup> From<Collection<G, K, R>> for KeyCollection<G, K, R> {
171    fn from(value: Collection<G, K, R>) -> Self {
172        KeyCollection(value)
173    }
174}
175
176impl<G, K, R> MzArrange for KeyCollection<G, K, R>
177where
178    G: Scope,
179    K: ExchangeData + Hashable,
180    G::Timestamp: Lattice,
181    R: ExchangeData,
182{
183    fn mz_arrange<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
184    where
185        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
186        Bu: Builder<
187                Time = <Self::Scope as ScopeParent>::Timestamp,
188                Input = Ba::Output,
189                Output = Tr::Batch,
190            >,
191        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
192        Tr::Batch: Batch,
193        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
194    {
195        self.0.map(|d| (d, ())).mz_arrange::<Ba, Bu, _>(name)
196    }
197}
198
199impl<G, K, R> MzArrangeCore for KeyCollection<G, K, R>
200where
201    G: Scope,
202    K: Data,
203    G::Timestamp: Lattice,
204    R: Data,
205{
206    type Scope = G;
207    type Input = Vec<((K, ()), G::Timestamp, R)>;
208
209    fn mz_arrange_core<P, Ba, Bu, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
210    where
211        P: ParallelizationContract<G::Timestamp, Self::Input>,
212        Ba: Batcher<Input = Self::Input, Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
213        Bu: Builder<
214                Time = <Self::Scope as ScopeParent>::Timestamp,
215                Input = Ba::Output,
216                Output = Tr::Batch,
217            >,
218        Tr: Trace + TraceReader<Time = <Self::Scope as ScopeParent>::Timestamp> + 'static,
219        Tr::Batch: Batch,
220        Arranged<G, TraceAgent<Tr>>: ArrangementSize,
221    {
222        self.0
223            .map(|d| (d, ()))
224            .mz_arrange_core::<_, Ba, Bu, _>(pact, name)
225    }
226}
227
/// A type that can log its heap size.
pub trait ArrangementSize {
    /// Installs a logger that tracks the heap size of `self`, returning the value that callers
    /// should continue to use in its place.
    fn log_arrangement_size(self) -> Self;
}
233
234/// Helper for [`ArrangementSize`] to install a common operator holding on to a trace.
235///
236/// * `arranged`: The arrangement to inspect.
237/// * `logic`: Closure that calculates the heap size/capacity/allocations for a trace. The return
238///    value are size and capacity in bytes, and number of allocations, all in absolute values.
239fn log_arrangement_size_inner<G, Tr, L>(
240    arranged: Arranged<G, TraceAgent<Tr>>,
241    mut logic: L,
242) -> Arranged<G, TraceAgent<Tr>>
243where
244    G: Scope,
245    G::Timestamp: Timestamp + Lattice + Ord,
246    Tr: TraceReader + 'static,
247    Tr::Time: Timestamp + Lattice + Ord + Clone + 'static,
248    L: FnMut(&Tr) -> (usize, usize, usize) + 'static,
249{
250    let scope = arranged.stream.scope();
251    let Some(logger) = scope
252        .log_register()
253        .get::<ComputeEventBuilder>("materialize/compute")
254    else {
255        return arranged;
256    };
257    let operator_id = arranged.trace.operator().global_id;
258    let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
259
260    let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
261
262    let stream = arranged
263        .stream
264        .unary(Pipeline, "ArrangementSize", |_cap, info| {
265            let address = info.address;
266            logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
267                ArrangementHeapSizeOperator {
268                    operator_id,
269                    address: address.to_vec(),
270                },
271            ));
272            move |input, output| {
273                while let Some((time, data)) = input.next() {
274                    output.session(&time).give_container(data);
275                }
276                let Some(trace) = trace.upgrade() else {
277                    return;
278                };
279
280                let (size, capacity, allocations) = logic(&trace.borrow().trace);
281
282                let size = size.try_into().expect("must fit");
283                if size != old_size {
284                    logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
285                        operator_id,
286                        delta_size: size - old_size,
287                    }));
288                }
289
290                let capacity = capacity.try_into().expect("must fit");
291                if capacity != old_capacity {
292                    logger.log(&ComputeEvent::ArrangementHeapCapacity(
293                        ArrangementHeapCapacity {
294                            operator_id,
295                            delta_capacity: capacity - old_capacity,
296                        },
297                    ));
298                }
299
300                let allocations = allocations.try_into().expect("must fit");
301                if allocations != old_allocations {
302                    logger.log(&ComputeEvent::ArrangementHeapAllocations(
303                        ArrangementHeapAllocations {
304                            operator_id,
305                            delta_allocations: allocations - old_allocations,
306                        },
307                    ));
308                }
309
310                old_size = size;
311                old_capacity = capacity;
312                old_allocations = allocations;
313            }
314        });
315    Arranged {
316        trace: arranged.trace,
317        stream,
318    }
319}
320
321impl<G, K, V, T, R> ArrangementSize for Arranged<G, KeyValAgent<K, V, T, R>>
322where
323    G: Scope<Timestamp = T>,
324    G::Timestamp: Lattice + Ord + Columnation,
325    K: Data + Columnation,
326    V: Data + Columnation,
327    T: Lattice + Timestamp,
328    R: Semigroup + Ord + Columnation + 'static,
329{
330    fn log_arrangement_size(self) -> Self {
331        log_arrangement_size_inner(self, |trace| {
332            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
333            let mut callback = |siz, cap| {
334                size += siz;
335                capacity += cap;
336                allocations += usize::from(cap > 0);
337            };
338            trace.map_batches(|batch| {
339                batch.storage.keys.heap_size(&mut callback);
340                batch.storage.keys_offs.heap_size(&mut callback);
341                batch.storage.vals.heap_size(&mut callback);
342                batch.storage.vals_offs.heap_size(&mut callback);
343                batch.storage.times.heap_size(&mut callback);
344                batch.storage.diffs.heap_size(&mut callback);
345            });
346            (size, capacity, allocations)
347        })
348    }
349}
350
351impl<G, K, T, R> ArrangementSize for Arranged<G, KeyAgent<K, T, R>>
352where
353    G: Scope<Timestamp = T>,
354    G::Timestamp: Lattice + Ord,
355    K: Data + Columnation,
356    T: Lattice + Timestamp + Columnation,
357    R: Semigroup + Ord + Columnation + 'static,
358{
359    fn log_arrangement_size(self) -> Self {
360        log_arrangement_size_inner(self, |trace| {
361            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
362            let mut callback = |siz, cap| {
363                size += siz;
364                capacity += cap;
365                allocations += usize::from(cap > 0);
366            };
367            trace.map_batches(|batch| {
368                batch.storage.keys.heap_size(&mut callback);
369                batch.storage.keys_offs.heap_size(&mut callback);
370                batch.storage.times.heap_size(&mut callback);
371                batch.storage.diffs.heap_size(&mut callback);
372            });
373            (size, capacity, allocations)
374        })
375    }
376}
377
378impl<G, V, T, R> ArrangementSize for Arranged<G, RowValAgent<V, T, R>>
379where
380    G: Scope<Timestamp = T>,
381    G::Timestamp: Lattice + Ord + Columnation,
382    V: Data + Columnation,
383    T: Lattice + Timestamp,
384    R: Semigroup + Ord + Columnation + 'static,
385{
386    fn log_arrangement_size(self) -> Self {
387        log_arrangement_size_inner(self, |trace| {
388            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
389            let mut callback = |siz, cap| {
390                size += siz;
391                capacity += cap;
392                allocations += usize::from(cap > 0);
393            };
394            trace.map_batches(|batch| {
395                batch.storage.keys.heap_size(&mut callback);
396                batch.storage.keys_offs.heap_size(&mut callback);
397                batch.storage.vals.heap_size(&mut callback);
398                batch.storage.vals_offs.heap_size(&mut callback);
399                batch.storage.times.heap_size(&mut callback);
400                batch.storage.diffs.heap_size(&mut callback);
401            });
402            (size, capacity, allocations)
403        })
404    }
405}
406
407impl<G, T, R> ArrangementSize for Arranged<G, RowRowAgent<T, R>>
408where
409    G: Scope<Timestamp = T>,
410    G::Timestamp: Lattice + Ord + Columnation,
411    T: Lattice + Timestamp,
412    R: Semigroup + Ord + Columnation + 'static,
413{
414    fn log_arrangement_size(self) -> Self {
415        log_arrangement_size_inner(self, |trace| {
416            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
417            let mut callback = |siz, cap| {
418                size += siz;
419                capacity += cap;
420                allocations += usize::from(cap > 0);
421            };
422            trace.map_batches(|batch| {
423                batch.storage.keys.heap_size(&mut callback);
424                batch.storage.keys_offs.heap_size(&mut callback);
425                batch.storage.vals.heap_size(&mut callback);
426                batch.storage.vals_offs.heap_size(&mut callback);
427                batch.storage.times.heap_size(&mut callback);
428                batch.storage.diffs.heap_size(&mut callback);
429            });
430            (size, capacity, allocations)
431        })
432    }
433}
434
435impl<G, T, R> ArrangementSize for Arranged<G, RowAgent<T, R>>
436where
437    G: Scope<Timestamp = T>,
438    G::Timestamp: Lattice + Ord + Columnation,
439    T: Lattice + Timestamp,
440    R: Semigroup + Ord + Columnation + 'static,
441{
442    fn log_arrangement_size(self) -> Self {
443        log_arrangement_size_inner(self, |trace| {
444            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
445            let mut callback = |siz, cap| {
446                size += siz;
447                capacity += cap;
448                allocations += usize::from(cap > 0);
449            };
450            trace.map_batches(|batch| {
451                batch.storage.keys.heap_size(&mut callback);
452                batch.storage.keys_offs.heap_size(&mut callback);
453                batch.storage.times.heap_size(&mut callback);
454                batch.storage.diffs.heap_size(&mut callback);
455            });
456            (size, capacity, allocations)
457        })
458    }
459}