Skip to main content

mz_compute/extensions/
arrange.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::rc::Rc;
12
13use differential_dataflow::difference::Semigroup;
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::arrangement::arrange_core;
16use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
17use differential_dataflow::trace::implementations::spine_fueled::Spine;
18use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
19use differential_dataflow::{Collection, Data, ExchangeData, Hashable, VecCollection};
20use timely::Container;
21use timely::dataflow::Stream;
22use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
23use timely::dataflow::operators::Operator;
24use timely::progress::Timestamp;
25
26use crate::logging::compute::{
27    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
28    ArrangementHeapSizeOperator, ComputeEvent, ComputeEventBuilder,
29};
30use crate::typedefs::{
31    KeyAgent, KeyValAgent, MzArrangeData, MzData, MzTimestamp, RowAgent, RowRowAgent, RowValAgent,
32};
33
34/// Extension trait to arrange data.
35pub trait MzArrange<'scope>: MzArrangeCore<'scope> {
36    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`.
37    ///
38    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
39    /// This trace is current for all times marked completed in the output stream, and probing this stream
40    /// is the correct way to determine that times in the shared trace are committed.
41    fn mz_arrange<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
42    where
43        Ba: Batcher<Input = Self::Input, Time = Self::Timestamp> + 'static,
44        Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
45        Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
46        Tr::Batch: Batch,
47        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
48}
49
50/// Extension trait to arrange data.
51pub trait MzArrangeCore<'scope> {
52    /// The current scope.
53    type Timestamp: Timestamp + Lattice;
54    /// The data input container type.
55    type Input: Container + Clone + 'static;
56
57    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`. Partitions
58    /// the data according to `pact`.
59    ///
60    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
61    /// This trace is current for all times marked completed in the output stream, and probing this stream
62    /// is the correct way to determine that times in the shared trace are committed.
63    fn mz_arrange_core<P, Ba, Bu, Tr>(
64        self,
65        pact: P,
66        name: &str,
67    ) -> Arranged<'scope, TraceAgent<Tr>>
68    where
69        P: ParallelizationContract<Self::Timestamp, Self::Input>,
70        Ba: Batcher<Input = Self::Input, Time = Self::Timestamp> + 'static,
71        // Ba::Input: Container + Clone + 'static,
72        Bu: Builder<Time = Self::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
73        Tr: Trace + TraceReader<Time = Self::Timestamp> + 'static,
74        Tr::Batch: Batch,
75        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize;
76}
77
78impl<'scope, T, C> MzArrangeCore<'scope> for Stream<'scope, T, C>
79where
80    T: Timestamp + Lattice,
81    C: Container + Clone + 'static,
82{
83    type Timestamp = T;
84    type Input = C;
85
86    fn mz_arrange_core<P, Ba, Bu, Tr>(self, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
87    where
88        P: ParallelizationContract<T, Self::Input>,
89        Ba: Batcher<Input = Self::Input, Time = T> + 'static,
90        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
91        Tr: Trace + TraceReader<Time = T> + 'static,
92        Tr::Batch: Batch,
93        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
94    {
95        // Allow access to `arrange_named` because we're within Mz's wrapper.
96        #[allow(clippy::disallowed_methods)]
97        arrange_core::<_, Ba, Bu, _>(self, pact, name).log_arrangement_size()
98    }
99}
100
101impl<'scope, T, K, V, R> MzArrange<'scope> for VecCollection<'scope, T, (K, V), R>
102where
103    T: Timestamp + Lattice,
104    K: ExchangeData + Hashable,
105    V: ExchangeData,
106    R: ExchangeData,
107{
108    fn mz_arrange<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
109    where
110        Ba: Batcher<Input = Self::Input, Time = T> + 'static,
111        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
112        Tr: Trace + TraceReader<Time = T> + 'static,
113        Tr::Batch: Batch,
114        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
115    {
116        let exchange = Exchange::new(move |update: &((K, V), T, R)| (update.0).0.hashed().into());
117        self.mz_arrange_core::<_, Ba, Bu, _>(exchange, name)
118    }
119}
120
121impl<'scope, T, C> MzArrangeCore<'scope> for Collection<'scope, T, C>
122where
123    T: Timestamp + Lattice,
124    C: Container + Clone + 'static,
125{
126    type Timestamp = T;
127    type Input = C;
128
129    fn mz_arrange_core<P, Ba, Bu, Tr>(self, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
130    where
131        P: ParallelizationContract<T, Self::Input>,
132        Ba: Batcher<Input = Self::Input, Time = T> + 'static,
133        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
134        Tr: Trace + TraceReader<Time = T> + 'static,
135        Tr::Batch: Batch,
136        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
137    {
138        self.inner.mz_arrange_core::<_, Ba, Bu, _>(pact, name)
139    }
140}
141
142/// A specialized collection where data only has a key, but no associated value.
143///
144/// Created by calling `collection.into()`.
145pub struct KeyCollection<'scope, T: Timestamp, K: 'static, R: 'static = usize>(
146    VecCollection<'scope, T, K, R>,
147);
148
149impl<'scope, T: Timestamp, K, R: Semigroup> From<VecCollection<'scope, T, K, R>>
150    for KeyCollection<'scope, T, K, R>
151{
152    fn from(value: VecCollection<'scope, T, K, R>) -> Self {
153        KeyCollection(value)
154    }
155}
156
157impl<'scope, T, K, R> MzArrange<'scope> for KeyCollection<'scope, T, K, R>
158where
159    T: Timestamp + Lattice,
160    K: ExchangeData + Hashable,
161    R: ExchangeData,
162{
163    fn mz_arrange<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
164    where
165        Ba: Batcher<Input = Self::Input, Time = T> + 'static,
166        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
167        Tr: Trace + TraceReader<Time = T> + 'static,
168        Tr::Batch: Batch,
169        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
170    {
171        self.0.map(|d| (d, ())).mz_arrange::<Ba, Bu, _>(name)
172    }
173}
174
175impl<'scope, T, K, R> MzArrangeCore<'scope> for KeyCollection<'scope, T, K, R>
176where
177    T: Timestamp + Lattice,
178    K: Clone + 'static,
179    R: Clone + 'static,
180{
181    type Timestamp = T;
182    type Input = Vec<((K, ()), T, R)>;
183
184    fn mz_arrange_core<P, Ba, Bu, Tr>(self, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
185    where
186        P: ParallelizationContract<T, Self::Input>,
187        Ba: Batcher<Input = Self::Input, Time = T> + 'static,
188        Bu: Builder<Time = T, Input = Ba::Output, Output = Tr::Batch>,
189        Tr: Trace + TraceReader<Time = T> + 'static,
190        Tr::Batch: Batch,
191        Arranged<'scope, TraceAgent<Tr>>: ArrangementSize,
192    {
193        self.0
194            .map(|d| (d, ()))
195            .mz_arrange_core::<_, Ba, Bu, _>(pact, name)
196    }
197}
198
/// A type that can log its heap size.
///
/// The implementations in this file are all for `Arranged` values over the crate's trace agent
/// aliases, and each of them reports through `log_arrangement_size_inner`.
pub trait ArrangementSize {
    /// Install a logger to track the heap size of the target.
    ///
    /// Takes `self` by value and returns a value of the same type, so the call can be chained
    /// directly onto the expression that produced the target.
    fn log_arrangement_size(self) -> Self;
}
204
205/// Helper for [`ArrangementSize`] to install a common operator holding on to a trace.
206///
207/// * `arranged`: The arrangement to inspect.
208/// * `logic`: Closure that calculates the heap size/capacity/allocations for a batch. The return
209///    value are size and capacity in bytes, and number of allocations, all in absolute values.
210fn log_arrangement_size_inner<'scope, B, L>(
211    arranged: Arranged<'scope, TraceAgent<Spine<Rc<B>>>>,
212    mut logic: L,
213) -> Arranged<'scope, TraceAgent<Spine<Rc<B>>>>
214where
215    B: Batch + 'static,
216    L: FnMut(&B) -> (usize, usize, usize) + 'static,
217{
218    let scope = arranged.stream.scope();
219    let Some(logger) = scope
220        .worker()
221        .logger_for::<ComputeEventBuilder>("materialize/compute")
222    else {
223        return arranged;
224    };
225    let operator_id = arranged.trace.operator().global_id;
226    let trace = Rc::downgrade(&arranged.trace.trace_box_unstable());
227
228    let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
229
230    let stream = arranged
231        .stream
232        .unary(Pipeline, "ArrangementSize", |_cap, info| {
233            let address = info.address;
234            logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
235                ArrangementHeapSizeOperator {
236                    operator_id,
237                    address: address.to_vec(),
238                },
239            ));
240
241            // Weak references to batches, so we can observe batches outside the trace.
242            let mut batches = BTreeMap::new();
243
244            move |input, output| {
245                input.for_each(|time, data| {
246                    batches.extend(
247                        data.iter()
248                            .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))),
249                    );
250                    output.session(&time).give_container(data);
251                });
252                let Some(trace) = trace.upgrade() else {
253                    return;
254                };
255
256                trace.borrow().trace.map_batches(|batch| {
257                    batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch));
258                });
259
260                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
261                batches.retain(|_, weak| {
262                    if let Some(batch) = weak.upgrade() {
263                        let (sz, c, a) = logic(&batch);
264                        (size += sz, capacity += c, allocations += a);
265                        true
266                    } else {
267                        false
268                    }
269                });
270
271                let size = size.try_into().expect("must fit");
272                if size != old_size {
273                    logger.log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
274                        operator_id,
275                        delta_size: size - old_size,
276                    }));
277                }
278
279                let capacity = capacity.try_into().expect("must fit");
280                if capacity != old_capacity {
281                    logger.log(&ComputeEvent::ArrangementHeapCapacity(
282                        ArrangementHeapCapacity {
283                            operator_id,
284                            delta_capacity: capacity - old_capacity,
285                        },
286                    ));
287                }
288
289                let allocations = allocations.try_into().expect("must fit");
290                if allocations != old_allocations {
291                    logger.log(&ComputeEvent::ArrangementHeapAllocations(
292                        ArrangementHeapAllocations {
293                            operator_id,
294                            delta_allocations: allocations - old_allocations,
295                        },
296                    ));
297                }
298
299                old_size = size;
300                old_capacity = capacity;
301                old_allocations = allocations;
302            }
303        });
304    Arranged {
305        trace: arranged.trace,
306        stream,
307    }
308}
309
310impl<'scope, T, K, V, R> ArrangementSize for Arranged<'scope, KeyValAgent<K, V, T, R>>
311where
312    T: MzTimestamp,
313    K: Data + MzData,
314    V: Data + MzData,
315    R: Semigroup + Ord + MzData + 'static,
316{
317    fn log_arrangement_size(self) -> Self {
318        log_arrangement_size_inner(self, |batch| {
319            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
320            let mut callback = |siz, cap| {
321                size += siz;
322                capacity += cap;
323                allocations += usize::from(cap > 0);
324            };
325            batch.storage.keys.heap_size(&mut callback);
326            batch.storage.vals.offs.heap_size(&mut callback);
327            batch.storage.vals.vals.heap_size(&mut callback);
328            batch.storage.upds.offs.heap_size(&mut callback);
329            batch.storage.upds.times.heap_size(&mut callback);
330            batch.storage.upds.diffs.heap_size(&mut callback);
331            (size, capacity, allocations)
332        })
333    }
334}
335
336impl<'scope, T, K, R> ArrangementSize for Arranged<'scope, KeyAgent<K, T, R>>
337where
338    T: MzTimestamp,
339    K: Data + MzArrangeData,
340    R: Semigroup + Ord + MzData + 'static,
341{
342    fn log_arrangement_size(self) -> Self {
343        log_arrangement_size_inner(self, |batch| {
344            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
345            let mut callback = |siz, cap| {
346                size += siz;
347                capacity += cap;
348                allocations += usize::from(cap > 0);
349            };
350            batch.storage.keys.heap_size(&mut callback);
351            batch.storage.upds.offs.heap_size(&mut callback);
352            batch.storage.upds.times.heap_size(&mut callback);
353            batch.storage.upds.diffs.heap_size(&mut callback);
354            (size, capacity, allocations)
355        })
356    }
357}
358
359impl<'scope, T, V, R> ArrangementSize for Arranged<'scope, RowValAgent<V, T, R>>
360where
361    T: MzTimestamp,
362    V: Data + MzArrangeData,
363    R: Semigroup + Ord + MzArrangeData + 'static,
364{
365    fn log_arrangement_size(self) -> Self {
366        log_arrangement_size_inner(self, |batch| {
367            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
368            let mut callback = |siz, cap| {
369                size += siz;
370                capacity += cap;
371                allocations += usize::from(cap > 0);
372            };
373            batch.storage.keys.heap_size(&mut callback);
374            batch.storage.vals.offs.heap_size(&mut callback);
375            batch.storage.vals.vals.heap_size(&mut callback);
376            batch.storage.upds.offs.heap_size(&mut callback);
377            batch.storage.upds.times.heap_size(&mut callback);
378            batch.storage.upds.diffs.heap_size(&mut callback);
379            (size, capacity, allocations)
380        })
381    }
382}
383
384impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowRowAgent<T, R>>
385where
386    T: MzTimestamp,
387    R: Semigroup + Ord + MzArrangeData + 'static,
388{
389    fn log_arrangement_size(self) -> Self {
390        log_arrangement_size_inner(self, |batch| {
391            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
392            let mut callback = |siz, cap| {
393                size += siz;
394                capacity += cap;
395                allocations += usize::from(cap > 0);
396            };
397            batch.storage.keys.heap_size(&mut callback);
398            batch.storage.vals.offs.heap_size(&mut callback);
399            batch.storage.vals.vals.heap_size(&mut callback);
400            batch.storage.upds.offs.heap_size(&mut callback);
401            batch.storage.upds.times.heap_size(&mut callback);
402            batch.storage.upds.diffs.heap_size(&mut callback);
403            (size, capacity, allocations)
404        })
405    }
406}
407
408impl<'scope, T, R> ArrangementSize for Arranged<'scope, RowAgent<T, R>>
409where
410    T: MzTimestamp,
411    R: Semigroup + Ord + MzArrangeData + 'static,
412{
413    fn log_arrangement_size(self) -> Self {
414        log_arrangement_size_inner(self, |batch| {
415            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
416            let mut callback = |siz, cap| {
417                size += siz;
418                capacity += cap;
419                allocations += usize::from(cap > 0);
420            };
421            batch.storage.keys.heap_size(&mut callback);
422            batch.storage.upds.offs.heap_size(&mut callback);
423            batch.storage.upds.times.heap_size(&mut callback);
424            batch.storage.upds.diffs.heap_size(&mut callback);
425            (size, capacity, allocations)
426        })
427    }
428}