mz_compute_client/
logging.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute layer logging configuration.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::{GlobalId, RelationDesc, SqlScalarType};
16use serde::{Deserialize, Serialize};
17
18/// Logging configuration.
19///
20/// Setting `enable_logging` to `false` specifies that logging is disabled.
21//
22// Ideally we'd want to instead signal disabled logging by leaving `index_logs`
23// empty. Unfortunately, we have to always provide `index_logs`, because we must
24// install the logging dataflows even on replicas that have logging disabled. See database-issues#4545.
25#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
26pub struct LoggingConfig {
27    /// The logging interval
28    pub interval: Duration,
29    /// Whether logging is enabled
30    pub enable_logging: bool,
31    /// Whether we should report logs for the log-processing dataflows
32    pub log_logging: bool,
33    /// Logs to keep in an arrangement
34    pub index_logs: BTreeMap<LogVariant, GlobalId>,
35}
36
37/// TODO(database-issues#7533): Add documentation.
38#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
39pub enum LogVariant {
40    /// TODO(database-issues#7533): Add documentation.
41    Timely(TimelyLog),
42    /// TODO(database-issues#7533): Add documentation.
43    Differential(DifferentialLog),
44    /// TODO(database-issues#7533): Add documentation.
45    Compute(ComputeLog),
46}
47
48impl From<TimelyLog> for LogVariant {
49    fn from(value: TimelyLog) -> Self {
50        Self::Timely(value)
51    }
52}
53
54impl From<DifferentialLog> for LogVariant {
55    fn from(value: DifferentialLog) -> Self {
56        Self::Differential(value)
57    }
58}
59
60impl From<ComputeLog> for LogVariant {
61    fn from(value: ComputeLog) -> Self {
62        Self::Compute(value)
63    }
64}
65
66/// TODO(database-issues#7533): Add documentation.
67#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
68pub enum TimelyLog {
69    /// TODO(database-issues#7533): Add documentation.
70    Operates,
71    /// TODO(database-issues#7533): Add documentation.
72    Channels,
73    /// TODO(database-issues#7533): Add documentation.
74    Elapsed,
75    /// TODO(database-issues#7533): Add documentation.
76    Histogram,
77    /// TODO(database-issues#7533): Add documentation.
78    Addresses,
79    /// TODO(database-issues#7533): Add documentation.
80    Parks,
81    /// TODO(database-issues#7533): Add documentation.
82    MessagesSent,
83    /// TODO(database-issues#7533): Add documentation.
84    MessagesReceived,
85    /// TODO(database-issues#7533): Add documentation.
86    Reachability,
87    /// TODO(database-issues#7533): Add documentation.
88    BatchesSent,
89    /// TODO(database-issues#7533): Add documentation.
90    BatchesReceived,
91}
92
93/// TODO(database-issues#7533): Add documentation.
94#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
95pub enum DifferentialLog {
96    /// TODO(database-issues#7533): Add documentation.
97    ArrangementBatches,
98    /// TODO(database-issues#7533): Add documentation.
99    ArrangementRecords,
100    /// TODO(database-issues#7533): Add documentation.
101    Sharing,
102    /// TODO(database-issues#7533): Add documentation.
103    BatcherRecords,
104    /// TODO(database-issues#7533): Add documentation.
105    BatcherSize,
106    /// TODO(database-issues#7533): Add documentation.
107    BatcherCapacity,
108    /// TODO(database-issues#7533): Add documentation.
109    BatcherAllocations,
110}
111
112/// Variants of compute introspection sources.
113#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
114pub enum ComputeLog {
115    /// Installed dataflow exports.
116    DataflowCurrent,
117    /// Dataflow write frontiers.
118    FrontierCurrent,
119    /// Pending peeks.
120    PeekCurrent,
121    /// A histogram over peek durations.
122    PeekDuration,
123    /// Dataflow import frontiers.
124    ImportFrontierCurrent,
125    /// Arrangement heap sizes.
126    ArrangementHeapSize,
127    /// Arrangement heap capacities.
128    ArrangementHeapCapacity,
129    /// Arrangement heap allocations.
130    ArrangementHeapAllocations,
131    /// Counts of errors in exported collections.
132    ErrorCount,
133    /// Hydration times of exported collections.
134    HydrationTime,
135    /// Hydration status of dataflow operators.
136    OperatorHydrationStatus,
137    /// Mappings from `GlobalId`/`LirId`` pairs to dataflow addresses.
138    LirMapping,
139    /// Mappings from dataflows to `GlobalId`s.
140    DataflowGlobal,
141}
142
143impl LogVariant {
144    /// By which columns should the logs be indexed.
145    ///
146    /// This is distinct from the `keys` property of the type, which indicates uniqueness.
147    /// When keys exist these are good choices for indexing, but when they do not we still
148    /// require index guidance.
149    pub fn index_by(&self) -> Vec<usize> {
150        let desc = self.desc();
151        let arity = desc.arity();
152        desc.typ()
153            .keys
154            .get(0)
155            .cloned()
156            .unwrap_or_else(|| (0..arity).collect())
157    }
158
159    /// Relation schemas for the logs.
160    ///
161    /// This types need to agree with the values that are produced
162    /// in `logging::compute::construct` and with the description in
163    /// `catalog/src/builtin.rs`.
164    pub fn desc(&self) -> RelationDesc {
165        match self {
166            LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
167                .with_column("id", SqlScalarType::UInt64.nullable(false))
168                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
169                .with_column("name", SqlScalarType::String.nullable(false))
170                .with_key(vec![0, 1])
171                .finish(),
172
173            LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
174                .with_column("id", SqlScalarType::UInt64.nullable(false))
175                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
176                .with_column("from_index", SqlScalarType::UInt64.nullable(false))
177                .with_column("from_port", SqlScalarType::UInt64.nullable(false))
178                .with_column("to_index", SqlScalarType::UInt64.nullable(false))
179                .with_column("to_port", SqlScalarType::UInt64.nullable(false))
180                .with_column("type", SqlScalarType::String.nullable(false))
181                .with_key(vec![0, 1])
182                .finish(),
183
184            LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
185                .with_column("id", SqlScalarType::UInt64.nullable(false))
186                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
187                .finish(),
188
189            LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
190                .with_column("id", SqlScalarType::UInt64.nullable(false))
191                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
192                .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
193                .finish(),
194
195            LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
196                .with_column("id", SqlScalarType::UInt64.nullable(false))
197                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
198                .with_column(
199                    "address",
200                    SqlScalarType::List {
201                        element_type: Box::new(SqlScalarType::UInt64),
202                        custom_id: None,
203                    }
204                    .nullable(false),
205                )
206                .with_key(vec![0, 1])
207                .finish(),
208
209            LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
210                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
211                .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
212                .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
213                .finish(),
214
215            LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
216                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
217                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
218                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
219                .finish(),
220
221            LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
222                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
223                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
224                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
225                .finish(),
226
227            LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
228                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
229                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
230                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
231                .finish(),
232
233            LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
234                .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
235                .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
236                .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
237                .finish(),
238
239            LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
240                .with_column("id", SqlScalarType::UInt64.nullable(false))
241                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
242                .with_column("source", SqlScalarType::UInt64.nullable(false))
243                .with_column("port", SqlScalarType::UInt64.nullable(false))
244                .with_column("update_type", SqlScalarType::String.nullable(false))
245                .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
246                .finish(),
247
248            LogVariant::Differential(DifferentialLog::ArrangementBatches)
249            | LogVariant::Differential(DifferentialLog::ArrangementRecords)
250            | LogVariant::Differential(DifferentialLog::Sharing)
251            | LogVariant::Differential(DifferentialLog::BatcherRecords)
252            | LogVariant::Differential(DifferentialLog::BatcherSize)
253            | LogVariant::Differential(DifferentialLog::BatcherCapacity)
254            | LogVariant::Differential(DifferentialLog::BatcherAllocations)
255            | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
256            | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
257            | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
258                RelationDesc::builder()
259                    .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
260                    .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
261                    .finish()
262            }
263
264            LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
265                .with_column("export_id", SqlScalarType::String.nullable(false))
266                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
267                .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
268                .with_key(vec![0, 1])
269                .finish(),
270
271            LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
272                .with_column("export_id", SqlScalarType::String.nullable(false))
273                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
274                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
275                .with_key(vec![0, 1])
276                .finish(),
277
278            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
279                .with_column("export_id", SqlScalarType::String.nullable(false))
280                .with_column("import_id", SqlScalarType::String.nullable(false))
281                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
282                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
283                .with_key(vec![0, 1, 2])
284                .finish(),
285
286            LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
287                .with_column("id", SqlScalarType::Uuid.nullable(false))
288                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
289                .with_column("object_id", SqlScalarType::String.nullable(false))
290                .with_column("type", SqlScalarType::String.nullable(false))
291                .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
292                .with_key(vec![0, 1])
293                .finish(),
294
295            LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
296                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
297                .with_column("type", SqlScalarType::String.nullable(false))
298                .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
299                .finish(),
300
301            LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
302                .with_column("export_id", SqlScalarType::String.nullable(false))
303                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
304                .with_column("count", SqlScalarType::Int64.nullable(false))
305                .with_key(vec![0, 1])
306                .finish(),
307
308            LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
309                .with_column("export_id", SqlScalarType::String.nullable(false))
310                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
311                .with_column("time_ns", SqlScalarType::UInt64.nullable(true))
312                .with_key(vec![0, 1])
313                .finish(),
314
315            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => RelationDesc::builder()
316                .with_column("export_id", SqlScalarType::String.nullable(false))
317                .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
318                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
319                .with_column("hydrated", SqlScalarType::Bool.nullable(false))
320                .with_key(vec![0, 1])
321                .finish(),
322
323            LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
324                .with_column("global_id", SqlScalarType::String.nullable(false))
325                .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
326                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
327                .with_column("operator", SqlScalarType::String.nullable(false))
328                .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
329                .with_column("nesting", SqlScalarType::UInt16.nullable(false))
330                .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
331                .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
332                .with_key(vec![0, 1, 2])
333                .finish(),
334
335            LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
336                .with_column("id", SqlScalarType::UInt64.nullable(false))
337                .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
338                .with_column("global_id", SqlScalarType::String.nullable(false))
339                .with_key(vec![0, 1])
340                .finish(),
341        }
342    }
343}