mz_compute/logging/differential.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by differential dataflow.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::logging::{
    BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
};
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::containers::{
    Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange,
};
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
use crate::logging::{
    DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
    consolidate_and_pack,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{KeyBatcher, RowRowSpine};

/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow fragment for differential logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read log events from.
/// * `shared_state`: Shared state across all logging dataflow fragments.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
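    // Use a logging interval of at least 1 ms; `DemuxHandler::ts` divides by this value when
    // rounding event times to timestamps.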
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("differential logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = event_queue.links
            .mz_replay::<_, ProvidedBuilder<_>, _>(
                scope,
                "differential logs",
                config.interval,
                event_queue.activator,
                move |mut session, mut data| {
                    // If logging is disabled, we still need to install the indexes, but we can
                    // leave them empty. We do so by immediately filtering out all log events.
                    if enable_logging {
                        session.give_container(data.to_mut())
                    }
                }
            );

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux =
            OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut batches_out, batches) = demux.new_output();
        let (mut records_out, records) = demux.new_output();
        let (mut sharing_out, sharing) = demux.new_output();
        let (mut batcher_records_out, batcher_records) = demux.new_output();
        let (mut batcher_size_out, batcher_size) = demux.new_output();
        let (mut batcher_capacity_out, batcher_capacity) = demux.new_output();
        let (mut batcher_allocations_out, batcher_allocations) = demux.new_output();

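        // Demux state: per-operator trace share counts, maintained by the `DemuxHandler`.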
        let mut demux_state = Default::default();
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut batches = batches_out.activate();
                let mut records = records_out.activate();
                let mut sharing = sharing_out.activate();
                let mut batcher_records = batcher_records_out.activate();
                let mut batcher_size = batcher_size_out.activate();
                let mut batcher_capacity = batcher_capacity_out.activate();
                let mut batcher_allocations = batcher_allocations_out.activate();

                input.for_each(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        batches: batches.session_with_builder(&cap),
                        records: records.session_with_builder(&cap),
                        sharing: sharing.session_with_builder(&cap),
                        batcher_records: batcher_records.session_with_builder(&cap),
                        batcher_size: batcher_size.session_with_builder(&cap),
                        batcher_capacity: batcher_capacity.session_with_builder(&cap),
                        batcher_allocations: batcher_allocations.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        DemuxHandler {
                            state: &mut demux_state,
                            output: &mut output_buffers,
                            logging_interval_ms,
                            time,
                            shared_state: &mut shared_state.borrow_mut(),
                        }
                        .handle(event);
                    }
                });
            }
        });

        // We're lucky and the differential logs all have the same stream format, so just implement
        // the call once.
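        // Each update is packed into a two-column row of (operator id, worker id).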
        let stream_to_collection = |input: &Stream<_, ((usize, ()), Timestamp, Diff)>, log| {
            let worker_id = scope.index();
            consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
                input,
                log,
                move |((op, ()), time, diff), packer, session| {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*op)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, *time, *diff))
                },
            )
        };

        // Encode the contents of each logging stream into its expected `Row` format.
        let arrangement_batches = stream_to_collection(&batches, ArrangementBatches);
        let arrangement_records = stream_to_collection(&records, ArrangementRecords);
        let sharing = stream_to_collection(&sharing, Sharing);
        let batcher_records = stream_to_collection(&batcher_records, BatcherRecords);
        let batcher_size = stream_to_collection(&batcher_size, BatcherSize);
        let batcher_capacity = stream_to_collection(&batcher_capacity, BatcherCapacity);
        let batcher_allocations = stream_to_collection(&batcher_allocations, BatcherAllocations);

        use DifferentialLog::*;
        let logs = [
            (ArrangementBatches, arrangement_batches),
            (ArrangementRecords, arrangement_records),
            (Sharing, sharing),
            (BatcherRecords, batcher_records),
            (BatcherSize, batcher_size),
            (BatcherCapacity, batcher_capacity),
            (BatcherAllocations, batcher_allocations),
        ];

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Differential(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

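/// Pusher type for demux output streams of `(data, time, diff)` updates.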
type Pusher<D> =
    Counter<Timestamp, Vec<(D, Timestamp, Diff)>, Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>;
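/// An output session for writing `(data, time, diff)` updates to a demux output, backed by a
/// consolidating container builder.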
type OutputSession<'a, D> =
    Session<'a, Timestamp, ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>, Pusher<D>>;

/// Bundled output buffers used by the demux operator.
struct DemuxOutput<'a> {
    batches: OutputSession<'a, (usize, ())>,
    records: OutputSession<'a, (usize, ())>,
    sharing: OutputSession<'a, (usize, ())>,
    batcher_records: OutputSession<'a, (usize, ())>,
    batcher_size: OutputSession<'a, (usize, ())>,
    batcher_capacity: OutputSession<'a, (usize, ())>,
    batcher_allocations: OutputSession<'a, (usize, ())>,
}

/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Arrangement trace sharing counts, keyed by operator ID.
    sharing: BTreeMap<usize, usize>,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator
    state: &'a mut DemuxState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
}

impl DemuxHandler<'_, '_> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
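    ///
    /// Event times are rounded up to the next interval boundary; for example, with a 1000 ms
    /// interval, an event at 1500 ms is assigned timestamp 2000.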
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given differential event.
    fn handle(&mut self, event: DifferentialEvent) {
        use DifferentialEvent::*;

        match event {
            Batch(e) => self.handle_batch(e),
            Merge(e) => self.handle_merge(e),
            Drop(e) => self.handle_drop(e),
            TraceShare(e) => self.handle_trace_share(e),
            Batcher(e) => self.handle_batcher_event(e),
            _ => (),
        }
    }

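    /// Handle a batch creation event by crediting the operator with one new batch and
    /// `event.length` new records.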
    fn handle_batch(&mut self, event: BatchEvent) {
        let ts = self.ts();
        let operator_id = event.operator;
        self.output.batches.give(((operator_id, ()), ts, Diff::ONE));

        let diff = Diff::try_from(event.length).expect("must fit");
        self.output.records.give(((operator_id, ()), ts, diff));
        self.notify_arrangement_size(operator_id);
    }

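    /// Handle a merge event. Only completed merges are logged: the operator loses one batch
    /// (two inputs become one output), and its record count changes by the difference between
    /// the output batch's length and the combined length of the two inputs.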
    fn handle_merge(&mut self, event: MergeEvent) {
        let Some(done) = event.complete else { return };

        let ts = self.ts();
        let operator_id = event.operator;
        self.output
            .batches
            .give(((operator_id, ()), ts, Diff::MINUS_ONE));

        let diff = Diff::try_from(done).expect("must fit")
            - Diff::try_from(event.length1 + event.length2).expect("must fit");
        if diff != Diff::ZERO {
            self.output.records.give(((operator_id, ()), ts, diff));
        }
        self.notify_arrangement_size(operator_id);
    }

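    /// Handle a batch drop event by removing one batch and `event.length` records from the
    /// operator's counts.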
    fn handle_drop(&mut self, event: DropEvent) {
        let ts = self.ts();
        let operator_id = event.operator;
        self.output
            .batches
            .give(((operator_id, ()), ts, Diff::MINUS_ONE));

        let diff = -Diff::try_from(event.length).expect("must fit");
        if diff != Diff::ZERO {
            self.output.records.give(((operator_id, ()), ts, diff));
        }
        self.notify_arrangement_size(operator_id);
    }

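    /// Handle a trace share event by updating the operator's share count. Once the count drops
    /// to zero, the entry is removed and an `ArrangementHeapSizeOperatorDrop` event is logged
    /// for the operator.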
    fn handle_trace_share(&mut self, event: TraceShare) {
        let ts = self.ts();
        let operator_id = event.operator;
        let diff = Diff::cast_from(event.diff);
        debug_assert_ne!(diff, Diff::ZERO);
        self.output.sharing.give(((operator_id, ()), ts, diff));

        let sharing = self.state.sharing.entry(operator_id).or_default();
        *sharing = (Diff::try_from(*sharing).expect("must fit") + diff)
            .into_inner()
            .try_into()
            .expect("under/overflow");
        if *sharing == 0 {
            self.state.sharing.remove(&operator_id);
            if let Some(logger) = &self.shared_state.compute_logger {
                logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
                    ArrangementHeapSizeOperatorDrop { operator_id },
                ));
            }
        }
    }

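    /// Handle a batcher event by forwarding its records, size, capacity, and allocation deltas
    /// to the corresponding batcher logging streams.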
    fn handle_batcher_event(&mut self, event: BatcherEvent) {
        let ts = self.ts();
        let operator_id = event.operator;
        let records_diff = Diff::cast_from(event.records_diff);
        let size_diff = Diff::cast_from(event.size_diff);
        let capacity_diff = Diff::cast_from(event.capacity_diff);
        let allocations_diff = Diff::cast_from(event.allocations_diff);
        self.output
            .batcher_records
            .give(((operator_id, ()), ts, records_diff));
        self.output
            .batcher_size
            .give(((operator_id, ()), ts, size_diff));
        self.output
            .batcher_capacity
            .give(((operator_id, ()), ts, capacity_diff));
        self.output
            .batcher_allocations
            .give(((operator_id, ()), ts, allocations_diff));
    }

    fn notify_arrangement_size(&self, operator: usize) {
        // Every arrangement should have a corresponding arrangement size operator, but we have
        // no guarantee that it already (or still) exists. We could warn when it is missing, but
        // doing so reliably would require maintaining state for longer than the arrangement
        // itself exists.
        if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
            activator.activate();
        }
    }
}