mz_compute_client/
logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Compute layer logging configuration.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::{GlobalId, RelationDesc, SqlScalarType};
16use serde::{Deserialize, Serialize};
17
18/// Logging configuration.
19///
20/// Setting `enable_logging` to `false` specifies that logging is disabled.
21//
22// Ideally we'd want to instead signal disabled logging by leaving `index_logs`
23// empty. Unfortunately, we have to always provide `index_logs`, because we must
24// install the logging dataflows even on replicas that have logging disabled. See database-issues#4545.
25#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
26pub struct LoggingConfig {
27    /// The logging interval
28    pub interval: Duration,
29    /// Whether logging is enabled
30    pub enable_logging: bool,
31    /// Whether we should report logs for the log-processing dataflows
32    pub log_logging: bool,
33    /// Logs to keep in an arrangement
34    pub index_logs: BTreeMap<LogVariant, GlobalId>,
35}
36
37/// TODO(database-issues#7533): Add documentation.
38#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
39pub enum LogVariant {
40    /// TODO(database-issues#7533): Add documentation.
41    Timely(TimelyLog),
42    /// TODO(database-issues#7533): Add documentation.
43    Differential(DifferentialLog),
44    /// TODO(database-issues#7533): Add documentation.
45    Compute(ComputeLog),
46}
47
48impl From<TimelyLog> for LogVariant {
49    fn from(value: TimelyLog) -> Self {
50        Self::Timely(value)
51    }
52}
53
54impl From<DifferentialLog> for LogVariant {
55    fn from(value: DifferentialLog) -> Self {
56        Self::Differential(value)
57    }
58}
59
60impl From<ComputeLog> for LogVariant {
61    fn from(value: ComputeLog) -> Self {
62        Self::Compute(value)
63    }
64}
65
66/// TODO(database-issues#7533): Add documentation.
67#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
68pub enum TimelyLog {
69    /// TODO(database-issues#7533): Add documentation.
70    Operates,
71    /// TODO(database-issues#7533): Add documentation.
72    Channels,
73    /// TODO(database-issues#7533): Add documentation.
74    Elapsed,
75    /// TODO(database-issues#7533): Add documentation.
76    Histogram,
77    /// TODO(database-issues#7533): Add documentation.
78    Addresses,
79    /// TODO(database-issues#7533): Add documentation.
80    Parks,
81    /// TODO(database-issues#7533): Add documentation.
82    MessagesSent,
83    /// TODO(database-issues#7533): Add documentation.
84    MessagesReceived,
85    /// TODO(database-issues#7533): Add documentation.
86    Reachability,
87    /// TODO(database-issues#7533): Add documentation.
88    BatchesSent,
89    /// TODO(database-issues#7533): Add documentation.
90    BatchesReceived,
91}
92
93/// TODO(database-issues#7533): Add documentation.
94#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
95pub enum DifferentialLog {
96    /// TODO(database-issues#7533): Add documentation.
97    ArrangementBatches,
98    /// TODO(database-issues#7533): Add documentation.
99    ArrangementRecords,
100    /// TODO(database-issues#7533): Add documentation.
101    Sharing,
102    /// TODO(database-issues#7533): Add documentation.
103    BatcherRecords,
104    /// TODO(database-issues#7533): Add documentation.
105    BatcherSize,
106    /// TODO(database-issues#7533): Add documentation.
107    BatcherCapacity,
108    /// TODO(database-issues#7533): Add documentation.
109    BatcherAllocations,
110}
111
112/// Variants of compute introspection sources.
113#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
114pub enum ComputeLog {
115    /// Installed dataflow exports.
116    DataflowCurrent,
117    /// Dataflow write frontiers.
118    FrontierCurrent,
119    /// Pending peeks.
120    PeekCurrent,
121    /// A histogram over peek durations.
122    PeekDuration,
123    /// Dataflow import frontiers.
124    ImportFrontierCurrent,
125    /// Arrangement heap sizes.
126    ArrangementHeapSize,
127    /// Arrangement heap capacities.
128    ArrangementHeapCapacity,
129    /// Arrangement heap allocations.
130    ArrangementHeapAllocations,
131    /// A histogram over dataflow shutdown durations.
132    ShutdownDuration,
133    /// Counts of errors in exported collections.
134    ErrorCount,
135    /// Hydration times of exported collections.
136    HydrationTime,
137    /// Hydration status of dataflow operators.
138    OperatorHydrationStatus,
139    /// Mappings from `GlobalId`/`LirId`` pairs to dataflow addresses.
140    LirMapping,
141    /// Mappings from dataflows to `GlobalId`s.
142    DataflowGlobal,
143}
144
145impl LogVariant {
146    /// By which columns should the logs be indexed.
147    ///
148    /// This is distinct from the `keys` property of the type, which indicates uniqueness.
149    /// When keys exist these are good choices for indexing, but when they do not we still
150    /// require index guidance.
151    pub fn index_by(&self) -> Vec<usize> {
152        let desc = self.desc();
153        let arity = desc.arity();
154        desc.typ()
155            .keys
156            .get(0)
157            .cloned()
158            .unwrap_or_else(|| (0..arity).collect())
159    }
160
161    /// Relation schemas for the logs.
162    ///
163    /// This types need to agree with the values that are produced
164    /// in `logging::compute::construct` and with the description in
165    /// `catalog/src/builtin.rs`.
166    pub fn desc(&self) -> RelationDesc {
167        match self {
168            LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
169                .with_column("id", SqlScalarType::UInt64.nullable(false))
170                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
171                .with_column("name", SqlScalarType::String.nullable(false))
172                .with_key(vec![0, 1])
173                .finish(),
174
175            LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
176                .with_column("id", SqlScalarType::UInt64.nullable(false))
177                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
178                .with_column("from_index", SqlScalarType::UInt64.nullable(false))
179                .with_column("from_port", SqlScalarType::UInt64.nullable(false))
180                .with_column("to_index", SqlScalarType::UInt64.nullable(false))
181                .with_column("to_port", SqlScalarType::UInt64.nullable(false))
182                .with_column("type", SqlScalarType::String.nullable(false))
183                .with_key(vec![0, 1])
184                .finish(),
185
186            LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
187                .with_column("id", SqlScalarType::UInt64.nullable(false))
188                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
189                .finish(),
190
191            LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
192                .with_column("id", SqlScalarType::UInt64.nullable(false))
193                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
194                .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
195                .finish(),
196
197            LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
198                .with_column("id", SqlScalarType::UInt64.nullable(false))
199                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
200                .with_column(
201                    "address",
202                    SqlScalarType::List {
203                        element_type: Box::new(SqlScalarType::UInt64),
204                        custom_id: None,
205                    }
206                    .nullable(false),
207                )
208                .with_key(vec![0, 1])
209                .finish(),
210
211            LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
212                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
213                .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
214                .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
215                .finish(),
216
217            LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
218                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
219                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
220                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
221                .finish(),
222
223            LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
224                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
225                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
226                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
227                .finish(),
228
229            LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
230                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
231                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
232                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
233                .finish(),
234
235            LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
236                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
237                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
238                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
239                .finish(),
240
241            LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
242                .with_column("id", SqlScalarType::UInt64.nullable(false))
243                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
244                .with_column("source", SqlScalarType::UInt64.nullable(false))
245                .with_column("port", SqlScalarType::UInt64.nullable(false))
246                .with_column("update_type", SqlScalarType::String.nullable(false))
247                .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
248                .finish(),
249
250            LogVariant::Differential(DifferentialLog::ArrangementBatches)
251            | LogVariant::Differential(DifferentialLog::ArrangementRecords)
252            | LogVariant::Differential(DifferentialLog::Sharing)
253            | LogVariant::Differential(DifferentialLog::BatcherRecords)
254            | LogVariant::Differential(DifferentialLog::BatcherSize)
255            | LogVariant::Differential(DifferentialLog::BatcherCapacity)
256            | LogVariant::Differential(DifferentialLog::BatcherAllocations)
257            | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
258            | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
259            | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
260                RelationDesc::builder()
261                    .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
262                    .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
263                    .finish()
264            }
265
266            LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
267                .with_column("export_id", SqlScalarType::String.nullable(false))
268                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
269                .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
270                .with_key(vec![0, 1])
271                .finish(),
272
273            LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
274                .with_column("export_id", SqlScalarType::String.nullable(false))
275                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
276                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
277                .with_key(vec![0, 1])
278                .finish(),
279
280            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
281                .with_column("export_id", SqlScalarType::String.nullable(false))
282                .with_column("import_id", SqlScalarType::String.nullable(false))
283                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
284                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
285                .with_key(vec![0, 1, 2])
286                .finish(),
287
288            LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
289                .with_column("id", SqlScalarType::Uuid.nullable(false))
290                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
291                .with_column("object_id", SqlScalarType::String.nullable(false))
292                .with_column("type", SqlScalarType::String.nullable(false))
293                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
294                .with_key(vec![0, 1])
295                .finish(),
296
297            LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
298                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
299                .with_column("type", SqlScalarType::String.nullable(false))
300                .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
301                .finish(),
302
303            LogVariant::Compute(ComputeLog::ShutdownDuration) => RelationDesc::builder()
304                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
305                .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
306                .finish(),
307
308            LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
309                .with_column("export_id", SqlScalarType::String.nullable(false))
310                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
311                .with_column("count", SqlScalarType::Int64.nullable(false))
312                .with_key(vec![0, 1])
313                .finish(),
314
315            LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
316                .with_column("export_id", SqlScalarType::String.nullable(false))
317                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
318                .with_column("time_ns", SqlScalarType::UInt64.nullable(true))
319                .with_key(vec![0, 1])
320                .finish(),
321
322            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => RelationDesc::builder()
323                .with_column("export_id", SqlScalarType::String.nullable(false))
324                .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
325                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
326                .with_column("hydrated", SqlScalarType::Bool.nullable(false))
327                .with_key(vec![0, 1])
328                .finish(),
329
330            LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
331                .with_column("global_id", SqlScalarType::String.nullable(false))
332                .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
333                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
334                .with_column("operator", SqlScalarType::String.nullable(false))
335                .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
336                .with_column("nesting", SqlScalarType::UInt16.nullable(false))
337                .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
338                .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
339                .with_key(vec![0, 1, 2])
340                .finish(),
341
342            LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
343                .with_column("id", SqlScalarType::UInt64.nullable(false))
344                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
345                .with_column("global_id", SqlScalarType::String.nullable(false))
346                .with_key(vec![0, 1])
347                .finish(),
348        }
349    }
350}