mz_compute/logging.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by various subsystems.

pub mod compute;
mod differential;
pub(super) mod initialize;
mod reachability;
mod timely;

use std::any::Any;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use ::timely::Container;
use ::timely::container::{CapacityContainerBuilder, ContainerBuilder};
use ::timely::dataflow::StreamCore;
use ::timely::dataflow::channels::pact::Pipeline;
use ::timely::dataflow::channels::pushers::buffer::Session;
use ::timely::dataflow::channels::pushers::{Counter, Tee};
use ::timely::dataflow::operators::Operator;
use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
use ::timely::progress::Timestamp as TimelyTimestamp;
use ::timely::scheduling::Activator;
use differential_dataflow::trace::Batcher;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_expr::{MirScalarExpr, permutation_for_arrangement};
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::containers::ColumnBuilder;
use mz_timely_util::operator::consolidate_pact;

use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;

pub use crate::logging::initialize::initialize;

/// An update of value `D` at a time and with a diff.
pub(super) type Update<D> = (D, Timestamp, Diff);
/// A pusher for containers `C`.
pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
/// An output session for the specified container builder.
pub(super) type OutputSession<'a, CB> =
    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
/// An output session for vector-based containers of updates `D`, using a capacity container builder.
pub(super) type OutputSessionVec<'a, D> = OutputSession<'a, CapacityContainerBuilder<Vec<D>>>;
/// An output session for columnar containers of updates `D`, using a column builder.
pub(super) type OutputSessionColumnar<'a, D> = OutputSession<'a, ColumnBuilder<D>>;

/// Logs events as a timely stream, with progress statements.
struct BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Time in milliseconds of the current expressed capability.
    time_ms: Timestamp,
    /// Pushes events to the logging dataflow.
    event_pusher: P,
    /// Each time is advanced to the strictly next multiple of this interval (in milliseconds).
    /// This means we can advance timestamp capabilities in the same way, and only flush buffers
    /// when the timestamp advances.
    interval_ms: u128,
    _marker: PhantomData<C>,
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Creates a new batch logger.
    fn new(event_pusher: P, interval_ms: u128) -> Self {
        BatchLogger {
            time_ms: Timestamp::minimum(),
            event_pusher,
            interval_ms,
            _marker: PhantomData,
        }
    }
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
    C: Container,
{
    /// Publishes a batch of logged events.
    fn publish_batch(&mut self, data: C) {
        self.event_pusher.push(Event::Messages(self.time_ms, data));
    }

    /// Indicates progress up to `time`, advancing the capability.
    ///
    /// Returns `true` if the capability was advanced.
    fn report_progress(&mut self, time: Duration) -> bool {
        let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
        let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
        if self.time_ms < new_time_ms {
            self.event_pusher
                .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
            self.time_ms = new_time_ms;
            true
        } else {
            false
        }
    }
}
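
// A minimal sketch of how `BatchLogger` rounds times: with `interval_ms = 1_000`,
// a wall-clock time of 2.5s maps to ((2_500 / 1_000) + 1) * 1_000 = 3_000 ms, the
// strictly next multiple of the interval. Backing the logger with an
// `Rc<EventLink>` mirrors how the event queues below feed dataflows; the test
// name and values here are illustrative assumptions, not upstream code.
#[cfg(test)]
mod batch_logger_example {
    use super::*;

    #[test]
    fn report_progress_rounds_up_to_interval() {
        let link = Rc::new(EventLink::<Timestamp, Vec<Update<Row>>>::new());
        let mut logger = BatchLogger::new(Rc::clone(&link), 1_000);
        // 2.5s rounds up to 3_000 ms and advances the capability.
        assert!(logger.report_progress(Duration::from_millis(2_500)));
        // 2.6s also rounds to 3_000 ms, so the capability does not advance.
        assert!(!logger.report_progress(Duration::from_millis(2_600)));
        // 3.0s rounds to the *strictly* next multiple, 4_000 ms.
        assert!(logger.report_progress(Duration::from_millis(3_000)));
    }
}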

impl<C, P> Drop for BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    fn drop(&mut self) {
        self.event_pusher
            .push(Event::Progress(vec![(self.time_ms, -1)]));
    }
}

/// Parts to connect a logging dataflow to the timely runtime.
///
/// This is just a bundle type intended to make passing around its contents in the logging
/// initialization code more convenient.
///
/// The `N` type parameter specifies the number of links to create for the event queue. We need
/// separate links for queues that are fed by multiple loggers because the `EventLink` type is not
/// multi-producer safe: it is a linked list, and multiple writers would blindly append,
/// overwriting existing data and cutting off other writers.
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
    links: [Rc<EventLink<Timestamp, C>>; N],
    activator: RcActivator,
}

impl<C, const N: usize> EventQueue<C, N> {
    fn new(name: &str) -> Self {
        let activator_name = format!("{name}_activator");
        let activate_after = 128;
        Self {
            links: [(); N].map(|_| Rc::new(EventLink::new())),
            activator: RcActivator::new(activator_name, activate_after),
        }
    }
}
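
// A sketch of why `N > 1` matters: each logger that feeds a fragment gets its
// own link, so appends never race. The type parameters below are illustrative
// assumptions, not taken from the initialization code.
#[cfg(test)]
mod event_queue_example {
    use super::*;

    #[test]
    fn links_are_distinct() {
        let queue = EventQueue::<Vec<Update<Row>>, 2>::new("example");
        let [a, b] = &queue.links;
        // Two independent `EventLink` heads: writers cannot clobber each other.
        assert!(!Rc::ptr_eq(a, b));
    }
}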

/// State shared between different logging dataflow fragments.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators for arrangement heap size operators.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// Shared compute logger.
    compute_logger: Option<ComputeLogger>,
}

/// Helper to pack collections of [`Datum`]s into key and value rows.
pub(crate) struct PermutedRowPacker {
    key: Vec<usize>,
    value: Vec<usize>,
    key_row: Row,
    value_row: Row,
}

impl PermutedRowPacker {
    /// Constructs a packer based on the information within the log variant.
    pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
        let variant = variant.into();
        let key = variant.index_by();
        let (_, value) = permutation_for_arrangement(
            &key.iter()
                .cloned()
                .map(MirScalarExpr::Column)
                .collect::<Vec<_>>(),
            variant.desc().arity(),
        );
        Self {
            key,
            value,
            key_row: Row::default(),
            value_row: Row::default(),
        }
    }

    /// Packs a slice of datums into key and value rows, permuted for the log variant.
    pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
        self.pack_by_index(|packer, index| packer.push(datums[index]))
    }

    /// Packs a slice of datums into owned key and value rows.
    ///
    /// This is equivalent to calling [`PermutedRowPacker::pack_slice`] and then calling `to_owned`
    /// on the returned rows.
    pub(crate) fn pack_slice_owned(&mut self, datums: &[Datum]) -> (Row, Row) {
        let (key, value) = self.pack_slice(datums);
        (key.to_owned(), value.to_owned())
    }

    /// Packs key and value rows using a callback that pushes the datum for a given column index.
    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
        &mut self,
        logic: F,
    ) -> (&RowRef, &RowRef) {
        let mut packer = self.key_row.packer();
        for index in &self.key {
            logic(&mut packer, *index);
        }

        let mut packer = self.value_row.packer();
        for index in &self.value {
            logic(&mut packer, *index);
        }

        (&self.key_row, &self.value_row)
    }
}
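
// A sketch of `PermutedRowPacker` in use. `TimelyLog::Elapsed` is an arbitrary
// choice of variant; pushing each column's own index keeps the example
// independent of the variant's exact schema.
#[cfg(test)]
mod permuted_row_packer_example {
    use super::*;

    #[test]
    fn pack_by_index_permutes_columns() {
        let mut packer = PermutedRowPacker::new(TimelyLog::Elapsed);
        let (key, value) = packer.pack_by_index(|row, index| {
            row.push(Datum::UInt64(u64::try_from(index).expect("small index")))
        });
        // Both rows are packed in one pass: the key holds the indexed columns,
        // the value the remaining columns, permuted accordingly.
        assert!(key.iter().count() + value.iter().count() > 0);
    }
}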

/// Information about a collection exported from a logging dataflow.
struct LogCollection {
    /// Trace handle providing access to the logged records.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Token that should be dropped to drop this collection.
    token: Rc<dyn Any>,
}

/// A single-purpose function to consolidate and pack updates for log collection.
///
/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts
/// the updates into `(Row, Row)` pairs using the provided logic function. It is crucial that the
/// data is not exchanged between workers, as the consolidation would not function as desired
/// otherwise.
pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
    input: &StreamCore<G, B::Input>,
    log: L,
    mut logic: F,
) -> StreamCore<G, CB::Container>
where
    G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
    B: Batcher<Time = G::Timestamp> + 'static,
    B::Input: Container + Clone + 'static,
    B::Output: Container + Clone + 'static,
    CB: ContainerBuilder,
    L: Into<LogVariant>,
    F: for<'a> FnMut(
            <B::Output as Container>::ItemRef<'a>,
            &mut PermutedRowPacker,
            &mut OutputSession<CB>,
        ) + 'static,
{
    let log = log.into();
    // TODO: Use something other than the debug representation of the log variant as a name.
    let c_name = &format!("Consolidate {log:?}");
    let u_name = &format!("ToRow {log:?}");
    let mut packer = PermutedRowPacker::new(log);
    let consolidated = consolidate_pact::<B, _, _>(input, Pipeline, c_name);
    consolidated.unary::<CB, _, _, _>(Pipeline, u_name, |_, _| {
        move |input, output| {
            while let Some((time, data)) = input.next() {
                let mut session = output.session_with_builder(&time);
                for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
                    logic(item, &mut packer, &mut session);
                }
            }
        }
    })
}
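
// An illustrative sketch (assumed names, not compiled here) of how a logging
// fragment typically calls `consolidate_and_pack`: pick a batcher `B` for the
// update container, a `ColumnBuilder` for the output, and a closure that maps
// each consolidated update onto the variant's (key, value) rows.
//
//     let packed = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
//         &updates,
//         TimelyLog::Elapsed,
//         move |((operator_id, ()), time, diff), packer, session| {
//             let (key, value) = packer.pack_slice(&[Datum::UInt64(*operator_id)]);
//             session.give(((key.to_owned(), value.to_owned()), *time, *diff));
//         },
//     );
//
// Because the pact is `Pipeline`, consolidation happens per worker; exchanging
// the data first would interleave updates from different workers and defeat it.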