1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
16use mz_repr::{GlobalId, RelationDesc, ScalarType};
17use proptest::prelude::{Arbitrary, BoxedStrategy, Strategy, any, prop};
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize};
20
21include!(concat!(env!("OUT_DIR"), "/mz_compute_client.logging.rs"));
22
23#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
31pub struct LoggingConfig {
32 pub interval: Duration,
34 pub enable_logging: bool,
36 pub log_logging: bool,
38 pub index_logs: BTreeMap<LogVariant, GlobalId>,
40}
41
42impl Arbitrary for LoggingConfig {
43 type Parameters = ();
44 type Strategy = BoxedStrategy<Self>;
45
46 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
47 (
48 any::<Duration>(),
49 any::<bool>(),
50 any::<bool>(),
51 prop::collection::btree_map(any::<LogVariant>(), any::<GlobalId>(), 0..2),
52 )
53 .prop_map(
54 |(interval, enable_logging, log_logging, index_logs)| LoggingConfig {
55 interval,
56 enable_logging,
57 log_logging,
58 index_logs,
59 },
60 )
61 .boxed()
62 }
63}
64
65impl RustType<ProtoLoggingConfig> for LoggingConfig {
66 fn into_proto(&self) -> ProtoLoggingConfig {
67 ProtoLoggingConfig {
68 interval: Some(self.interval.into_proto()),
69 enable_logging: self.enable_logging,
70 log_logging: self.log_logging,
71 index_logs: self.index_logs.into_proto(),
72 }
73 }
74
75 fn from_proto(proto: ProtoLoggingConfig) -> Result<Self, TryFromProtoError> {
76 Ok(LoggingConfig {
77 interval: proto
78 .interval
79 .into_rust_if_some("ProtoLoggingConfig::interval")?,
80 enable_logging: proto.enable_logging,
81 log_logging: proto.log_logging,
82 index_logs: proto.index_logs.into_rust()?,
83 })
84 }
85}
86
87impl ProtoMapEntry<LogVariant, GlobalId> for ProtoIndexLog {
88 fn from_rust<'a>(entry: (&'a LogVariant, &'a GlobalId)) -> Self {
89 ProtoIndexLog {
90 key: Some(entry.0.into_proto()),
91 value: Some(entry.1.into_proto()),
92 }
93 }
94
95 fn into_rust(self) -> Result<(LogVariant, GlobalId), TryFromProtoError> {
96 Ok((
97 self.key.into_rust_if_some("ProtoIndexLog::key")?,
98 self.value.into_rust_if_some("ProtoIndexLog::value")?,
99 ))
100 }
101}
102
103#[derive(
105 Arbitrary, Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
106)]
107pub enum LogVariant {
108 Timely(TimelyLog),
110 Differential(DifferentialLog),
112 Compute(ComputeLog),
114}
115
116impl From<TimelyLog> for LogVariant {
117 fn from(value: TimelyLog) -> Self {
118 Self::Timely(value)
119 }
120}
121
122impl From<DifferentialLog> for LogVariant {
123 fn from(value: DifferentialLog) -> Self {
124 Self::Differential(value)
125 }
126}
127
128impl From<ComputeLog> for LogVariant {
129 fn from(value: ComputeLog) -> Self {
130 Self::Compute(value)
131 }
132}
133
134impl RustType<ProtoLogVariant> for LogVariant {
135 fn into_proto(&self) -> ProtoLogVariant {
136 use proto_log_variant::Kind::*;
137 ProtoLogVariant {
138 kind: Some(match self {
139 LogVariant::Timely(x) => Timely(x.into_proto()),
140 LogVariant::Differential(x) => Differential(x.into_proto()),
141 LogVariant::Compute(x) => Compute(x.into_proto()),
142 }),
143 }
144 }
145
146 fn from_proto(proto: ProtoLogVariant) -> Result<Self, TryFromProtoError> {
147 use proto_log_variant::Kind::*;
148 match proto.kind {
149 Some(Timely(x)) => Ok(LogVariant::Timely(x.into_rust()?)),
150 Some(Differential(x)) => Ok(LogVariant::Differential(x.into_rust()?)),
151 Some(Compute(x)) => Ok(LogVariant::Compute(x.into_rust()?)),
152 None => Err(TryFromProtoError::missing_field("ProtoLogVariant::kind")),
153 }
154 }
155}
156
157#[derive(
159 Arbitrary, Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
160)]
161pub enum TimelyLog {
162 Operates,
164 Channels,
166 Elapsed,
168 Histogram,
170 Addresses,
172 Parks,
174 MessagesSent,
176 MessagesReceived,
178 Reachability,
180 BatchesSent,
182 BatchesReceived,
184}
185
186impl RustType<ProtoTimelyLog> for TimelyLog {
187 fn into_proto(&self) -> ProtoTimelyLog {
188 use proto_timely_log::Kind::*;
189 ProtoTimelyLog {
190 kind: Some(match self {
191 TimelyLog::Operates => Operates(()),
192 TimelyLog::Channels => Channels(()),
193 TimelyLog::Elapsed => Elapsed(()),
194 TimelyLog::Histogram => Histogram(()),
195 TimelyLog::Addresses => Addresses(()),
196 TimelyLog::Parks => Parks(()),
197 TimelyLog::MessagesSent => MessagesSent(()),
198 TimelyLog::MessagesReceived => MessagesReceived(()),
199 TimelyLog::Reachability => Reachability(()),
200 TimelyLog::BatchesSent => BatchesSent(()),
201 TimelyLog::BatchesReceived => BatchesReceived(()),
202 }),
203 }
204 }
205
206 fn from_proto(proto: ProtoTimelyLog) -> Result<Self, TryFromProtoError> {
207 use proto_timely_log::Kind::*;
208 match proto.kind {
209 Some(Operates(())) => Ok(TimelyLog::Operates),
210 Some(Channels(())) => Ok(TimelyLog::Channels),
211 Some(Elapsed(())) => Ok(TimelyLog::Elapsed),
212 Some(Histogram(())) => Ok(TimelyLog::Histogram),
213 Some(Addresses(())) => Ok(TimelyLog::Addresses),
214 Some(Parks(())) => Ok(TimelyLog::Parks),
215 Some(MessagesSent(())) => Ok(TimelyLog::MessagesSent),
216 Some(MessagesReceived(())) => Ok(TimelyLog::MessagesReceived),
217 Some(Reachability(())) => Ok(TimelyLog::Reachability),
218 Some(BatchesSent(())) => Ok(TimelyLog::BatchesSent),
219 Some(BatchesReceived(())) => Ok(TimelyLog::BatchesReceived),
220 None => Err(TryFromProtoError::missing_field("ProtoTimelyLog::kind")),
221 }
222 }
223}
224
225#[derive(
227 Arbitrary, Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
228)]
229pub enum DifferentialLog {
230 ArrangementBatches,
232 ArrangementRecords,
234 Sharing,
236 BatcherRecords,
238 BatcherSize,
240 BatcherCapacity,
242 BatcherAllocations,
244}
245
246impl RustType<ProtoDifferentialLog> for DifferentialLog {
247 fn into_proto(&self) -> ProtoDifferentialLog {
248 use proto_differential_log::Kind::*;
249 ProtoDifferentialLog {
250 kind: Some(match self {
251 DifferentialLog::ArrangementBatches => ArrangementBatches(()),
252 DifferentialLog::ArrangementRecords => ArrangementRecords(()),
253 DifferentialLog::Sharing => Sharing(()),
254 DifferentialLog::BatcherRecords => BatcherRecords(()),
255 DifferentialLog::BatcherSize => BatcherSize(()),
256 DifferentialLog::BatcherCapacity => BatcherCapacity(()),
257 DifferentialLog::BatcherAllocations => BatcherAllocations(()),
258 }),
259 }
260 }
261
262 fn from_proto(proto: ProtoDifferentialLog) -> Result<Self, TryFromProtoError> {
263 use proto_differential_log::Kind::*;
264 match proto.kind {
265 Some(ArrangementBatches(())) => Ok(DifferentialLog::ArrangementBatches),
266 Some(ArrangementRecords(())) => Ok(DifferentialLog::ArrangementRecords),
267 Some(Sharing(())) => Ok(DifferentialLog::Sharing),
268 Some(BatcherRecords(())) => Ok(DifferentialLog::BatcherRecords),
269 Some(BatcherSize(())) => Ok(DifferentialLog::BatcherSize),
270 Some(BatcherCapacity(())) => Ok(DifferentialLog::BatcherCapacity),
271 Some(BatcherAllocations(())) => Ok(DifferentialLog::BatcherAllocations),
272 None => Err(TryFromProtoError::missing_field(
273 "ProtoDifferentialLog::kind",
274 )),
275 }
276 }
277}
278
279#[derive(
281 Arbitrary, Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
282)]
283pub enum ComputeLog {
284 DataflowCurrent,
286 FrontierCurrent,
288 PeekCurrent,
290 PeekDuration,
292 ImportFrontierCurrent,
294 ArrangementHeapSize,
296 ArrangementHeapCapacity,
298 ArrangementHeapAllocations,
300 ShutdownDuration,
302 ErrorCount,
304 HydrationTime,
306 LirMapping,
308 DataflowGlobal,
310}
311
312impl RustType<ProtoComputeLog> for ComputeLog {
313 fn into_proto(&self) -> ProtoComputeLog {
314 use proto_compute_log::Kind::*;
315 ProtoComputeLog {
316 kind: Some(match self {
317 ComputeLog::DataflowCurrent => DataflowCurrent(()),
318 ComputeLog::FrontierCurrent => FrontierCurrent(()),
319 ComputeLog::PeekCurrent => PeekCurrent(()),
320 ComputeLog::PeekDuration => PeekDuration(()),
321 ComputeLog::ImportFrontierCurrent => ImportFrontierCurrent(()),
322 ComputeLog::ArrangementHeapSize => ArrangementHeapSize(()),
323 ComputeLog::ArrangementHeapCapacity => ArrangementHeapCapacity(()),
324 ComputeLog::ArrangementHeapAllocations => ArrangementHeapAllocations(()),
325 ComputeLog::ShutdownDuration => ShutdownDuration(()),
326 ComputeLog::ErrorCount => ErrorCount(()),
327 ComputeLog::HydrationTime => HydrationTime(()),
328 ComputeLog::LirMapping => LirMapping(()),
329 ComputeLog::DataflowGlobal => DataflowGlobal(()),
330 }),
331 }
332 }
333
334 fn from_proto(proto: ProtoComputeLog) -> Result<Self, TryFromProtoError> {
335 use proto_compute_log::Kind::*;
336 match proto.kind {
337 Some(DataflowCurrent(())) => Ok(ComputeLog::DataflowCurrent),
338 Some(FrontierCurrent(())) => Ok(ComputeLog::FrontierCurrent),
339 Some(PeekCurrent(())) => Ok(ComputeLog::PeekCurrent),
340 Some(PeekDuration(())) => Ok(ComputeLog::PeekDuration),
341 Some(ImportFrontierCurrent(())) => Ok(ComputeLog::ImportFrontierCurrent),
342 Some(ArrangementHeapSize(())) => Ok(ComputeLog::ArrangementHeapSize),
343 Some(ArrangementHeapCapacity(())) => Ok(ComputeLog::ArrangementHeapCapacity),
344 Some(ArrangementHeapAllocations(())) => Ok(ComputeLog::ArrangementHeapAllocations),
345 Some(ShutdownDuration(())) => Ok(ComputeLog::ShutdownDuration),
346 Some(ErrorCount(())) => Ok(ComputeLog::ErrorCount),
347 Some(HydrationTime(())) => Ok(ComputeLog::HydrationTime),
348 Some(LirMapping(())) => Ok(ComputeLog::LirMapping),
349 Some(DataflowGlobal(())) => Ok(ComputeLog::DataflowGlobal),
350 None => Err(TryFromProtoError::missing_field("ProtoComputeLog::kind")),
351 }
352 }
353}
354
355impl LogVariant {
356 pub fn index_by(&self) -> Vec<usize> {
362 let desc = self.desc();
363 let arity = desc.arity();
364 desc.typ()
365 .keys
366 .get(0)
367 .cloned()
368 .unwrap_or_else(|| (0..arity).collect())
369 }
370
371 pub fn desc(&self) -> RelationDesc {
377 match self {
378 LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
379 .with_column("id", ScalarType::UInt64.nullable(false))
380 .with_column("worker_id", ScalarType::UInt64.nullable(false))
381 .with_column("name", ScalarType::String.nullable(false))
382 .with_key(vec![0, 1])
383 .finish(),
384
385 LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
386 .with_column("id", ScalarType::UInt64.nullable(false))
387 .with_column("worker_id", ScalarType::UInt64.nullable(false))
388 .with_column("from_index", ScalarType::UInt64.nullable(false))
389 .with_column("from_port", ScalarType::UInt64.nullable(false))
390 .with_column("to_index", ScalarType::UInt64.nullable(false))
391 .with_column("to_port", ScalarType::UInt64.nullable(false))
392 .with_key(vec![0, 1])
393 .finish(),
394
395 LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
396 .with_column("id", ScalarType::UInt64.nullable(false))
397 .with_column("worker_id", ScalarType::UInt64.nullable(false))
398 .finish(),
399
400 LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
401 .with_column("id", ScalarType::UInt64.nullable(false))
402 .with_column("worker_id", ScalarType::UInt64.nullable(false))
403 .with_column("duration_ns", ScalarType::UInt64.nullable(false))
404 .finish(),
405
406 LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
407 .with_column("id", ScalarType::UInt64.nullable(false))
408 .with_column("worker_id", ScalarType::UInt64.nullable(false))
409 .with_column(
410 "address",
411 ScalarType::List {
412 element_type: Box::new(ScalarType::UInt64),
413 custom_id: None,
414 }
415 .nullable(false),
416 )
417 .with_key(vec![0, 1])
418 .finish(),
419
420 LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
421 .with_column("worker_id", ScalarType::UInt64.nullable(false))
422 .with_column("slept_for_ns", ScalarType::UInt64.nullable(false))
423 .with_column("requested_ns", ScalarType::UInt64.nullable(false))
424 .finish(),
425
426 LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
427 .with_column("channel_id", ScalarType::UInt64.nullable(false))
428 .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
429 .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
430 .finish(),
431
432 LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
433 .with_column("channel_id", ScalarType::UInt64.nullable(false))
434 .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
435 .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
436 .finish(),
437
438 LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
439 .with_column("channel_id", ScalarType::UInt64.nullable(false))
440 .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
441 .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
442 .finish(),
443
444 LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
445 .with_column("channel_id", ScalarType::UInt64.nullable(false))
446 .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
447 .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
448 .finish(),
449
450 LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
451 .with_column("id", ScalarType::UInt64.nullable(false))
452 .with_column("worker_id", ScalarType::UInt64.nullable(false))
453 .with_column("source", ScalarType::UInt64.nullable(false))
454 .with_column("port", ScalarType::UInt64.nullable(false))
455 .with_column("update_type", ScalarType::String.nullable(false))
456 .with_column("time", ScalarType::MzTimestamp.nullable(true))
457 .finish(),
458
459 LogVariant::Differential(DifferentialLog::ArrangementBatches)
460 | LogVariant::Differential(DifferentialLog::ArrangementRecords)
461 | LogVariant::Differential(DifferentialLog::Sharing)
462 | LogVariant::Differential(DifferentialLog::BatcherRecords)
463 | LogVariant::Differential(DifferentialLog::BatcherSize)
464 | LogVariant::Differential(DifferentialLog::BatcherCapacity)
465 | LogVariant::Differential(DifferentialLog::BatcherAllocations)
466 | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
467 | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
468 | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
469 RelationDesc::builder()
470 .with_column("operator_id", ScalarType::UInt64.nullable(false))
471 .with_column("worker_id", ScalarType::UInt64.nullable(false))
472 .finish()
473 }
474
475 LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
476 .with_column("export_id", ScalarType::String.nullable(false))
477 .with_column("worker_id", ScalarType::UInt64.nullable(false))
478 .with_column("dataflow_id", ScalarType::UInt64.nullable(false))
479 .with_key(vec![0, 1])
480 .finish(),
481
482 LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
483 .with_column("export_id", ScalarType::String.nullable(false))
484 .with_column("worker_id", ScalarType::UInt64.nullable(false))
485 .with_column("time", ScalarType::MzTimestamp.nullable(false))
486 .with_key(vec![0, 1])
487 .finish(),
488
489 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
490 .with_column("export_id", ScalarType::String.nullable(false))
491 .with_column("import_id", ScalarType::String.nullable(false))
492 .with_column("worker_id", ScalarType::UInt64.nullable(false))
493 .with_column("time", ScalarType::MzTimestamp.nullable(false))
494 .with_key(vec![0, 1, 2])
495 .finish(),
496
497 LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
498 .with_column("id", ScalarType::Uuid.nullable(false))
499 .with_column("worker_id", ScalarType::UInt64.nullable(false))
500 .with_column("object_id", ScalarType::String.nullable(false))
501 .with_column("type", ScalarType::String.nullable(false))
502 .with_column("time", ScalarType::MzTimestamp.nullable(false))
503 .with_key(vec![0, 1])
504 .finish(),
505
506 LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
507 .with_column("worker_id", ScalarType::UInt64.nullable(false))
508 .with_column("type", ScalarType::String.nullable(false))
509 .with_column("duration_ns", ScalarType::UInt64.nullable(false))
510 .finish(),
511
512 LogVariant::Compute(ComputeLog::ShutdownDuration) => RelationDesc::builder()
513 .with_column("worker_id", ScalarType::UInt64.nullable(false))
514 .with_column("duration_ns", ScalarType::UInt64.nullable(false))
515 .finish(),
516
517 LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
518 .with_column("export_id", ScalarType::String.nullable(false))
519 .with_column("worker_id", ScalarType::UInt64.nullable(false))
520 .with_column("count", ScalarType::Int64.nullable(false))
521 .with_key(vec![0, 1])
522 .finish(),
523
524 LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
525 .with_column("export_id", ScalarType::String.nullable(false))
526 .with_column("worker_id", ScalarType::UInt64.nullable(false))
527 .with_column("time_ns", ScalarType::UInt64.nullable(true))
528 .with_key(vec![0, 1])
529 .finish(),
530
531 LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
532 .with_column("global_id", ScalarType::String.nullable(false))
533 .with_column("lir_id", ScalarType::UInt64.nullable(false))
534 .with_column("worker_id", ScalarType::UInt64.nullable(false))
535 .with_column("operator", ScalarType::String.nullable(false))
536 .with_column("parent_lir_id", ScalarType::UInt64.nullable(true))
537 .with_column("nesting", ScalarType::UInt16.nullable(false))
538 .with_column("operator_id_start", ScalarType::UInt64.nullable(false))
539 .with_column("operator_id_end", ScalarType::UInt64.nullable(false))
540 .with_key(vec![0, 1, 2])
541 .finish(),
542
543 LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
544 .with_column("id", ScalarType::UInt64.nullable(false))
545 .with_column("worker_id", ScalarType::UInt64.nullable(false))
546 .with_column("global_id", ScalarType::String.nullable(false))
547 .with_key(vec![0, 1])
548 .finish(),
549 }
550 }
551}
552
553#[cfg(test)]
554mod tests {
555 use mz_ore::assert_ok;
556 use mz_proto::protobuf_roundtrip;
557 use proptest::prelude::*;
558
559 use super::*;
560
561 proptest! {
562 #[mz_ore::test]
563 fn logging_config_protobuf_roundtrip(expect in any::<LoggingConfig>()) {
564 let actual = protobuf_roundtrip::<_, ProtoLoggingConfig>(&expect);
565 assert_ok!(actual);
566 assert_eq!(actual.unwrap(), expect);
567 }
568 }
569}