1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_repr::{GlobalId, RelationDesc, SqlScalarType};
16use serde::{Deserialize, Serialize};
17
18#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
26pub struct LoggingConfig {
27 pub interval: Duration,
29 pub enable_logging: bool,
31 pub log_logging: bool,
33 pub index_logs: BTreeMap<LogVariant, GlobalId>,
35}
36
37#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
39pub enum LogVariant {
40 Timely(TimelyLog),
42 Differential(DifferentialLog),
44 Compute(ComputeLog),
46}
47
48impl From<TimelyLog> for LogVariant {
49 fn from(value: TimelyLog) -> Self {
50 Self::Timely(value)
51 }
52}
53
54impl From<DifferentialLog> for LogVariant {
55 fn from(value: DifferentialLog) -> Self {
56 Self::Differential(value)
57 }
58}
59
60impl From<ComputeLog> for LogVariant {
61 fn from(value: ComputeLog) -> Self {
62 Self::Compute(value)
63 }
64}
65
66#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
68pub enum TimelyLog {
69 Operates,
71 Channels,
73 Elapsed,
75 Histogram,
77 Addresses,
79 Parks,
81 MessagesSent,
83 MessagesReceived,
85 Reachability,
87 BatchesSent,
89 BatchesReceived,
91}
92
93#[derive(Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
95pub enum DifferentialLog {
96 ArrangementBatches,
98 ArrangementRecords,
100 Sharing,
102 BatcherRecords,
104 BatcherSize,
106 BatcherCapacity,
108 BatcherAllocations,
110}
111
112#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize)]
114pub enum ComputeLog {
115 DataflowCurrent,
117 FrontierCurrent,
119 PeekCurrent,
121 PeekDuration,
123 ImportFrontierCurrent,
125 ArrangementHeapSize,
127 ArrangementHeapCapacity,
129 ArrangementHeapAllocations,
131 ErrorCount,
133 HydrationTime,
135 OperatorHydrationStatus,
137 LirMapping,
139 DataflowGlobal,
141}
142
143impl LogVariant {
144 pub fn index_by(&self) -> Vec<usize> {
150 let desc = self.desc();
151 let arity = desc.arity();
152 desc.typ()
153 .keys
154 .get(0)
155 .cloned()
156 .unwrap_or_else(|| (0..arity).collect())
157 }
158
159 pub fn desc(&self) -> RelationDesc {
165 match self {
166 LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
167 .with_column("id", SqlScalarType::UInt64.nullable(false))
168 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
169 .with_column("name", SqlScalarType::String.nullable(false))
170 .with_key(vec![0, 1])
171 .finish(),
172
173 LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
174 .with_column("id", SqlScalarType::UInt64.nullable(false))
175 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
176 .with_column("from_index", SqlScalarType::UInt64.nullable(false))
177 .with_column("from_port", SqlScalarType::UInt64.nullable(false))
178 .with_column("to_index", SqlScalarType::UInt64.nullable(false))
179 .with_column("to_port", SqlScalarType::UInt64.nullable(false))
180 .with_column("type", SqlScalarType::String.nullable(false))
181 .with_key(vec![0, 1])
182 .finish(),
183
184 LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
185 .with_column("id", SqlScalarType::UInt64.nullable(false))
186 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
187 .finish(),
188
189 LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
190 .with_column("id", SqlScalarType::UInt64.nullable(false))
191 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
192 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
193 .finish(),
194
195 LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
196 .with_column("id", SqlScalarType::UInt64.nullable(false))
197 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
198 .with_column(
199 "address",
200 SqlScalarType::List {
201 element_type: Box::new(SqlScalarType::UInt64),
202 custom_id: None,
203 }
204 .nullable(false),
205 )
206 .with_key(vec![0, 1])
207 .finish(),
208
209 LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
210 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
211 .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
212 .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
213 .finish(),
214
215 LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
216 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
217 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
218 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
219 .finish(),
220
221 LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
222 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
223 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
224 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
225 .finish(),
226
227 LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
228 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
229 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
230 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
231 .finish(),
232
233 LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
234 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
235 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
236 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
237 .finish(),
238
239 LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
240 .with_column("id", SqlScalarType::UInt64.nullable(false))
241 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
242 .with_column("source", SqlScalarType::UInt64.nullable(false))
243 .with_column("port", SqlScalarType::UInt64.nullable(false))
244 .with_column("update_type", SqlScalarType::String.nullable(false))
245 .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
246 .finish(),
247
248 LogVariant::Differential(DifferentialLog::ArrangementBatches)
249 | LogVariant::Differential(DifferentialLog::ArrangementRecords)
250 | LogVariant::Differential(DifferentialLog::Sharing)
251 | LogVariant::Differential(DifferentialLog::BatcherRecords)
252 | LogVariant::Differential(DifferentialLog::BatcherSize)
253 | LogVariant::Differential(DifferentialLog::BatcherCapacity)
254 | LogVariant::Differential(DifferentialLog::BatcherAllocations)
255 | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
256 | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
257 | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
258 RelationDesc::builder()
259 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
260 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
261 .finish()
262 }
263
264 LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
265 .with_column("export_id", SqlScalarType::String.nullable(false))
266 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
267 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
268 .with_key(vec![0, 1])
269 .finish(),
270
271 LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
272 .with_column("export_id", SqlScalarType::String.nullable(false))
273 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
274 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
275 .with_key(vec![0, 1])
276 .finish(),
277
278 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
279 .with_column("export_id", SqlScalarType::String.nullable(false))
280 .with_column("import_id", SqlScalarType::String.nullable(false))
281 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
282 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
283 .with_key(vec![0, 1, 2])
284 .finish(),
285
286 LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
287 .with_column("id", SqlScalarType::Uuid.nullable(false))
288 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
289 .with_column("object_id", SqlScalarType::String.nullable(false))
290 .with_column("type", SqlScalarType::String.nullable(false))
291 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
292 .with_key(vec![0, 1])
293 .finish(),
294
295 LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
296 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
297 .with_column("type", SqlScalarType::String.nullable(false))
298 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
299 .finish(),
300
301 LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
302 .with_column("export_id", SqlScalarType::String.nullable(false))
303 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
304 .with_column("count", SqlScalarType::Int64.nullable(false))
305 .with_key(vec![0, 1])
306 .finish(),
307
308 LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
309 .with_column("export_id", SqlScalarType::String.nullable(false))
310 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
311 .with_column("time_ns", SqlScalarType::UInt64.nullable(true))
312 .with_key(vec![0, 1])
313 .finish(),
314
315 LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => RelationDesc::builder()
316 .with_column("export_id", SqlScalarType::String.nullable(false))
317 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
318 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
319 .with_column("hydrated", SqlScalarType::Bool.nullable(false))
320 .with_key(vec![0, 1])
321 .finish(),
322
323 LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
324 .with_column("global_id", SqlScalarType::String.nullable(false))
325 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
326 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
327 .with_column("operator", SqlScalarType::String.nullable(false))
328 .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
329 .with_column("nesting", SqlScalarType::UInt16.nullable(false))
330 .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
331 .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
332 .with_key(vec![0, 1, 2])
333 .finish(),
334
335 LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
336 .with_column("id", SqlScalarType::UInt64.nullable(false))
337 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
338 .with_column("global_id", SqlScalarType::String.nullable(false))
339 .with_key(vec![0, 1])
340 .finish(),
341 }
342 }
343}