1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::{GlobalId, RelationDesc, SqlScalarType};
16use serde::{Deserialize, Serialize};
17
18#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
26pub struct LoggingConfig {
27 pub interval: Duration,
29 pub enable_logging: bool,
31 pub log_logging: bool,
33 pub index_logs: BTreeMap<LogVariant, GlobalId>,
35}
36
37#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
39pub enum LogVariant {
40 Timely(TimelyLog),
42 Differential(DifferentialLog),
44 Compute(ComputeLog),
46}
47
48impl From<TimelyLog> for LogVariant {
49 fn from(value: TimelyLog) -> Self {
50 Self::Timely(value)
51 }
52}
53
54impl From<DifferentialLog> for LogVariant {
55 fn from(value: DifferentialLog) -> Self {
56 Self::Differential(value)
57 }
58}
59
60impl From<ComputeLog> for LogVariant {
61 fn from(value: ComputeLog) -> Self {
62 Self::Compute(value)
63 }
64}
65
66#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
68pub enum TimelyLog {
69 Operates,
71 Channels,
73 Elapsed,
75 Histogram,
77 Addresses,
79 Parks,
81 MessagesSent,
83 MessagesReceived,
85 Reachability,
87 BatchesSent,
89 BatchesReceived,
91}
92
93#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
95pub enum DifferentialLog {
96 ArrangementBatches,
98 ArrangementRecords,
100 Sharing,
102 BatcherRecords,
104 BatcherSize,
106 BatcherCapacity,
108 BatcherAllocations,
110}
111
112#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
114pub enum ComputeLog {
115 DataflowCurrent,
117 FrontierCurrent,
119 PeekCurrent,
121 PeekDuration,
123 ImportFrontierCurrent,
125 ArrangementHeapSize,
127 ArrangementHeapCapacity,
129 ArrangementHeapAllocations,
131 ShutdownDuration,
133 ErrorCount,
135 HydrationTime,
137 OperatorHydrationStatus,
139 LirMapping,
141 DataflowGlobal,
143}
144
145impl LogVariant {
146 pub fn index_by(&self) -> Vec<usize> {
152 let desc = self.desc();
153 let arity = desc.arity();
154 desc.typ()
155 .keys
156 .get(0)
157 .cloned()
158 .unwrap_or_else(|| (0..arity).collect())
159 }
160
161 pub fn desc(&self) -> RelationDesc {
167 match self {
168 LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
169 .with_column("id", SqlScalarType::UInt64.nullable(false))
170 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
171 .with_column("name", SqlScalarType::String.nullable(false))
172 .with_key(vec![0, 1])
173 .finish(),
174
175 LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
176 .with_column("id", SqlScalarType::UInt64.nullable(false))
177 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
178 .with_column("from_index", SqlScalarType::UInt64.nullable(false))
179 .with_column("from_port", SqlScalarType::UInt64.nullable(false))
180 .with_column("to_index", SqlScalarType::UInt64.nullable(false))
181 .with_column("to_port", SqlScalarType::UInt64.nullable(false))
182 .with_column("type", SqlScalarType::String.nullable(false))
183 .with_key(vec![0, 1])
184 .finish(),
185
186 LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
187 .with_column("id", SqlScalarType::UInt64.nullable(false))
188 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
189 .finish(),
190
191 LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
192 .with_column("id", SqlScalarType::UInt64.nullable(false))
193 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
194 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
195 .finish(),
196
197 LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
198 .with_column("id", SqlScalarType::UInt64.nullable(false))
199 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
200 .with_column(
201 "address",
202 SqlScalarType::List {
203 element_type: Box::new(SqlScalarType::UInt64),
204 custom_id: None,
205 }
206 .nullable(false),
207 )
208 .with_key(vec![0, 1])
209 .finish(),
210
211 LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
212 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
213 .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
214 .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
215 .finish(),
216
217 LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
218 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
219 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
220 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
221 .finish(),
222
223 LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
224 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
225 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
226 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
227 .finish(),
228
229 LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
230 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
231 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
232 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
233 .finish(),
234
235 LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
236 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
237 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
238 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
239 .finish(),
240
241 LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
242 .with_column("id", SqlScalarType::UInt64.nullable(false))
243 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
244 .with_column("source", SqlScalarType::UInt64.nullable(false))
245 .with_column("port", SqlScalarType::UInt64.nullable(false))
246 .with_column("update_type", SqlScalarType::String.nullable(false))
247 .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
248 .finish(),
249
250 LogVariant::Differential(DifferentialLog::ArrangementBatches)
251 | LogVariant::Differential(DifferentialLog::ArrangementRecords)
252 | LogVariant::Differential(DifferentialLog::Sharing)
253 | LogVariant::Differential(DifferentialLog::BatcherRecords)
254 | LogVariant::Differential(DifferentialLog::BatcherSize)
255 | LogVariant::Differential(DifferentialLog::BatcherCapacity)
256 | LogVariant::Differential(DifferentialLog::BatcherAllocations)
257 | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
258 | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
259 | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
260 RelationDesc::builder()
261 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
262 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
263 .finish()
264 }
265
266 LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
267 .with_column("export_id", SqlScalarType::String.nullable(false))
268 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
269 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
270 .with_key(vec![0, 1])
271 .finish(),
272
273 LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
274 .with_column("export_id", SqlScalarType::String.nullable(false))
275 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
276 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
277 .with_key(vec![0, 1])
278 .finish(),
279
280 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
281 .with_column("export_id", SqlScalarType::String.nullable(false))
282 .with_column("import_id", SqlScalarType::String.nullable(false))
283 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
284 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
285 .with_key(vec![0, 1, 2])
286 .finish(),
287
288 LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
289 .with_column("id", SqlScalarType::Uuid.nullable(false))
290 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
291 .with_column("object_id", SqlScalarType::String.nullable(false))
292 .with_column("type", SqlScalarType::String.nullable(false))
293 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
294 .with_key(vec![0, 1])
295 .finish(),
296
297 LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
298 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
299 .with_column("type", SqlScalarType::String.nullable(false))
300 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
301 .finish(),
302
303 LogVariant::Compute(ComputeLog::ShutdownDuration) => RelationDesc::builder()
304 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
305 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
306 .finish(),
307
308 LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
309 .with_column("export_id", SqlScalarType::String.nullable(false))
310 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
311 .with_column("count", SqlScalarType::Int64.nullable(false))
312 .with_key(vec![0, 1])
313 .finish(),
314
315 LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
316 .with_column("export_id", SqlScalarType::String.nullable(false))
317 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
318 .with_column("time_ns", SqlScalarType::UInt64.nullable(true))
319 .with_key(vec![0, 1])
320 .finish(),
321
322 LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => RelationDesc::builder()
323 .with_column("export_id", SqlScalarType::String.nullable(false))
324 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
325 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
326 .with_column("hydrated", SqlScalarType::Bool.nullable(false))
327 .with_key(vec![0, 1])
328 .finish(),
329
330 LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
331 .with_column("global_id", SqlScalarType::String.nullable(false))
332 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
333 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
334 .with_column("operator", SqlScalarType::String.nullable(false))
335 .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
336 .with_column("nesting", SqlScalarType::UInt16.nullable(false))
337 .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
338 .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
339 .with_key(vec![0, 1, 2])
340 .finish(),
341
342 LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
343 .with_column("id", SqlScalarType::UInt64.nullable(false))
344 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
345 .with_column("global_id", SqlScalarType::String.nullable(false))
346 .with_key(vec![0, 1])
347 .finish(),
348 }
349 }
350}