1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::{GlobalId, RelationDesc, SqlScalarType};
16use serde::{Deserialize, Serialize};
17
18#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
26pub struct LoggingConfig {
27 pub interval: Duration,
29 pub enable_logging: bool,
31 pub log_logging: bool,
33 pub index_logs: BTreeMap<LogVariant, GlobalId>,
35}
36
37#[derive(
39 Hash,
40 Eq,
41 PartialEq,
42 Ord,
43 PartialOrd,
44 Debug,
45 Clone,
46 Copy,
47 Serialize,
48 Deserialize
49)]
50pub enum LogVariant {
51 Timely(TimelyLog),
53 Differential(DifferentialLog),
55 Compute(ComputeLog),
57}
58
59impl From<TimelyLog> for LogVariant {
60 fn from(value: TimelyLog) -> Self {
61 Self::Timely(value)
62 }
63}
64
65impl From<DifferentialLog> for LogVariant {
66 fn from(value: DifferentialLog) -> Self {
67 Self::Differential(value)
68 }
69}
70
71impl From<ComputeLog> for LogVariant {
72 fn from(value: ComputeLog) -> Self {
73 Self::Compute(value)
74 }
75}
76
77#[derive(
79 Hash,
80 Eq,
81 Ord,
82 PartialEq,
83 PartialOrd,
84 Debug,
85 Clone,
86 Copy,
87 Serialize,
88 Deserialize
89)]
90pub enum TimelyLog {
91 Operates,
93 Channels,
95 Elapsed,
97 Histogram,
99 Addresses,
101 Parks,
103 MessagesSent,
105 MessagesReceived,
107 Reachability,
109 BatchesSent,
111 BatchesReceived,
113}
114
115#[derive(
117 Hash,
118 Eq,
119 Ord,
120 PartialEq,
121 PartialOrd,
122 Debug,
123 Clone,
124 Copy,
125 Serialize,
126 Deserialize
127)]
128pub enum DifferentialLog {
129 ArrangementBatches,
131 ArrangementRecords,
133 Sharing,
135 BatcherRecords,
137 BatcherSize,
139 BatcherCapacity,
141 BatcherAllocations,
143}
144
145#[derive(
147 Hash,
148 Eq,
149 PartialEq,
150 Ord,
151 PartialOrd,
152 Debug,
153 Clone,
154 Copy,
155 Serialize,
156 Deserialize
157)]
158pub enum ComputeLog {
159 DataflowCurrent,
161 FrontierCurrent,
163 PeekCurrent,
165 PeekDuration,
167 ImportFrontierCurrent,
169 ArrangementHeapSize,
171 ArrangementHeapCapacity,
173 ArrangementHeapAllocations,
175 ErrorCount,
177 HydrationTime,
179 OperatorHydrationStatus,
181 LirMapping,
183 DataflowGlobal,
185 PrometheusMetrics,
187}
188
189impl LogVariant {
190 pub fn index_by(&self) -> Vec<usize> {
196 let desc = self.desc();
197 let arity = desc.arity();
198 desc.typ()
199 .keys
200 .get(0)
201 .cloned()
202 .unwrap_or_else(|| (0..arity).collect())
203 }
204
205 pub fn desc(&self) -> RelationDesc {
211 match self {
212 LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
213 .with_column("id", SqlScalarType::UInt64.nullable(false))
214 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
215 .with_column("name", SqlScalarType::String.nullable(false))
216 .with_key(vec![0, 1])
217 .finish(),
218
219 LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
220 .with_column("id", SqlScalarType::UInt64.nullable(false))
221 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
222 .with_column("from_index", SqlScalarType::UInt64.nullable(false))
223 .with_column("from_port", SqlScalarType::UInt64.nullable(false))
224 .with_column("to_index", SqlScalarType::UInt64.nullable(false))
225 .with_column("to_port", SqlScalarType::UInt64.nullable(false))
226 .with_column("type", SqlScalarType::String.nullable(false))
227 .with_key(vec![0, 1])
228 .finish(),
229
230 LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
231 .with_column("id", SqlScalarType::UInt64.nullable(false))
232 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
233 .finish(),
234
235 LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
236 .with_column("id", SqlScalarType::UInt64.nullable(false))
237 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
238 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
239 .finish(),
240
241 LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
242 .with_column("id", SqlScalarType::UInt64.nullable(false))
243 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
244 .with_column(
245 "address",
246 SqlScalarType::List {
247 element_type: Box::new(SqlScalarType::UInt64),
248 custom_id: None,
249 }
250 .nullable(false),
251 )
252 .with_key(vec![0, 1])
253 .finish(),
254
255 LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
256 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
257 .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
258 .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
259 .finish(),
260
261 LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
262 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
263 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
264 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
265 .finish(),
266
267 LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
268 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
269 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
270 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
271 .finish(),
272
273 LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
274 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
275 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
276 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
277 .finish(),
278
279 LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
280 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
281 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
282 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
283 .finish(),
284
285 LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
286 .with_column("id", SqlScalarType::UInt64.nullable(false))
287 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
288 .with_column("source", SqlScalarType::UInt64.nullable(false))
289 .with_column("port", SqlScalarType::UInt64.nullable(false))
290 .with_column("update_type", SqlScalarType::String.nullable(false))
291 .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
292 .finish(),
293
294 LogVariant::Differential(DifferentialLog::ArrangementBatches)
295 | LogVariant::Differential(DifferentialLog::ArrangementRecords)
296 | LogVariant::Differential(DifferentialLog::Sharing)
297 | LogVariant::Differential(DifferentialLog::BatcherRecords)
298 | LogVariant::Differential(DifferentialLog::BatcherSize)
299 | LogVariant::Differential(DifferentialLog::BatcherCapacity)
300 | LogVariant::Differential(DifferentialLog::BatcherAllocations)
301 | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
302 | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
303 | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
304 RelationDesc::builder()
305 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
306 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
307 .finish()
308 }
309
310 LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
311 .with_column("export_id", SqlScalarType::String.nullable(false))
312 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
313 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
314 .with_key(vec![0, 1])
315 .finish(),
316
317 LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
318 .with_column("export_id", SqlScalarType::String.nullable(false))
319 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
320 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
321 .with_key(vec![0, 1])
322 .finish(),
323
324 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
325 .with_column("export_id", SqlScalarType::String.nullable(false))
326 .with_column("import_id", SqlScalarType::String.nullable(false))
327 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
328 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
329 .with_key(vec![0, 1, 2])
330 .finish(),
331
332 LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
333 .with_column("id", SqlScalarType::Uuid.nullable(false))
334 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
335 .with_column("object_id", SqlScalarType::String.nullable(false))
336 .with_column("type", SqlScalarType::String.nullable(false))
337 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
338 .with_key(vec![0, 1])
339 .finish(),
340
341 LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
342 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
343 .with_column("type", SqlScalarType::String.nullable(false))
344 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
345 .finish(),
346
347 LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
348 .with_column("export_id", SqlScalarType::String.nullable(false))
349 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
350 .with_column("count", SqlScalarType::Int64.nullable(false))
351 .with_key(vec![0, 1])
352 .finish(),
353
354 LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
355 .with_column("export_id", SqlScalarType::String.nullable(false))
356 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
357 .with_column("time_ns", SqlScalarType::UInt64.nullable(true))
358 .with_key(vec![0, 1])
359 .finish(),
360
361 LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => RelationDesc::builder()
362 .with_column("export_id", SqlScalarType::String.nullable(false))
363 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
364 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
365 .with_column("hydrated", SqlScalarType::Bool.nullable(false))
366 .with_key(vec![0, 1])
367 .finish(),
368
369 LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
370 .with_column("global_id", SqlScalarType::String.nullable(false))
371 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
372 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
373 .with_column("operator", SqlScalarType::String.nullable(false))
374 .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
375 .with_column("nesting", SqlScalarType::UInt16.nullable(false))
376 .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
377 .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
378 .with_key(vec![0, 1, 2])
379 .finish(),
380
381 LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
382 .with_column("id", SqlScalarType::UInt64.nullable(false))
383 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
384 .with_column("global_id", SqlScalarType::String.nullable(false))
385 .with_key(vec![0, 1])
386 .finish(),
387
388 LogVariant::Compute(ComputeLog::PrometheusMetrics) => RelationDesc::builder()
389 .with_column("process_id", SqlScalarType::UInt64.nullable(false))
390 .with_column("metric_name", SqlScalarType::String.nullable(false))
391 .with_column("metric_type", SqlScalarType::String.nullable(false))
392 .with_column(
393 "labels",
394 SqlScalarType::Map {
395 value_type: Box::new(SqlScalarType::String),
396 custom_id: None,
397 }
398 .nullable(false),
399 )
400 .with_column("value", SqlScalarType::Float64.nullable(false))
401 .with_column("help", SqlScalarType::String.nullable(false))
402 .with_key(vec![0, 1, 3])
403 .finish(),
404 }
405 }
406}