Skip to main content

mz_compute/
logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by various subsystems.
11
12pub mod compute;
13mod differential;
14pub(super) mod initialize;
15mod prometheus;
16mod reachability;
17mod timely;
18
19use std::any::Any;
20use std::collections::BTreeMap;
21use std::marker::PhantomData;
22use std::rc::Rc;
23use std::time::Duration;
24
25use ::timely::container::CapacityContainerBuilder;
26use ::timely::dataflow::Stream;
27use ::timely::dataflow::channels::pact::Pipeline;
28use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
29use ::timely::dataflow::operators::generic::Session;
30use ::timely::dataflow::operators::{InputCapability, Operator};
31use ::timely::progress::Timestamp as TimelyTimestamp;
32use ::timely::scheduling::Activator;
33use ::timely::{Container, ContainerBuilder};
34use differential_dataflow::trace::Batcher;
35use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
36use mz_expr::{MirScalarExpr, permutation_for_arrangement};
37use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
38use mz_timely_util::activator::RcActivator;
39use mz_timely_util::columnar::builder::ColumnBuilder;
40use mz_timely_util::operator::consolidate_pact;
41
42use crate::logging::compute::Logger as ComputeLogger;
43use crate::typedefs::RowRowAgent;
44
45pub use crate::logging::initialize::initialize;
46
/// An update of value `D` at a time and with a diff.
pub(super) type Update<D> = (D, Timestamp, Diff);
/// An output session for the specified container builder, pushing at `Timestamp` times.
pub(super) type OutputSession<'a, 'b, CB> =
    Session<'a, 'b, Timestamp, CB, InputCapability<Timestamp>>;
/// An output session for vector-based containers of updates `D`, using a capacity container builder.
pub(super) type OutputSessionVec<'a, 'b, D> =
    OutputSession<'a, 'b, CapacityContainerBuilder<Vec<D>>>;
