1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::{GlobalId, RelationDesc, SqlScalarType};
16use serde::{Deserialize, Serialize};
17
18#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
26pub struct LoggingConfig {
27 pub interval: Duration,
29 pub enable_logging: bool,
31 pub log_logging: bool,
33 pub index_logs: BTreeMap<LogVariant, GlobalId>,
35}
36
37#[derive(
39 Hash,
40 Eq,
41 PartialEq,
42 Ord,
43 PartialOrd,
44 Debug,
45 Clone,
46 Copy,
47 Serialize,
48 Deserialize
49)]
50pub enum LogVariant {
51 Timely(TimelyLog),
53 Differential(DifferentialLog),
55 Compute(ComputeLog),
57}
58
59impl From<TimelyLog> for LogVariant {
60 fn from(value: TimelyLog) -> Self {
61 Self::Timely(value)
62 }
63}
64
65impl From<DifferentialLog> for LogVariant {
66 fn from(value: DifferentialLog) -> Self {
67 Self::Differential(value)
68 }
69}
70
71impl From<ComputeLog> for LogVariant {
72 fn from(value: ComputeLog) -> Self {
73 Self::Compute(value)
74 }
75}
76
77#[derive(
79 Hash,
80 Eq,
81 Ord,
82 PartialEq,
83 PartialOrd,
84 Debug,
85 Clone,
86 Copy,
87 Serialize,
88 Deserialize
89)]
90pub enum TimelyLog {
91 Operates,
93 Channels,
95 Elapsed,
97 Histogram,
99 Addresses,
101 Parks,
103 MessagesSent,
105 MessagesReceived,
107 Reachability,
109 BatchesSent,
111 BatchesReceived,
113}
114
115#[derive(
117 Hash,
118 Eq,
119 Ord,
120 PartialEq,
121 PartialOrd,
122 Debug,
123 Clone,
124 Copy,
125 Serialize,
126 Deserialize
127)]
128pub enum DifferentialLog {
129 ArrangementBatches,
131 ArrangementRecords,
133 Sharing,
135 BatcherRecords,
137 BatcherSize,
139 BatcherCapacity,
141 BatcherAllocations,
143}
144
145#[derive(
147 Hash,
148 Eq,
149 PartialEq,
150 Ord,
151 PartialOrd,
152 Debug,
153 Clone,
154 Copy,
155 Serialize,
156 Deserialize
157)]
158pub enum ComputeLog {
159 DataflowCurrent,
161 FrontierCurrent,
163 PeekCurrent,
165 PeekDuration,
167 ImportFrontierCurrent,
169 ArrangementHeapSize,
171 ArrangementHeapCapacity,
173 ArrangementHeapAllocations,
175 ErrorCount,
177 HydrationTime,
179 OperatorHydrationStatus,
181 LirMapping,
183 DataflowGlobal,
185}
186
187impl LogVariant {
188 pub fn index_by(&self) -> Vec<usize> {
194 let desc = self.desc();
195 let arity = desc.arity();
196 desc.typ()
197 .keys
198 .get(0)
199 .cloned()
200 .unwrap_or_else(|| (0..arity).collect())
201 }
202
203 pub fn desc(&self) -> RelationDesc {
209 match self {
210 LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
211 .with_column("id", SqlScalarType::UInt64.nullable(false))
212 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
213 .with_column("name", SqlScalarType::String.nullable(false))
214 .with_key(vec![0, 1])
215 .finish(),
216
217 LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
218 .with_column("id", SqlScalarType::UInt64.nullable(false))
219 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
220 .with_column("from_index", SqlScalarType::UInt64.nullable(false))
221 .with_column("from_port", SqlScalarType::UInt64.nullable(false))
222 .with_column("to_index", SqlScalarType::UInt64.nullable(false))
223 .with_column("to_port", SqlScalarType::UInt64.nullable(false))
224 .with_column("type", SqlScalarType::String.nullable(false))
225 .with_key(vec![0, 1])
226 .finish(),
227
228 LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
229 .with_column("id", SqlScalarType::UInt64.nullable(false))
230 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
231 .finish(),
232
233 LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
234 .with_column("id", SqlScalarType::UInt64.nullable(false))
235 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
236 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
237 .finish(),
238
239 LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
240 .with_column("id", SqlScalarType::UInt64.nullable(false))
241 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
242 .with_column(
243 "address",
244 SqlScalarType::List {
245 element_type: Box::new(SqlScalarType::UInt64),
246 custom_id: None,
247 }
248 .nullable(false),
249 )
250 .with_key(vec![0, 1])
251 .finish(),
252
253 LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
254 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
255 .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
256 .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
257 .finish(),
258
259 LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
260 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
261 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
262 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
263 .finish(),
264
265 LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
266 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
267 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
268 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
269 .finish(),
270
271 LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
272 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
273 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
274 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
275 .finish(),
276
277 LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
278 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
279 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
280 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
281 .finish(),
282
283 LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
284 .with_column("id", SqlScalarType::UInt64.nullable(false))
285 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
286 .with_column("source", SqlScalarType::UInt64.nullable(false))
287 .with_column("port", SqlScalarType::UInt64.nullable(false))
288 .with_column("update_type", SqlScalarType::String.nullable(false))
289 .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
290 .finish(),
291
292 LogVariant::Differential(DifferentialLog::ArrangementBatches)
293 | LogVariant::Differential(DifferentialLog::ArrangementRecords)
294 | LogVariant::Differential(DifferentialLog::Sharing)
295 | LogVariant::Differential(DifferentialLog::BatcherRecords)
296 | LogVariant::Differential(DifferentialLog::BatcherSize)
297 | LogVariant::Differential(DifferentialLog::BatcherCapacity)
298 | LogVariant::Differential(DifferentialLog::BatcherAllocations)
299 | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
300 | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
301 | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
302 RelationDesc::builder()
303 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
304 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
305 .finish()
306 }
307
308 LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
309 .with_column("export_id", SqlScalarType::String.nullable(false))
310 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
311 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
312 .with_key(vec![0, 1])
313 .finish(),
314
315 LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
316 .with_column("export_id", SqlScalarType::String.nullable(false))
317 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
318 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
319 .with_key(vec![0, 1])
320 .finish(),
321
322 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
323 .with_column("export_id", SqlScalarType::String.nullable(false))
324 .with_column("import_id", SqlScalarType::String.nullable(false))
325 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
326 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
327 .with_key(vec![0, 1, 2])
328 .finish(),
329
330 LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
331 .with_column("id", SqlScalarType::Uuid.nullable(false))
332 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
333 .with_column("object_id", SqlScalarType::String.nullable(false))
334 .with_column("type", SqlScalarType::String.nullable(false))
335 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
336 .with_key(vec![0, 1])
337 .finish(),
338
339 LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
340 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
341 .with_column("type", SqlScalarType::String.nullable(false))
342 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
343 .finish(),
344
345 LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
346 .with_column("export_id", SqlScalarType::String.nullable(false))
347 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
348 .with_column("count", SqlScalarType::Int64.nullable(false))
349 .with_key(vec![0, 1])
350 .finish(),
351
352 LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
353 .with_column("export_id", SqlScalarType::String.nullable(false))
354 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
355 .with_column("time_ns", SqlScalarType::UInt64.nullable(true))
356 .with_key(vec![0, 1])
357 .finish(),
358
359 LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => RelationDesc::builder()
360 .with_column("export_id", SqlScalarType::String.nullable(false))
361 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
362 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
363 .with_column("hydrated", SqlScalarType::Bool.nullable(false))
364 .with_key(vec![0, 1])
365 .finish(),
366
367 LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
368 .with_column("global_id", SqlScalarType::String.nullable(false))
369 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
370 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
371 .with_column("operator", SqlScalarType::String.nullable(false))
372 .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
373 .with_column("nesting", SqlScalarType::UInt16.nullable(false))
374 .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
375 .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
376 .with_key(vec![0, 1, 2])
377 .finish(),
378
379 LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
380 .with_column("id", SqlScalarType::UInt64.nullable(false))
381 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
382 .with_column("global_id", SqlScalarType::String.nullable(false))
383 .with_key(vec![0, 1])
384 .finish(),
385 }
386 }
387}