Skip to main content

mz_compute_client/
logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Compute layer logging configuration.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::{GlobalId, RelationDesc, SqlScalarType};
16use serde::{Deserialize, Serialize};
17
18/// Logging configuration.
19///
20/// Setting `enable_logging` to `false` specifies that logging is disabled.
21//
22// Ideally we'd want to instead signal disabled logging by leaving `index_logs`
23// empty. Unfortunately, we have to always provide `index_logs`, because we must
24// install the logging dataflows even on replicas that have logging disabled. See database-issues#4545.
25#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
26pub struct LoggingConfig {
27    /// The logging interval
28    pub interval: Duration,
29    /// Whether logging is enabled
30    pub enable_logging: bool,
31    /// Whether we should report logs for the log-processing dataflows
32    pub log_logging: bool,
33    /// Logs to keep in an arrangement
34    pub index_logs: BTreeMap<LogVariant, GlobalId>,
35}
36
37/// TODO(database-issues#7533): Add documentation.
38#[derive(
39    Hash,
40    Eq,
41    PartialEq,
42    Ord,
43    PartialOrd,
44    Debug,
45    Clone,
46    Copy,
47    Serialize,
48    Deserialize
49)]
50pub enum LogVariant {
51    /// TODO(database-issues#7533): Add documentation.
52    Timely(TimelyLog),
53    /// TODO(database-issues#7533): Add documentation.
54    Differential(DifferentialLog),
55    /// TODO(database-issues#7533): Add documentation.
56    Compute(ComputeLog),
57}
58
59impl From<TimelyLog> for LogVariant {
60    fn from(value: TimelyLog) -> Self {
61        Self::Timely(value)
62    }
63}
64
65impl From<DifferentialLog> for LogVariant {
66    fn from(value: DifferentialLog) -> Self {
67        Self::Differential(value)
68    }
69}
70
71impl From<ComputeLog> for LogVariant {
72    fn from(value: ComputeLog) -> Self {
73        Self::Compute(value)
74    }
75}
76
77/// TODO(database-issues#7533): Add documentation.
78#[derive(
79    Hash,
80    Eq,
81    Ord,
82    PartialEq,
83    PartialOrd,
84    Debug,
85    Clone,
86    Copy,
87    Serialize,
88    Deserialize
89)]
90pub enum TimelyLog {
91    /// TODO(database-issues#7533): Add documentation.
92    Operates,
93    /// TODO(database-issues#7533): Add documentation.
94    Channels,
95    /// TODO(database-issues#7533): Add documentation.
96    Elapsed,
97    /// TODO(database-issues#7533): Add documentation.
98    Histogram,
99    /// TODO(database-issues#7533): Add documentation.
100    Addresses,
101    /// TODO(database-issues#7533): Add documentation.
102    Parks,
103    /// TODO(database-issues#7533): Add documentation.
104    MessagesSent,
105    /// TODO(database-issues#7533): Add documentation.
106    MessagesReceived,
107    /// TODO(database-issues#7533): Add documentation.
108    Reachability,
109    /// TODO(database-issues#7533): Add documentation.
110    BatchesSent,
111    /// TODO(database-issues#7533): Add documentation.
112    BatchesReceived,
113}
114
115/// TODO(database-issues#7533): Add documentation.
116#[derive(
117    Hash,
118    Eq,
119    Ord,
120    PartialEq,
121    PartialOrd,
122    Debug,
123    Clone,
124    Copy,
125    Serialize,
126    Deserialize
127)]
128pub enum DifferentialLog {
129    /// TODO(database-issues#7533): Add documentation.
130    ArrangementBatches,
131    /// TODO(database-issues#7533): Add documentation.
132    ArrangementRecords,
133    /// TODO(database-issues#7533): Add documentation.
134    Sharing,
135    /// TODO(database-issues#7533): Add documentation.
136    BatcherRecords,
137    /// TODO(database-issues#7533): Add documentation.
138    BatcherSize,
139    /// TODO(database-issues#7533): Add documentation.
140    BatcherCapacity,
141    /// TODO(database-issues#7533): Add documentation.
142    BatcherAllocations,
143}
144
145/// Variants of compute introspection sources.
146#[derive(
147    Hash,
148    Eq,
149    PartialEq,
150    Ord,
151    PartialOrd,
152    Debug,
153    Clone,
154    Copy,
155    Serialize,
156    Deserialize
157)]
158pub enum ComputeLog {
159    /// Installed dataflow exports.
160    DataflowCurrent,
161    /// Dataflow write frontiers.
162    FrontierCurrent,
163    /// Pending peeks.
164    PeekCurrent,
165    /// A histogram over peek durations.
166    PeekDuration,
167    /// Dataflow import frontiers.
168    ImportFrontierCurrent,
169    /// Arrangement heap sizes.
170    ArrangementHeapSize,
171    /// Arrangement heap capacities.
172    ArrangementHeapCapacity,
173    /// Arrangement heap allocations.
174    ArrangementHeapAllocations,
175    /// Counts of errors in exported collections.
176    ErrorCount,
177    /// Hydration times of exported collections.
178    HydrationTime,
179    /// Hydration status of dataflow operators.
180    OperatorHydrationStatus,
181    /// Mappings from `GlobalId`/`LirId`` pairs to dataflow addresses.
182    LirMapping,
183    /// Mappings from dataflows to `GlobalId`s.
184    DataflowGlobal,
185}
186
187impl LogVariant {
188    /// By which columns should the logs be indexed.
189    ///
190    /// This is distinct from the `keys` property of the type, which indicates uniqueness.
191    /// When keys exist these are good choices for indexing, but when they do not we still
192    /// require index guidance.
193    pub fn index_by(&self) -> Vec<usize> {
194        let desc = self.desc();
195        let arity = desc.arity();
196        desc.typ()
197            .keys
198            .get(0)
199            .cloned()
200            .unwrap_or_else(|| (0..arity).collect())
201    }
202
203    /// Relation schemas for the logs.
204    ///
205    /// This types need to agree with the values that are produced
206    /// in `logging::compute::construct` and with the description in
207    /// `catalog/src/builtin.rs`.
208    pub fn desc(&self) -> RelationDesc {
209        match self {
210            LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
211                .with_column("id", SqlScalarType::UInt64.nullable(false))
212                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
213                .with_column("name", SqlScalarType::String.nullable(false))
214                .with_key(vec![0, 1])
215                .finish(),
216
217            LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
218                .with_column("id", SqlScalarType::UInt64.nullable(false))
219                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
220                .with_column("from_index", SqlScalarType::UInt64.nullable(false))
221                .with_column("from_port", SqlScalarType::UInt64.nullable(false))
222                .with_column("to_index", SqlScalarType::UInt64.nullable(false))
223                .with_column("to_port", SqlScalarType::UInt64.nullable(false))
224                .with_column("type", SqlScalarType::String.nullable(false))
225                .with_key(vec![0, 1])
226                .finish(),
227
228            LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
229                .with_column("id", SqlScalarType::UInt64.nullable(false))
230                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
231                .finish(),
232
233            LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
234                .with_column("id", SqlScalarType::UInt64.nullable(false))
235                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
236                .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
237                .finish(),
238
239            LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
240                .with_column("id", SqlScalarType::UInt64.nullable(false))
241                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
242                .with_column(
243                    "address",
244                    SqlScalarType::List {
245                        element_type: Box::new(SqlScalarType::UInt64),
246                        custom_id: None,
247                    }
248                    .nullable(false),
249                )
250                .with_key(vec![0, 1])
251                .finish(),
252
253            LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
254                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
255                .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
256                .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
257                .finish(),
258
259            LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
260                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
261                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
262                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
263                .finish(),
264
265            LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
266                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
267                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
268                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
269                .finish(),
270
271            LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
272                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
273                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
274                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
275                .finish(),
276
277            LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
278                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
279                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
280                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
281                .finish(),
282
283            LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
284                .with_column("id", SqlScalarType::UInt64.nullable(false))
285                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
286                .with_column("source", SqlScalarType::UInt64.nullable(false))
287                .with_column("port", SqlScalarType::UInt64.nullable(false))
288                .with_column("update_type", SqlScalarType::String.nullable(false))
289                .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
290                .finish(),
291
292            LogVariant::Differential(DifferentialLog::ArrangementBatches)
293            | LogVariant::Differential(DifferentialLog::ArrangementRecords)
294            | LogVariant::Differential(DifferentialLog::Sharing)
295            | LogVariant::Differential(DifferentialLog::BatcherRecords)
296            | LogVariant::Differential(DifferentialLog::BatcherSize)
297            | LogVariant::Differential(DifferentialLog::BatcherCapacity)
298            | LogVariant::Differential(DifferentialLog::BatcherAllocations)
299            | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
300            | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
301            | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
302                RelationDesc::builder()
303                    .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
304                    .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
305                    .finish()
306            }
307
308            LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
309                .with_column("export_id", SqlScalarType::String.nullable(false))
310                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
311                .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
312                .with_key(vec![0, 1])
313                .finish(),
314
315            LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
316                .with_column("export_id", SqlScalarType::String.nullable(false))
317                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
318                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
319                .with_key(vec![0, 1])
320                .finish(),
321
322            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
323                .with_column("export_id", SqlScalarType::String.nullable(false))
324                .with_column("import_id", SqlScalarType::String.nullable(false))
325                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
326                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
327                .with_key(vec![0, 1, 2])
328                .finish(),
329
330            LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
331                .with_column("id", SqlScalarType::Uuid.nullable(false))
332                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
333                .with_column("object_id", SqlScalarType::String.nullable(false))
334                .with_column("type", SqlScalarType::String.nullable(false))
335                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
336                .with_key(vec![0, 1])
337                .finish(),
338
339            LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
340                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
341                .with_column("type", SqlScalarType::String.nullable(false))
342                .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
343                .finish(),
344
345            LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
346                .with_column("export_id", SqlScalarType::String.nullable(false))
347                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
348                .with_column("count", SqlScalarType::Int64.nullable(false))
349                .with_key(vec![0, 1])
350                .finish(),
351
352            LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
353                .with_column("export_id", SqlScalarType::String.nullable(false))
354                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
355                .with_column("time_ns", SqlScalarType::UInt64.nullable(true))
356                .with_key(vec![0, 1])
357                .finish(),
358
359            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => RelationDesc::builder()
360                .with_column("export_id", SqlScalarType::String.nullable(false))
361                .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
362                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
363                .with_column("hydrated", SqlScalarType::Bool.nullable(false))
364                .with_key(vec![0, 1])
365                .finish(),
366
367            LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
368                .with_column("global_id", SqlScalarType::String.nullable(false))
369                .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
370                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
371                .with_column("operator", SqlScalarType::String.nullable(false))
372                .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
373                .with_column("nesting", SqlScalarType::UInt16.nullable(false))
374                .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
375                .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
376                .with_key(vec![0, 1, 2])
377                .finish(),
378
379            LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
380                .with_column("id", SqlScalarType::UInt64.nullable(false))
381                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
382                .with_column("global_id", SqlScalarType::String.nullable(false))
383                .with_key(vec![0, 1])
384                .finish(),
385        }
386    }
387}