/// An output session for columnar containers of updates `D`, using a column builder.
pub(super) type OutputSessionColumnar<'a, 'b, D> = OutputSession<'a, 'b, ColumnBuilder<D>>;
58
/// Logs events as a timely stream, with progress statements.
struct BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Time in milliseconds of the current expressed capability.
    time_ms: Timestamp,
    /// Pushes events to the logging dataflow.
    event_pusher: P,
    /// Each time is advanced to the strictly next millisecond that is a multiple of this interval.
    /// This means we should be able to perform the same action on timestamp capabilities, and only
    /// flush buffers when this timestamp advances.
    interval_ms: u128,
    /// Pins the container type `C` carried by `event_pusher` without storing a value of it.
    _marker: PhantomData<C>,
}
74
75impl<C, P> BatchLogger<C, P>
76where
77    P: EventPusher<Timestamp, C>,
78{
79    /// Creates a new batch logger.
80    fn new(event_pusher: P, interval_ms: u128) -> Self {
81        BatchLogger {
82            time_ms: Timestamp::minimum(),
83            event_pusher,
84            interval_ms,
85            _marker: PhantomData,
86        }
87    }
88}
89
90impl<C, P> BatchLogger<C, P>
91where
92    P: EventPusher<Timestamp, C>,
93    C: Container,
94{
95    /// Publishes a batch of logged events.
96    fn publish_batch(&mut self, data: C) {
97        self.event_pusher.push(Event::Messages(self.time_ms, data));
98    }
99
100    /// Indicate progress up to `time`, advances the capability.
101    ///
102    /// Returns `true` if the capability was advanced.
103    fn report_progress(&mut self, time: Duration) -> bool {
104        let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
105        let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
106        if self.time_ms < new_time_ms {
107            self.event_pusher
108                .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
109            self.time_ms = new_time_ms;
110            true
111        } else {
112            false
113        }
114    }
115}
116
117impl<C, P> Drop for BatchLogger<C, P>
118where
119    P: EventPusher<Timestamp, C>,
120{
121    fn drop(&mut self) {
122        self.event_pusher
123            .push(Event::Progress(vec![(self.time_ms, -1)]));
124    }
125}
126
/// Parts to connect a logging dataflow to the timely runtime.
///
/// This is just a bundle-type intended to make passing around its contents in the logging
/// initialization code more convenient.
///
/// The `N` type parameter specifies the number of links to create for the event queue. We need
/// separate links for queues that feed from multiple loggers because the `EventLink` type is not
/// multi-producer safe (it is a linked-list, and multiple writers would blindly append, replacing
/// existing new data, and cutting off other writers).
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
    /// One event link per producing logger; see the type docs for why they cannot be shared.
    links: [Rc<EventLink<Timestamp, C>>; N],
    /// Activator used to wake the consuming dataflow when new events are available.
    activator: RcActivator,
}
141
142impl<C, const N: usize> EventQueue<C, N> {
143    fn new(name: &str) -> Self {
144        let activator_name = format!("{name}_activator");
145        let activate_after = 128;
146        Self {
147            links: [(); N].map(|_| Rc::new(EventLink::new())),
148            activator: RcActivator::new(activator_name, activate_after),
149        }
150    }
151}
152
/// State shared between different logging dataflow fragments.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators for arrangement heap size operators.
    // NOTE(review): keys look like operator/arrangement ids — confirm against the writers.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// Shared compute logger.
    compute_logger: Option<ComputeLogger>,
}
161
/// Helper to pack collections of [`Datum`]s into key and value row.
pub(crate) struct PermutedRowPacker {
    /// Column indexes to pack into the key row.
    key: Vec<usize>,
    /// Column indexes to pack into the value row.
    value: Vec<usize>,
    /// Reusable buffer for the packed key.
    key_row: Row,
    /// Reusable buffer for the packed value.
    value_row: Row,
}
169
170impl PermutedRowPacker {
171    /// Construct based on the information within the log variant.
172    pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
173        let variant = variant.into();
174        let key = variant.index_by();
175        let (_, value) = permutation_for_arrangement(
176            &key.iter()
177                .cloned()
178                .map(MirScalarExpr::column)
179                .collect::<Vec<_>>(),
180            variant.desc().arity(),
181        );
182        Self {
183            key,
184            value,
185            key_row: Row::default(),
186            value_row: Row::default(),
187        }
188    }
189
190    /// Pack a slice of datums suitable for the key columns in the log variant.
191    pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
192        self.pack_by_index(|packer, index| packer.push(datums[index]))
193    }
194
195    /// Pack using a callback suitable for the key columns in the log variant.
196    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
197        &mut self,
198        logic: F,
199    ) -> (&RowRef, &RowRef) {
200        let mut packer = self.key_row.packer();
201        for index in &self.key {
202            logic(&mut packer, *index);
203        }
204
205        let mut packer = self.value_row.packer();
206        for index in &self.value {
207            logic(&mut packer, *index);
208        }
209
210        (&self.key_row, &self.value_row)
211    }
212}
213
/// Information about a collection exported from a logging dataflow.
struct LogCollection {
    /// Trace handle providing access to the logged records.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Token that should be dropped to drop this collection.
    token: Rc<dyn Any>,
}
221
/// A single-purpose function to consolidate and pack updates for log collection.
///
/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts
/// the updates into `(Row, Row)` pairs using the provided logic function. It is crucial that the
/// data is not exchanged between workers, as the consolidation would not function as desired
/// otherwise.
pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
    input: Stream<G, B::Input>,
    log: L,
    mut logic: F,
) -> Stream<G, CB::Container>
where
    G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
    B: Batcher<Time = G::Timestamp> + 'static,
    B::Input: Container + Clone + 'static,
    B::Output: Clone,
    CB: ContainerBuilder,
    L: Into<LogVariant>,
    F: for<'a> FnMut(B::Output, &mut PermutedRowPacker, &mut OutputSession<CB>) + 'static,
{
    let log = log.into();
    // TODO: Use something other than the debug representation of the log variant as a name.
    let c_name = &format!("Consolidate {log:?}");
    let u_name = &format!("ToRow {log:?}");
    // Row packer owned by (and moved into) the operator closure, reused across calls to `logic`.
    let mut packer = PermutedRowPacker::new(log);
    // Consolidate with the `Pipeline` pact so data stays on the producing worker.
    let consolidated = consolidate_pact::<B, _, _>(input, Pipeline, c_name);
    consolidated.unary::<CB, _, _, _>(Pipeline, u_name, |_, _| {
        move |input, output| {
            input.for_each_time(|time, data| {
                // One output session per input time; `logic` appends packed rows to it.
                let mut session = output.session_with_builder(&time);
                // Drain each input container so buffers can be reused by the runtime.
                for item in data.flatten().flat_map(|data| data.drain(..)) {
                    logic(item, &mut packer, &mut session);
                }
            });
        }
    })
}