Skip to main content

mz_compute/logging/
differential.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by differential dataflow.
11
12use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::logging::{
19    BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
20};
21use mz_ore::cast::CastFrom;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::batcher;
24use mz_timely_util::columnar::builder::ColumnBuilder;
25use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
26use mz_timely_util::columnation::ColumnationChunker;
27use mz_timely_util::replay::MzReplay;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::operators::InputCapability;
30use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
31use timely::dataflow::operators::generic::operator::empty;
32use timely::dataflow::operators::generic::{OutputBuilder, Session};
33use timely::dataflow::{Scope, StreamVec};
34
35use crate::extensions::arrange::MzArrangeCore;
36use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
37use crate::logging::{
38    DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
39    consolidate_and_pack,
40};
41use crate::typedefs::{KeyBatcher, RowRowSpine};
42use mz_row_spine::RowRowBuilder;
43
/// The return type of [`construct`].
///
/// Bundles the log collections produced by the differential logging dataflow fragment.
pub(super) struct Return {
    /// Collections to export, keyed by the log variant they provide.
    ///
    /// [`construct`] only inserts an entry for variants that appear in the logging
    /// configuration's `index_logs`.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
49
50/// Constructs the logging dataflow fragment for differential logs.
51///
52/// Params
53/// * `scope`: The Timely scope hosting the log analysis dataflow.
54/// * `config`: Logging configuration
55/// * `event_queue`: The source to read log events from.
56/// * `shared_state`: Shared state across all logging dataflow fragments.
57pub(super) fn construct(
58    scope: Scope<'_, Timestamp>,
59    config: &mz_compute_client::logging::LoggingConfig,
60    event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
61    shared_state: Rc<RefCell<SharedLoggingState>>,
62) -> Return {
63    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
64
65    scope.scoped("differential logging", move |scope| {
66        let enable_logging = config.enable_logging;
67        let (logs, token) = if enable_logging {
68            event_queue.links.mz_replay(
69                scope,
70                "differential logs",
71                config.interval,
72                event_queue.activator,
73            )
74        } else {
75            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
76            (empty(scope), token)
77        };
78
79        // Build a demux operator that splits the replayed event stream up into the separate
80        // logging streams.
81        let mut demux =
82            OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
83        let mut input = demux.new_input(logs, Pipeline);
84        let (batches_out, batches) = demux.new_output();
85        let (records_out, records) = demux.new_output();
86        let (sharing_out, sharing) = demux.new_output();
87        let (batcher_records_out, batcher_records) = demux.new_output();
88        let (batcher_size_out, batcher_size) = demux.new_output();
89        let (batcher_capacity_out, batcher_capacity) = demux.new_output();
90        let (batcher_allocations_out, batcher_allocations) = demux.new_output();
91
92        let mut batches_out = OutputBuilder::from(batches_out);
93        let mut records_out = OutputBuilder::from(records_out);
94        let mut sharing_out = OutputBuilder::from(sharing_out);
95        let mut batcher_records_out = OutputBuilder::from(batcher_records_out);
96        let mut batcher_size_out = OutputBuilder::from(batcher_size_out);
97        let mut batcher_capacity_out = OutputBuilder::from(batcher_capacity_out);
98        let mut batcher_allocations_out = OutputBuilder::from(batcher_allocations_out);
99
100        let mut demux_state = Default::default();
101        demux.build(move |_capability| {
102            move |_frontiers| {
103                let mut batches = batches_out.activate();
104                let mut records = records_out.activate();
105                let mut sharing = sharing_out.activate();
106                let mut batcher_records = batcher_records_out.activate();
107                let mut batcher_size = batcher_size_out.activate();
108                let mut batcher_capacity = batcher_capacity_out.activate();
109                let mut batcher_allocations = batcher_allocations_out.activate();
110
111                input.for_each_time(|cap, data| {
112                    let mut output_buffers = DemuxOutput {
113                        batches: batches.session_with_builder(&cap),
114                        records: records.session_with_builder(&cap),
115                        sharing: sharing.session_with_builder(&cap),
116                        batcher_records: batcher_records.session_with_builder(&cap),
117                        batcher_size: batcher_size.session_with_builder(&cap),
118                        batcher_capacity: batcher_capacity.session_with_builder(&cap),
119                        batcher_allocations: batcher_allocations.session_with_builder(&cap),
120                    };
121
122                    for (time, event) in data.flat_map(|data: &mut Vec<_>| data.drain(..)) {
123                        DemuxHandler {
124                            state: &mut demux_state,
125                            output: &mut output_buffers,
126                            logging_interval_ms,
127                            time,
128                            shared_state: &mut shared_state.borrow_mut(),
129                        }
130                        .handle(event);
131                    }
132                });
133            }
134        });
135
136        // We're lucky and the differential logs all have the same stream format, so just implement
137        // the call once.
138        fn stream_to_collection<'scope>(
139            input: StreamVec<'scope, Timestamp, ((usize, ()), Timestamp, Diff)>,
140            log: DifferentialLog,
141            worker_id: usize,
142        ) -> timely::dataflow::Stream<
143            'scope,
144            Timestamp,
145            mz_timely_util::columnar::Column<((mz_repr::Row, mz_repr::Row), Timestamp, Diff)>,
146        > {
147            consolidate_and_pack::<
148                ColumnationChunker<_>,
149                KeyBatcher<_, _, _>,
150                ColumnBuilder<_>,
151                _,
152                _,
153                _,
154            >(input, log, move |data, packer, session| {
155                for ((op, ()), time, diff) in data.iter() {
156                    let data = packer.pack_slice(&[
157                        Datum::UInt64(u64::cast_from(*op)),
158                        Datum::UInt64(u64::cast_from(worker_id)),
159                    ]);
160                    session.give((data, *time, *diff))
161                }
162            })
163        }
164        let worker_id = scope.index();
165
166        // Encode the contents of each logging stream into its expected `Row` format.
167        let arrangement_batches = stream_to_collection(batches, ArrangementBatches, worker_id);
168        let arrangement_records = stream_to_collection(records, ArrangementRecords, worker_id);
169        let sharing = stream_to_collection(sharing, Sharing, worker_id);
170        let batcher_records = stream_to_collection(batcher_records, BatcherRecords, worker_id);
171        let batcher_size = stream_to_collection(batcher_size, BatcherSize, worker_id);
172        let batcher_capacity = stream_to_collection(batcher_capacity, BatcherCapacity, worker_id);
173        let batcher_allocations =
174            stream_to_collection(batcher_allocations, BatcherAllocations, worker_id);
175
176        use DifferentialLog::*;
177        let logs = [
178            (ArrangementBatches, arrangement_batches),
179            (ArrangementRecords, arrangement_records),
180            (Sharing, sharing),
181            (BatcherRecords, batcher_records),
182            (BatcherSize, batcher_size),
183            (BatcherCapacity, batcher_capacity),
184            (BatcherAllocations, batcher_allocations),
185        ];
186
187        // Build the output arrangements.
188        let mut collections = BTreeMap::new();
189        for (variant, collection) in logs {
190            let variant = LogVariant::Differential(variant);
191            if config.index_logs.contains_key(&variant) {
192                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
193                    columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
194                );
195                let trace = collection
196                    .mz_arrange_core::<
197                        _,
198                        batcher::Chunker<_>,
199                        Col2ValBatcher<_, _, _, _>,
200                        RowRowBuilder<_, _>,
201                        RowRowSpine<_, _>,
202                    >(exchange, &format!("Arrange {variant:?}"))
203                    .trace;
204                let collection = LogCollection {
205                    trace,
206                    token: Rc::clone(&token),
207                };
208                collections.insert(variant, collection);
209            }
210        }
211
212        Return { collections }
213    })
214}
215
/// Output session of the demux operator for updates whose data part has type `D`.
///
/// Updates are `(D, Timestamp, Diff)` triples that pass through a
/// [`ConsolidatingContainerBuilder`]; the session is tied to an [`InputCapability`].
type OutputSession<'a, 'b, D> = Session<
    'a,
    'b,
    Timestamp,
    ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>,
    InputCapability<Timestamp>,
