mz_compute/logging.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by various subsystems.

pub mod compute;
mod differential;
pub(super) mod initialize;
mod reachability;
mod timely;

use std::any::Any;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use ::timely::container::CapacityContainerBuilder;
use ::timely::dataflow::StreamCore;
use ::timely::dataflow::channels::pact::Pipeline;
use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
use ::timely::dataflow::operators::generic::Session;
use ::timely::dataflow::operators::{InputCapability, Operator};
use ::timely::progress::Timestamp as TimelyTimestamp;
use ::timely::scheduling::Activator;
use ::timely::{Container, ContainerBuilder};
use differential_dataflow::trace::Batcher;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_expr::{MirScalarExpr, permutation_for_arrangement};
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::operator::consolidate_pact;

use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;

pub use crate::logging::initialize::initialize;

/// An update of value `D` at a time and with a diff.
pub(super) type Update<D> = (D, Timestamp, Diff);
/// An output session for the specified container builder.
pub(super) type OutputSession<'a, 'b, CB> =
    Session<'a, 'b, Timestamp, CB, InputCapability<Timestamp>>;
/// An output session for vector-based containers of updates `D`, using a capacity container builder.
pub(super) type OutputSessionVec<'a, 'b, D> =
    OutputSession<'a, 'b, CapacityContainerBuilder<Vec<D>>>;
/// An output session for columnar containers of updates `D`, using a column builder.
pub(super) type OutputSessionColumnar<'a, 'b, D> = OutputSession<'a, 'b, ColumnBuilder<D>>;

/// Logs events as a timely stream, with progress statements.
struct BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Time in milliseconds of the current expressed capability.
    time_ms: Timestamp,
    /// Pushes events to the logging dataflow.
    event_pusher: P,
    /// Each time is advanced to the strictly next millisecond that is a multiple of this interval.
    /// This lets us downgrade timestamp capabilities in lockstep and flush buffers only when the
    /// timestamp actually advances.
    interval_ms: u128,
    _marker: PhantomData<C>,
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Creates a new batch logger.
    fn new(event_pusher: P, interval_ms: u128) -> Self {
        BatchLogger {
            time_ms: Timestamp::minimum(),
            event_pusher,
            interval_ms,
            _marker: PhantomData,
        }
    }
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
    C: Container,
{
    /// Publishes a batch of logged events.
    fn publish_batch(&mut self, data: C) {
        self.event_pusher.push(Event::Messages(self.time_ms, data));
    }

    /// Indicates progress up to `time`, advancing the capability.
    ///
    /// Returns `true` if the capability was advanced.
    fn report_progress(&mut self, time: Duration) -> bool {
        let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
        let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
        if self.time_ms < new_time_ms {
            self.event_pusher
                .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
            self.time_ms = new_time_ms;
            true
        } else {
            false
        }
    }
}
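
// A minimal sketch (test-only, not part of the production code path) of the
// rounding that `report_progress` performs: times are advanced to the strictly
// next multiple of `interval_ms`, so even an exact multiple moves to the
// following boundary. The helper below restates the arithmetic locally for
// illustration and is a hypothetical addition, kept in sync by hand.
#[cfg(test)]
mod batch_logger_interval_tests {
    /// Restatement of the rounding in `BatchLogger::report_progress`
    /// (illustrative helper, not used by production code).
    fn next_interval_boundary(time_ms: u128, interval_ms: u128) -> u128 {
        ((time_ms / interval_ms) + 1) * interval_ms
    }

    #[test]
    fn rounds_up_to_strictly_next_multiple() {
        // 1500ms with a 1s interval advances to the 2s boundary.
        assert_eq!(next_interval_boundary(1_500, 1_000), 2_000);
        // An exact multiple still advances to the *next* boundary.
        assert_eq!(next_interval_boundary(2_000, 1_000), 3_000);
        // Times below one interval advance to the first boundary.
        assert_eq!(next_interval_boundary(0, 1_000), 1_000);
    }
}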

impl<C, P> Drop for BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    fn drop(&mut self) {
        self.event_pusher
            .push(Event::Progress(vec![(self.time_ms, -1)]));
    }
}

/// Parts to connect a logging dataflow to the timely runtime.
///
/// This is just a bundle type intended to make passing around its contents in the logging
/// initialization code more convenient.
///
/// The `N` type parameter specifies the number of links to create for the event queue. We need
/// separate links for queues that are fed from multiple loggers because the `EventLink` type is
/// not multi-producer safe (it is a linked list, and multiple writers would blindly append,
/// overwriting each other's new data and cutting off other writers).
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
    links: [Rc<EventLink<Timestamp, C>>; N],
    activator: RcActivator,
}

impl<C, const N: usize> EventQueue<C, N> {
    fn new(name: &str) -> Self {
        let activator_name = format!("{name}_activator");
        let activate_after = 128;
        Self {
            links: [(); N].map(|_| Rc::new(EventLink::new())),
            activator: RcActivator::new(activator_name, activate_after),
        }
    }
}
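
// A minimal sketch (test-only) checking the property the doc comment above
// relies on: `EventQueue::new` allocates a separate `EventLink` per producer
// slot, since `EventLink` is not multi-producer safe. The container type
// `Vec<()>` is an arbitrary placeholder.
#[cfg(test)]
mod event_queue_tests {
    use super::EventQueue;

    #[test]
    fn links_are_distinct() {
        let queue: EventQueue<Vec<()>, 2> = EventQueue::new("test");
        assert!(!std::rc::Rc::ptr_eq(&queue.links[0], &queue.links[1]));
    }
}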

/// State shared between different logging dataflow fragments.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators for arrangement heap size operators.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// Shared compute logger.
    compute_logger: Option<ComputeLogger>,
}

/// Helper to pack collections of [`Datum`]s into key and value rows.
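///
/// # Example
///
/// A minimal usage sketch (not compiled; the variant is arbitrary and the
/// datum values are placeholders, not the real schema of that log):
///
/// ```ignore
/// let mut packer = PermutedRowPacker::new(TimelyLog::Operates);
/// let (key_row, value_row) = packer.pack_slice(&[
///     Datum::UInt64(42),        // first column, assumed to be the key
///     Datum::String("example"), // remaining value columns
/// ]);
/// ```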
pub(crate) struct PermutedRowPacker {
    key: Vec<usize>,
    value: Vec<usize>,
    key_row: Row,
    value_row: Row,
}

impl PermutedRowPacker {
    /// Construct based on the information within the log variant.
    pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
        let variant = variant.into();
        let key = variant.index_by();
        let (_, value) = permutation_for_arrangement(
            &key.iter()
                .cloned()
                .map(MirScalarExpr::column)
                .collect::<Vec<_>>(),
            variant.desc().arity(),
        );
        Self {
            key,
            value,
            key_row: Row::default(),
            value_row: Row::default(),
        }
    }

    /// Pack a slice of datums into key and value rows, permuted according to the log variant.
    pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
        self.pack_by_index(|packer, index| packer.push(datums[index]))
    }

    /// Pack key and value rows using a callback that pushes the datum for a given column index.
    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
        &mut self,
        logic: F,
    ) -> (&RowRef, &RowRef) {
        let mut packer = self.key_row.packer();
        for index in &self.key {
            logic(&mut packer, *index);
        }

        let mut packer = self.value_row.packer();
        for index in &self.value {
            logic(&mut packer, *index);
        }

        (&self.key_row, &self.value_row)
    }
}

/// Information about a collection exported from a logging dataflow.
struct LogCollection {
    /// Trace handle providing access to the logged records.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Token that should be dropped to drop this collection.
    token: Rc<dyn Any>,
}

/// A single-purpose function to consolidate and pack updates for log collection.
///
/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts
/// the updates into `(Row, Row)` pairs using the provided logic function. It is crucial that the
/// data is not exchanged between workers, as the consolidation would not function as desired
/// otherwise.
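///
/// # Example
///
/// A minimal usage sketch (not compiled; the batcher `B`, builder `CB`, and
/// the shape of the batched updates are illustrative assumptions):
///
/// ```ignore
/// let rows = consolidate_and_pack::<_, B, CB, _, _>(
///     &updates,
///     TimelyLog::Operates,
///     |(datum, time, diff), packer, session| {
///         let (key, value) = packer.pack_slice(&[datum]);
///         session.give(((key.to_owned(), value.to_owned()), time, diff));
///     },
/// );
/// ```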
pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
    input: &StreamCore<G, B::Input>,
    log: L,
    mut logic: F,
) -> StreamCore<G, CB::Container>
where
    G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
    B: Batcher<Time = G::Timestamp> + 'static,
    B::Input: Container + Clone + 'static,
    B::Output: Clone,
    CB: ContainerBuilder,
    L: Into<LogVariant>,
    F: for<'a> FnMut(B::Output, &mut PermutedRowPacker, &mut OutputSession<CB>) + 'static,
{
    let log = log.into();
    // TODO: Use something other than the debug representation of the log variant as a name.
    let c_name = &format!("Consolidate {log:?}");
    let u_name = &format!("ToRow {log:?}");
    let mut packer = PermutedRowPacker::new(log);
    let consolidated = consolidate_pact::<B, _, _>(input, Pipeline, c_name);
    consolidated.unary::<CB, _, _, _>(Pipeline, u_name, |_, _| {
        move |input, output| {
            input.for_each_time(|time, data| {
                let mut session = output.session_with_builder(&time);
                for item in data.flatten().flat_map(|data| data.drain(..)) {
                    logic(item, &mut packer, &mut session);
                }
            });
        }
    })
}