mz_compute/
logging.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by various subsystems.

pub mod compute;
mod differential;
mod initialize;
mod reachability;
mod timely;

use std::any::Any;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use ::timely::container::ContainerBuilder;
use ::timely::dataflow::channels::pact::Pipeline;
use ::timely::dataflow::channels::pushers::buffer::Session;
use ::timely::dataflow::channels::pushers::{Counter, Tee};
use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
use ::timely::dataflow::operators::Operator;
use ::timely::dataflow::StreamCore;
use ::timely::progress::Timestamp as TimelyTimestamp;
use ::timely::scheduling::Activator;
use ::timely::Container;
use differential_dataflow::trace::Batcher;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::operator::consolidate_pact;

use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;

pub use crate::logging::initialize::initialize;

/// Logs events as a timely stream, with progress statements.
struct BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Time in milliseconds of the current expressed capability.
    time_ms: Timestamp,
    /// Pushes events to the logging dataflow.
    event_pusher: P,
    /// Each time is advanced to the strictly next millisecond that is a multiple of this interval.
    /// This means we should be able to perform the same action on timestamp capabilities, and only
    /// flush buffers when this timestamp advances.
    interval_ms: u128,
    _marker: PhantomData<C>,
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Creates a new batch logger.
    fn new(event_pusher: P, interval_ms: u128) -> Self {
        BatchLogger {
            time_ms: Timestamp::minimum(),
            event_pusher,
            interval_ms,
            _marker: PhantomData,
        }
    }
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
    C: Container,
{
    /// Publishes a batch of logged events.
    fn publish_batch(&mut self, data: C) {
        self.event_pusher.push(Event::Messages(self.time_ms, data));
    }

    /// Indicate progress up to `time`, advances the capability.
    ///
    /// Returns `true` if the capability was advanced.
    fn report_progress(&mut self, time: Duration) -> bool {
        let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
        let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
        if self.time_ms < new_time_ms {
            self.event_pusher
                .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
            self.time_ms = new_time_ms;
            true
        } else {
            false
        }
    }
}

impl<C, P> Drop for BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    fn drop(&mut self) {
        self.event_pusher
            .push(Event::Progress(vec![(self.time_ms, -1)]));
    }
}

/// Parts to connect a logging dataflows the timely runtime.
///
/// This is just a bundle-type intended to make passing around its contents in the logging
/// initialization code more convenient.
///
/// The `N` type parameter specifies the number of links to create for the event queue. We need
/// separate links for queues that feed from multiple loggers because the `EventLink` type is not
/// multi-producer safe (it is a linked-list, and multiple writers would blindly append, replacing
/// existing new data, and cutting off other writers).
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
    links: [Rc<EventLink<Timestamp, C>>; N],
    activator: RcActivator,
}

impl<C, const N: usize> EventQueue<C, N> {
    fn new(name: &str) -> Self {
        let activator_name = format!("{name}_activator");
        let activate_after = 128;
        Self {
            links: [(); N].map(|_| Rc::new(EventLink::new())),
            activator: RcActivator::new(activator_name, activate_after),
        }
    }
}

/// State shared between different logging dataflows.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators for arrangement heap size operators.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// Shared compute logger.
    compute_logger: Option<ComputeLogger>,
}

/// Helper to pack collections of [`Datum`]s into key and value row.
pub(crate) struct PermutedRowPacker {
    key: Vec<usize>,
    value: Vec<usize>,
    key_row: Row,
    value_row: Row,
}

impl PermutedRowPacker {
    /// Construct based on the information within the log variant.
    pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
        let variant = variant.into();
        let key = variant.index_by();
        let (_, value) = permutation_for_arrangement(
            &key.iter()
                .cloned()
                .map(MirScalarExpr::Column)
                .collect::<Vec<_>>(),
            variant.desc().arity(),
        );
        Self {
            key,
            value,
            key_row: Row::default(),
            value_row: Row::default(),
        }
    }

    /// Pack a slice of datums suitable for the key columns in the log variant.
    pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
        self.pack_by_index(|packer, index| packer.push(datums[index]))
    }

    /// Pack a slice of datums suitable for the key columns in the log variant, returning owned
    /// rows.
    ///
    /// This is equivalent to calling [`PermutedRowPacker::pack_slice`] and then calling `to_owned`
    /// on the returned rows.
    pub(crate) fn pack_slice_owned(&mut self, datums: &[Datum]) -> (Row, Row) {
        let (key, value) = self.pack_slice(datums);
        (key.to_owned(), value.to_owned())
    }

    /// Pack using a callback suitable for the key columns in the log variant.
    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
        &mut self,
        logic: F,
    ) -> (&RowRef, &RowRef) {
        let mut packer = self.key_row.packer();
        for index in &self.key {
            logic(&mut packer, *index);
        }

        let mut packer = self.value_row.packer();
        for index in &self.value {
            logic(&mut packer, *index);
        }

        (&self.key_row, &self.value_row)
    }
}

/// Information about a collection exported from a logging dataflow.
struct LogCollection {
    /// Trace handle providing access to the logged records.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Token that should be dropped to drop this collection.
    token: Rc<dyn Any>,
    /// Index of the dataflow exporting this collection.
    dataflow_index: usize,
}

pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
pub(super) type OutputSession<'a, CB> =
    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;

/// A single-purpose function to consolidate and pack updates for log collection.
///
/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts
/// the updates into `(Row, Row)` pairs using the provided logic function. It is crucial that the
/// data is not exchanged between workers, as the consolidation would not function as desired
/// otherwise.
pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
    input: &StreamCore<G, B::Input>,
    log: L,
    mut logic: F,
) -> StreamCore<G, CB::Container>
where
    G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
    B: Batcher<Time = G::Timestamp> + 'static,
    B::Input: Container + Clone + 'static,
    B::Output: Container + Clone + 'static,
    CB: ContainerBuilder,
    L: Into<LogVariant>,
    F: for<'a> FnMut(
            <B::Output as Container>::ItemRef<'a>,
            &mut PermutedRowPacker,
            &mut OutputSession<CB>,
        ) + 'static,
{
    let log = log.into();
    // TODO: Use something other than the debug representation of the log variant as a name.
    let c_name = &format!("Consolidate {log:?}");
    let u_name = &format!("ToRow {log:?}");
    let mut packer = PermutedRowPacker::new(log);
    let consolidated = consolidate_pact::<B, _, _>(input, Pipeline, c_name);
    consolidated.unary::<CB, _, _, _>(Pipeline, u_name, |_, _| {
        move |input, output| {
            while let Some((time, data)) = input.next() {
                let mut session = output.session_with_builder(&time);
                for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
                    logic(item, &mut packer, &mut session);
                }
            }
        }
    })
}