>;
223
/// Bundled output buffers used by the demux operator.
///
/// There is one session per differential log. Every update is keyed by `(operator_id, ())`.
struct DemuxOutput<'a, 'b> {
    /// Changes to the number of batches of an operator.
    batches: OutputSession<'a, 'b, (usize, ())>,
    /// Changes to the number of records of an operator.
    records: OutputSession<'a, 'b, (usize, ())>,
    /// Changes to the trace share count of an operator.
    sharing: OutputSession<'a, 'b, (usize, ())>,
    /// The `records_diff` values reported by batcher events.
    batcher_records: OutputSession<'a, 'b, (usize, ())>,
    /// The `size_diff` values reported by batcher events.
    batcher_size: OutputSession<'a, 'b, (usize, ())>,
    /// The `capacity_diff` values reported by batcher events.
    batcher_capacity: OutputSession<'a, 'b, (usize, ())>,
    /// The `allocations_diff` values reported by batcher events.
    batcher_allocations: OutputSession<'a, 'b, (usize, ())>,
}
234
/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Arrangement trace sharing: maps an operator ID to its current share count.
    ///
    /// The count is updated on every trace share event, and the entry is removed once the
    /// count reaches zero.
    sharing: BTreeMap<usize, usize>,
}
241
/// Event handler of the demux operator.
///
/// A handler is built for a single event: it bundles the operator state, the output sessions
/// and the event time, and is consumed by one call to `handle`.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept by the demux operator
    state: &'a mut DemuxState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates, in milliseconds.
    ///
    /// Must be non-zero, as event times are divided by it.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
}
255
256impl DemuxHandler<'_, '_, '_> {
257    /// Return the timestamp associated with the current event, based on the event time and the
258    /// logging interval.
259    fn ts(&self) -> Timestamp {
260        let time_ms = self.time.as_millis();
261        let interval = self.logging_interval_ms;
262        let rounded = (time_ms / interval + 1) * interval;
263        rounded.try_into().expect("must fit")
264    }
265
266    /// Handle the given differential event.
267    fn handle(&mut self, event: DifferentialEvent) {
268        use DifferentialEvent::*;
269
270        match event {
271            Batch(e) => self.handle_batch(e),
272            Merge(e) => self.handle_merge(e),
273            Drop(e) => self.handle_drop(e),
274            TraceShare(e) => self.handle_trace_share(e),
275            Batcher(e) => self.handle_batcher_event(e),
276            MergeShortfall(_) => (),
277        }
278    }
279
280    fn handle_batch(&mut self, event: BatchEvent) {
281        let ts = self.ts();
282        let operator_id = event.operator;
283        self.output.batches.give(((operator_id, ()), ts, Diff::ONE));
284
285        let diff = Diff::try_from(event.length).expect("must fit");
286        self.output.records.give(((operator_id, ()), ts, diff));
287        self.notify_arrangement_size(operator_id);
288    }
289
290    fn handle_merge(&mut self, event: MergeEvent) {
291        let Some(done) = event.complete else { return };
292
293        let ts = self.ts();
294        let operator_id = event.operator;
295        self.output
296            .batches
297            .give(((operator_id, ()), ts, Diff::MINUS_ONE));
298
299        let diff = Diff::try_from(done).expect("must fit")
300            - Diff::try_from(event.length1 + event.length2).expect("must fit");
301        if diff != Diff::ZERO {
302            self.output.records.give(((operator_id, ()), ts, diff));
303        }
304        self.notify_arrangement_size(operator_id);
305    }
306
307    fn handle_drop(&mut self, event: DropEvent) {
308        let ts = self.ts();
309        let operator_id = event.operator;
310        self.output
311            .batches
312            .give(((operator_id, ()), ts, Diff::MINUS_ONE));
313
314        let diff = -Diff::try_from(event.length).expect("must fit");
315        if diff != Diff::ZERO {
316            self.output.records.give(((operator_id, ()), ts, diff));
317        }
318        self.notify_arrangement_size(operator_id);
319    }
320
321    fn handle_trace_share(&mut self, event: TraceShare) {
322        let ts = self.ts();
323        let operator_id = event.operator;
324        let diff = Diff::cast_from(event.diff);
325        debug_assert_ne!(diff, Diff::ZERO);
326        self.output.sharing.give(((operator_id, ()), ts, diff));
327
328        let sharing = self.state.sharing.entry(operator_id).or_default();
329        *sharing = (Diff::try_from(*sharing).expect("must fit") + diff)
330            .into_inner()
331            .try_into()
332            .expect("under/overflow");
333        if *sharing == 0 {
334            self.state.sharing.remove(&operator_id);
335            self.shared_state.compute_logger.as_ref().map(|logger| {
336                logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
337                    ArrangementHeapSizeOperatorDrop { operator_id },
338                ))
339            });
340        }
341    }
342
343    fn handle_batcher_event(&mut self, event: BatcherEvent) {
344        let ts = self.ts();
345        let operator_id = event.operator;
346        let records_diff = Diff::cast_from(event.records_diff);
347        let size_diff = Diff::cast_from(event.size_diff);
348        let capacity_diff = Diff::cast_from(event.capacity_diff);
349        let allocations_diff = Diff::cast_from(event.allocations_diff);
350        self.output
351            .batcher_records
352            .give(((operator_id, ()), ts, records_diff));
353        self.output
354            .batcher_size
355            .give(((operator_id, ()), ts, size_diff));
356        self.output
357            .batcher_capacity
358            .give(((operator_id, ()), ts, capacity_diff));
359        self.output
360            .batcher_allocations
361            .give(((operator_id, ()), ts, allocations_diff));
362    }
363
364    fn notify_arrangement_size(&self, operator: usize) {
365        // While every arrangement should have a corresponding arrangement size operator,
366        // we have no guarantee that it already/still exists. Otherwise we could print a warning
367        // here, but it's difficult to implement without maintaining state for a longer period than
368        // while the arrangement actually exists.
369        if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
370            activator.activate();
371        }
372    }
373}