mz_compute/logging/differential.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by differential dataflow.
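//!
//! The dataflow constructed here demultiplexes [`DifferentialEvent`]s into one update stream per
//! [`DifferentialLog`] variant (arrangement batches, records, trace sharing, and batcher
//! statistics), and arranges each stream for export when the corresponding log index is enabled.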

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::logging::{
    BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
};
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::InputCapability;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::operators::generic::{OutputBuilder, Session};
use timely::dataflow::{Scope, Stream};

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
use crate::logging::{
    DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
    consolidate_and_pack,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{KeyBatcher, RowRowSpine};

/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow fragment for differential logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read log events from.
/// * `shared_state`: Shared state across all logging dataflow fragments.
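///
/// Returns a [`Return`] bundling the exported log collections.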
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
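    // The logging interval in milliseconds, clamped to at least 1 so the timestamp rounding in
    // `DemuxHandler::ts` never divides by zero.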
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("differential logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "differential logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Build a demux operator that splits the replayed event stream into the separate
        // logging streams.
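        // One output per `DifferentialLog` variant; the per-output sessions are bundled in
        // `DemuxOutput` below.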
        let mut demux =
            OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (batches_out, batches) = demux.new_output();
        let (records_out, records) = demux.new_output();
        let (sharing_out, sharing) = demux.new_output();
        let (batcher_records_out, batcher_records) = demux.new_output();
        let (batcher_size_out, batcher_size) = demux.new_output();
        let (batcher_capacity_out, batcher_capacity) = demux.new_output();
        let (batcher_allocations_out, batcher_allocations) = demux.new_output();

        let mut batches_out = OutputBuilder::from(batches_out);
        let mut records_out = OutputBuilder::from(records_out);
        let mut sharing_out = OutputBuilder::from(sharing_out);
        let mut batcher_records_out = OutputBuilder::from(batcher_records_out);
        let mut batcher_size_out = OutputBuilder::from(batcher_size_out);
        let mut batcher_capacity_out = OutputBuilder::from(batcher_capacity_out);
        let mut batcher_allocations_out = OutputBuilder::from(batcher_allocations_out);

        let mut demux_state = Default::default();
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut batches = batches_out.activate();
                let mut records = records_out.activate();
                let mut sharing = sharing_out.activate();
                let mut batcher_records = batcher_records_out.activate();
                let mut batcher_size = batcher_size_out.activate();
                let mut batcher_capacity = batcher_capacity_out.activate();
                let mut batcher_allocations = batcher_allocations_out.activate();

                input.for_each_time(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        batches: batches.session_with_builder(&cap),
                        records: records.session_with_builder(&cap),
                        sharing: sharing.session_with_builder(&cap),
                        batcher_records: batcher_records.session_with_builder(&cap),
                        batcher_size: batcher_size.session_with_builder(&cap),
                        batcher_capacity: batcher_capacity.session_with_builder(&cap),
                        batcher_allocations: batcher_allocations.session_with_builder(&cap),
                    };

                    for (time, event) in data.flat_map(|data: &mut Vec<_>| data.drain(..)) {
                        DemuxHandler {
                            state: &mut demux_state,
                            output: &mut output_buffers,
                            logging_interval_ms,
                            time,
                            shared_state: &mut shared_state.borrow_mut(),
                        }
                        .handle(event);
                    }
                });
            }
        });

        // All differential logs share the same stream format, so we only need to implement the
        // conversion to rows once.
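        // Each update is packed into a row of `(operator_id, worker_id)`, both encoded as
        // `UInt64` datums.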
        let stream_to_collection = |input: &Stream<_, ((usize, ()), Timestamp, Diff)>, log| {
            let worker_id = scope.index();
            consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
                input,
                log,
                move |data, packer, session| {
                    for ((op, ()), time, diff) in data.iter() {
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(*op)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                        ]);
                        session.give((data, *time, *diff))
                    }
                },
            )
        };

        // Encode the contents of each logging stream into its expected `Row` format.
        let arrangement_batches = stream_to_collection(&batches, ArrangementBatches);
        let arrangement_records = stream_to_collection(&records, ArrangementRecords);
        let sharing = stream_to_collection(&sharing, Sharing);
        let batcher_records = stream_to_collection(&batcher_records, BatcherRecords);
        let batcher_size = stream_to_collection(&batcher_size, BatcherSize);
        let batcher_capacity = stream_to_collection(&batcher_capacity, BatcherCapacity);
        let batcher_allocations = stream_to_collection(&batcher_allocations, BatcherAllocations);

        use DifferentialLog::*;
        let logs = [
            (ArrangementBatches, arrangement_batches),
            (ArrangementRecords, arrangement_records),
            (Sharing, sharing),
            (BatcherRecords, batcher_records),
            (BatcherSize, batcher_size),
            (BatcherCapacity, batcher_capacity),
            (BatcherAllocations, batcher_allocations),
        ];

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Differential(variant);
            if config.index_logs.contains_key(&variant) {
                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
                );
                let trace = collection
                    .mz_arrange_core::<
                        _,
                        Col2ValBatcher<_, _, _, _>,
                        RowRowBuilder<_, _>,
                        RowRowSpine<_, _>,
                    >(exchange, &format!("Arrange {variant:?}"))
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

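/// A per-output session handle of the demux operator, producing consolidated
/// `(data, time, diff)` updates.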
type OutputSession<'a, 'b, D> = Session<
    'a,
    'b,
    Timestamp,
    ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>,
    InputCapability<Timestamp>,
>;

/// Bundled output buffers used by the demux operator.
struct DemuxOutput<'a, 'b> {
    batches: OutputSession<'a, 'b, (usize, ())>,
    records: OutputSession<'a, 'b, (usize, ())>,
    sharing: OutputSession<'a, 'b, (usize, ())>,
    batcher_records: OutputSession<'a, 'b, (usize, ())>,
    batcher_size: OutputSession<'a, 'b, (usize, ())>,
    batcher_capacity: OutputSession<'a, 'b, (usize, ())>,
    batcher_allocations: OutputSession<'a, 'b, (usize, ())>,
}

/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Arrangement trace sharing.
    sharing: BTreeMap<usize, usize>,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
}

impl DemuxHandler<'_, '_, '_> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
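        // Round the event time up to the next interval boundary strictly after it. For example,
        // with a 1000 ms interval, an event at 1500 ms is assigned timestamp 2000 ms.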
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given differential event.
    fn handle(&mut self, event: DifferentialEvent) {
        use DifferentialEvent::*;

        match event {
            Batch(e) => self.handle_batch(e),
            Merge(e) => self.handle_merge(e),
            Drop(e) => self.handle_drop(e),
            TraceShare(e) => self.handle_trace_share(e),
            Batcher(e) => self.handle_batcher_event(e),
            _ => (),
        }
    }

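    /// A new batch has been created: increment the operator's batch count by one and its record
    /// count by the batch length.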
    fn handle_batch(&mut self, event: BatchEvent) {
        let ts = self.ts();
        let operator_id = event.operator;
        self.output.batches.give(((operator_id, ()), ts, Diff::ONE));

        let diff = Diff::try_from(event.length).expect("must fit");
        self.output.records.give(((operator_id, ()), ts, diff));
        self.notify_arrangement_size(operator_id);
    }

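    /// A merge has completed: the two input batches are replaced by the merged output batch, so
    /// the batch count decreases by one and the record count changes by the difference between
    /// the output length and the combined input lengths.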
    fn handle_merge(&mut self, event: MergeEvent) {
        let Some(done) = event.complete else { return };

        let ts = self.ts();
        let operator_id = event.operator;
        self.output
            .batches
            .give(((operator_id, ()), ts, Diff::MINUS_ONE));

        let diff = Diff::try_from(done).expect("must fit")
            - Diff::try_from(event.length1 + event.length2).expect("must fit");
        if diff != Diff::ZERO {
            self.output.records.give(((operator_id, ()), ts, diff));
        }
        self.notify_arrangement_size(operator_id);
    }

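    /// A batch has been dropped: decrement the batch count and subtract the dropped records.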
    fn handle_drop(&mut self, event: DropEvent) {
        let ts = self.ts();
        let operator_id = event.operator;
        self.output
            .batches
            .give(((operator_id, ()), ts, Diff::MINUS_ONE));

        let diff = -Diff::try_from(event.length).expect("must fit");
        if diff != Diff::ZERO {
            self.output.records.give(((operator_id, ()), ts, diff));
        }
        self.notify_arrangement_size(operator_id);
    }

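    /// The share count of an arrangement trace has changed. Track the running total per
    /// operator; once it drops to zero the trace no longer exists, so emit an
    /// `ArrangementHeapSizeOperatorDrop` compute event for the operator.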
    fn handle_trace_share(&mut self, event: TraceShare) {
        let ts = self.ts();
        let operator_id = event.operator;
        let diff = Diff::cast_from(event.diff);
        debug_assert_ne!(diff, Diff::ZERO);
        self.output.sharing.give(((operator_id, ()), ts, diff));

        let sharing = self.state.sharing.entry(operator_id).or_default();
        *sharing = (Diff::try_from(*sharing).expect("must fit") + diff)
            .into_inner()
            .try_into()
            .expect("under/overflow");
        if *sharing == 0 {
            self.state.sharing.remove(&operator_id);
            self.shared_state.compute_logger.as_ref().map(|logger| {
                logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
                    ArrangementHeapSizeOperatorDrop { operator_id },
                ))
            });
        }
    }

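    /// Batcher statistics have changed: forward the record, size, capacity, and allocation
    /// deltas to their respective outputs.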
    fn handle_batcher_event(&mut self, event: BatcherEvent) {
        let ts = self.ts();
        let operator_id = event.operator;
        let records_diff = Diff::cast_from(event.records_diff);
        let size_diff = Diff::cast_from(event.size_diff);
        let capacity_diff = Diff::cast_from(event.capacity_diff);
        let allocations_diff = Diff::cast_from(event.allocations_diff);
        self.output
            .batcher_records
            .give(((operator_id, ()), ts, records_diff));
        self.output
            .batcher_size
            .give(((operator_id, ()), ts, size_diff));
        self.output
            .batcher_capacity
            .give(((operator_id, ()), ts, capacity_diff));
        self.output
            .batcher_allocations
            .give(((operator_id, ()), ts, allocations_diff));
    }

    fn notify_arrangement_size(&self, operator: usize) {
        // Every arrangement should have a corresponding arrangement size operator, but we have
        // no guarantee that it already/still exists. We could log a warning when it is missing,
        // but doing so reliably would require keeping state around for longer than the
        // arrangement itself exists.
        if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
            activator.activate();
        }
    }
}