mz_compute_client/
logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Compute layer logging configuration.
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
16use mz_repr::{GlobalId, RelationDesc, ScalarType};
17use proptest::prelude::{Arbitrary, BoxedStrategy, Strategy, any, prop};
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize};
20
21include!(concat!(env!("OUT_DIR"), "/mz_compute_client.logging.rs"));
22
23/// Logging configuration.
24///
25/// Setting `enable_logging` to `false` specifies that logging is disabled.
26//
27// Ideally we'd want to instead signal disabled logging by leaving `index_logs`
28// empty. Unfortunately, we have to always provide `index_logs`, because we must
29// install the logging dataflows even on replicas that have logging disabled. See database-issues#4545.
30#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
31pub struct LoggingConfig {
32    /// The logging interval
33    pub interval: Duration,
34    /// Whether logging is enabled
35    pub enable_logging: bool,
36    /// Whether we should report logs for the log-processing dataflows
37    pub log_logging: bool,
38    /// Logs to keep in an arrangement
39    pub index_logs: BTreeMap<LogVariant, GlobalId>,
40}
41
42impl Arbitrary for LoggingConfig {
43    type Parameters = ();
44    type Strategy = BoxedStrategy<Self>;
45
46    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
47        (
48            any::<Duration>(),
49            any::<bool>(),
50            any::<bool>(),
51            prop::collection::btree_map(any::<LogVariant>(), any::<GlobalId>(), 0..2),
52        )
53            .prop_map(
54                |(interval, enable_logging, log_logging, index_logs)| LoggingConfig {
55                    interval,
56                    enable_logging,
57                    log_logging,
58                    index_logs,
59                },
60            )
61            .boxed()
62    }
63}
64
65impl RustType<ProtoLoggingConfig> for LoggingConfig {
66    fn into_proto(&self) -> ProtoLoggingConfig {
67        ProtoLoggingConfig {
68            interval: Some(self.interval.into_proto()),
69            enable_logging: self.enable_logging,
70            log_logging: self.log_logging,
71            index_logs: self.index_logs.into_proto(),
72        }
73    }
74
75    fn from_proto(proto: ProtoLoggingConfig) -> Result<Self, TryFromProtoError> {
76        Ok(LoggingConfig {
77            interval: proto
78                .interval
79                .into_rust_if_some("ProtoLoggingConfig::interval")?,
80            enable_logging: proto.enable_logging,
81            log_logging: proto.log_logging,
82            index_logs: proto.index_logs.into_rust()?,
83        })
84    }
85}
86
87impl ProtoMapEntry<LogVariant, GlobalId> for ProtoIndexLog {
88    fn from_rust<'a>(entry: (&'a LogVariant, &'a GlobalId)) -> Self {
89        ProtoIndexLog {
90            key: Some(entry.0.into_proto()),
91            value: Some(entry.1.into_proto()),
92        }
93    }
94
95    fn into_rust(self) -> Result<(LogVariant, GlobalId), TryFromProtoError> {
96        Ok((
97            self.key.into_rust_if_some("ProtoIndexLog::key")?,
98            self.value.into_rust_if_some("ProtoIndexLog::value")?,
99        ))
100    }
101}
102
103/// TODO(database-issues#7533): Add documentation.
104#[derive(
105    Arbitrary, Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
106)]
107pub enum LogVariant {
108    /// TODO(database-issues#7533): Add documentation.
109    Timely(TimelyLog),
110    /// TODO(database-issues#7533): Add documentation.
111    Differential(DifferentialLog),
112    /// TODO(database-issues#7533): Add documentation.
113    Compute(ComputeLog),
114}
115
116impl From<TimelyLog> for LogVariant {
117    fn from(value: TimelyLog) -> Self {
118        Self::Timely(value)
119    }
120}
121
122impl From<DifferentialLog> for LogVariant {
123    fn from(value: DifferentialLog) -> Self {
124        Self::Differential(value)
125    }
126}
127
128impl From<ComputeLog> for LogVariant {
129    fn from(value: ComputeLog) -> Self {
130        Self::Compute(value)
131    }
132}
133
134impl RustType<ProtoLogVariant> for LogVariant {
135    fn into_proto(&self) -> ProtoLogVariant {
136        use proto_log_variant::Kind::*;
137        ProtoLogVariant {
138            kind: Some(match self {
139                LogVariant::Timely(x) => Timely(x.into_proto()),
140                LogVariant::Differential(x) => Differential(x.into_proto()),
141                LogVariant::Compute(x) => Compute(x.into_proto()),
142            }),
143        }
144    }
145
146    fn from_proto(proto: ProtoLogVariant) -> Result<Self, TryFromProtoError> {
147        use proto_log_variant::Kind::*;
148        match proto.kind {
149            Some(Timely(x)) => Ok(LogVariant::Timely(x.into_rust()?)),
150            Some(Differential(x)) => Ok(LogVariant::Differential(x.into_rust()?)),
151            Some(Compute(x)) => Ok(LogVariant::Compute(x.into_rust()?)),
152            None => Err(TryFromProtoError::missing_field("ProtoLogVariant::kind")),
153        }
154    }
155}
156
157/// TODO(database-issues#7533): Add documentation.
158#[derive(
159    Arbitrary, Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
160)]
161pub enum TimelyLog {
162    /// TODO(database-issues#7533): Add documentation.
163    Operates,
164    /// TODO(database-issues#7533): Add documentation.
165    Channels,
166    /// TODO(database-issues#7533): Add documentation.
167    Elapsed,
168    /// TODO(database-issues#7533): Add documentation.
169    Histogram,
170    /// TODO(database-issues#7533): Add documentation.
171    Addresses,
172    /// TODO(database-issues#7533): Add documentation.
173    Parks,
174    /// TODO(database-issues#7533): Add documentation.
175    MessagesSent,
176    /// TODO(database-issues#7533): Add documentation.
177    MessagesReceived,
178    /// TODO(database-issues#7533): Add documentation.
179    Reachability,
180    /// TODO(database-issues#7533): Add documentation.
181    BatchesSent,
182    /// TODO(database-issues#7533): Add documentation.
183    BatchesReceived,
184}
185
186impl RustType<ProtoTimelyLog> for TimelyLog {
187    fn into_proto(&self) -> ProtoTimelyLog {
188        use proto_timely_log::Kind::*;
189        ProtoTimelyLog {
190            kind: Some(match self {
191                TimelyLog::Operates => Operates(()),
192                TimelyLog::Channels => Channels(()),
193                TimelyLog::Elapsed => Elapsed(()),
194                TimelyLog::Histogram => Histogram(()),
195                TimelyLog::Addresses => Addresses(()),
196                TimelyLog::Parks => Parks(()),
197                TimelyLog::MessagesSent => MessagesSent(()),
198                TimelyLog::MessagesReceived => MessagesReceived(()),
199                TimelyLog::Reachability => Reachability(()),
200                TimelyLog::BatchesSent => BatchesSent(()),
201                TimelyLog::BatchesReceived => BatchesReceived(()),
202            }),
203        }
204    }
205
206    fn from_proto(proto: ProtoTimelyLog) -> Result<Self, TryFromProtoError> {
207        use proto_timely_log::Kind::*;
208        match proto.kind {
209            Some(Operates(())) => Ok(TimelyLog::Operates),
210            Some(Channels(())) => Ok(TimelyLog::Channels),
211            Some(Elapsed(())) => Ok(TimelyLog::Elapsed),
212            Some(Histogram(())) => Ok(TimelyLog::Histogram),
213            Some(Addresses(())) => Ok(TimelyLog::Addresses),
214            Some(Parks(())) => Ok(TimelyLog::Parks),
215            Some(MessagesSent(())) => Ok(TimelyLog::MessagesSent),
216            Some(MessagesReceived(())) => Ok(TimelyLog::MessagesReceived),
217            Some(Reachability(())) => Ok(TimelyLog::Reachability),
218            Some(BatchesSent(())) => Ok(TimelyLog::BatchesSent),
219            Some(BatchesReceived(())) => Ok(TimelyLog::BatchesReceived),
220            None => Err(TryFromProtoError::missing_field("ProtoTimelyLog::kind")),
221        }
222    }
223}
224
225/// TODO(database-issues#7533): Add documentation.
226#[derive(
227    Arbitrary, Hash, Eq, Ord, PartialEq, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
228)]
229pub enum DifferentialLog {
230    /// TODO(database-issues#7533): Add documentation.
231    ArrangementBatches,
232    /// TODO(database-issues#7533): Add documentation.
233    ArrangementRecords,
234    /// TODO(database-issues#7533): Add documentation.
235    Sharing,
236    /// TODO(database-issues#7533): Add documentation.
237    BatcherRecords,
238    /// TODO(database-issues#7533): Add documentation.
239    BatcherSize,
240    /// TODO(database-issues#7533): Add documentation.
241    BatcherCapacity,
242    /// TODO(database-issues#7533): Add documentation.
243    BatcherAllocations,
244}
245
246impl RustType<ProtoDifferentialLog> for DifferentialLog {
247    fn into_proto(&self) -> ProtoDifferentialLog {
248        use proto_differential_log::Kind::*;
249        ProtoDifferentialLog {
250            kind: Some(match self {
251                DifferentialLog::ArrangementBatches => ArrangementBatches(()),
252                DifferentialLog::ArrangementRecords => ArrangementRecords(()),
253                DifferentialLog::Sharing => Sharing(()),
254                DifferentialLog::BatcherRecords => BatcherRecords(()),
255                DifferentialLog::BatcherSize => BatcherSize(()),
256                DifferentialLog::BatcherCapacity => BatcherCapacity(()),
257                DifferentialLog::BatcherAllocations => BatcherAllocations(()),
258            }),
259        }
260    }
261
262    fn from_proto(proto: ProtoDifferentialLog) -> Result<Self, TryFromProtoError> {
263        use proto_differential_log::Kind::*;
264        match proto.kind {
265            Some(ArrangementBatches(())) => Ok(DifferentialLog::ArrangementBatches),
266            Some(ArrangementRecords(())) => Ok(DifferentialLog::ArrangementRecords),
267            Some(Sharing(())) => Ok(DifferentialLog::Sharing),
268            Some(BatcherRecords(())) => Ok(DifferentialLog::BatcherRecords),
269            Some(BatcherSize(())) => Ok(DifferentialLog::BatcherSize),
270            Some(BatcherCapacity(())) => Ok(DifferentialLog::BatcherCapacity),
271            Some(BatcherAllocations(())) => Ok(DifferentialLog::BatcherAllocations),
272            None => Err(TryFromProtoError::missing_field(
273                "ProtoDifferentialLog::kind",
274            )),
275        }
276    }
277}
278
279/// Variants of compute introspection sources.
280#[derive(
281    Arbitrary, Hash, Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Copy, Serialize, Deserialize,
282)]
283pub enum ComputeLog {
284    /// Installed dataflow exports.
285    DataflowCurrent,
286    /// Dataflow write frontiers.
287    FrontierCurrent,
288    /// Pending peeks.
289    PeekCurrent,
290    /// A histogram over peek durations.
291    PeekDuration,
292    /// Dataflow import frontiers.
293    ImportFrontierCurrent,
294    /// Arrangement heap sizes.
295    ArrangementHeapSize,
296    /// Arrangement heap capacities.
297    ArrangementHeapCapacity,
298    /// Arrangement heap allocations.
299    ArrangementHeapAllocations,
300    /// A histogram over dataflow shutdown durations.
301    ShutdownDuration,
302    /// Counts of errors in exported collections.
303    ErrorCount,
304    /// Hydration times of exported collections.
305    HydrationTime,
306    /// Mappings from `GlobalId`/`LirId`` pairs to dataflow addresses.
307    LirMapping,
308    /// Mappings from dataflows to `GlobalId`s.
309    DataflowGlobal,
310}
311
312impl RustType<ProtoComputeLog> for ComputeLog {
313    fn into_proto(&self) -> ProtoComputeLog {
314        use proto_compute_log::Kind::*;
315        ProtoComputeLog {
316            kind: Some(match self {
317                ComputeLog::DataflowCurrent => DataflowCurrent(()),
318                ComputeLog::FrontierCurrent => FrontierCurrent(()),
319                ComputeLog::PeekCurrent => PeekCurrent(()),
320                ComputeLog::PeekDuration => PeekDuration(()),
321                ComputeLog::ImportFrontierCurrent => ImportFrontierCurrent(()),
322                ComputeLog::ArrangementHeapSize => ArrangementHeapSize(()),
323                ComputeLog::ArrangementHeapCapacity => ArrangementHeapCapacity(()),
324                ComputeLog::ArrangementHeapAllocations => ArrangementHeapAllocations(()),
325                ComputeLog::ShutdownDuration => ShutdownDuration(()),
326                ComputeLog::ErrorCount => ErrorCount(()),
327                ComputeLog::HydrationTime => HydrationTime(()),
328                ComputeLog::LirMapping => LirMapping(()),
329                ComputeLog::DataflowGlobal => DataflowGlobal(()),
330            }),
331        }
332    }
333
334    fn from_proto(proto: ProtoComputeLog) -> Result<Self, TryFromProtoError> {
335        use proto_compute_log::Kind::*;
336        match proto.kind {
337            Some(DataflowCurrent(())) => Ok(ComputeLog::DataflowCurrent),
338            Some(FrontierCurrent(())) => Ok(ComputeLog::FrontierCurrent),
339            Some(PeekCurrent(())) => Ok(ComputeLog::PeekCurrent),
340            Some(PeekDuration(())) => Ok(ComputeLog::PeekDuration),
341            Some(ImportFrontierCurrent(())) => Ok(ComputeLog::ImportFrontierCurrent),
342            Some(ArrangementHeapSize(())) => Ok(ComputeLog::ArrangementHeapSize),
343            Some(ArrangementHeapCapacity(())) => Ok(ComputeLog::ArrangementHeapCapacity),
344            Some(ArrangementHeapAllocations(())) => Ok(ComputeLog::ArrangementHeapAllocations),
345            Some(ShutdownDuration(())) => Ok(ComputeLog::ShutdownDuration),
346            Some(ErrorCount(())) => Ok(ComputeLog::ErrorCount),
347            Some(HydrationTime(())) => Ok(ComputeLog::HydrationTime),
348            Some(LirMapping(())) => Ok(ComputeLog::LirMapping),
349            Some(DataflowGlobal(())) => Ok(ComputeLog::DataflowGlobal),
350            None => Err(TryFromProtoError::missing_field("ProtoComputeLog::kind")),
351        }
352    }
353}
354
355impl LogVariant {
356    /// By which columns should the logs be indexed.
357    ///
358    /// This is distinct from the `keys` property of the type, which indicates uniqueness.
359    /// When keys exist these are good choices for indexing, but when they do not we still
360    /// require index guidance.
361    pub fn index_by(&self) -> Vec<usize> {
362        let desc = self.desc();
363        let arity = desc.arity();
364        desc.typ()
365            .keys
366            .get(0)
367            .cloned()
368            .unwrap_or_else(|| (0..arity).collect())
369    }
370
371    /// Relation schemas for the logs.
372    ///
373    /// This types need to agree with the values that are produced
374    /// in `logging::compute::construct` and with the description in
375    /// `catalog/src/builtin.rs`.
376    pub fn desc(&self) -> RelationDesc {
377        match self {
378            LogVariant::Timely(TimelyLog::Operates) => RelationDesc::builder()
379                .with_column("id", ScalarType::UInt64.nullable(false))
380                .with_column("worker_id", ScalarType::UInt64.nullable(false))
381                .with_column("name", ScalarType::String.nullable(false))
382                .with_key(vec![0, 1])
383                .finish(),
384
385            LogVariant::Timely(TimelyLog::Channels) => RelationDesc::builder()
386                .with_column("id", ScalarType::UInt64.nullable(false))
387                .with_column("worker_id", ScalarType::UInt64.nullable(false))
388                .with_column("from_index", ScalarType::UInt64.nullable(false))
389                .with_column("from_port", ScalarType::UInt64.nullable(false))
390                .with_column("to_index", ScalarType::UInt64.nullable(false))
391                .with_column("to_port", ScalarType::UInt64.nullable(false))
392                .with_key(vec![0, 1])
393                .finish(),
394
395            LogVariant::Timely(TimelyLog::Elapsed) => RelationDesc::builder()
396                .with_column("id", ScalarType::UInt64.nullable(false))
397                .with_column("worker_id", ScalarType::UInt64.nullable(false))
398                .finish(),
399
400            LogVariant::Timely(TimelyLog::Histogram) => RelationDesc::builder()
401                .with_column("id", ScalarType::UInt64.nullable(false))
402                .with_column("worker_id", ScalarType::UInt64.nullable(false))
403                .with_column("duration_ns", ScalarType::UInt64.nullable(false))
404                .finish(),
405
406            LogVariant::Timely(TimelyLog::Addresses) => RelationDesc::builder()
407                .with_column("id", ScalarType::UInt64.nullable(false))
408                .with_column("worker_id", ScalarType::UInt64.nullable(false))
409                .with_column(
410                    "address",
411                    ScalarType::List {
412                        element_type: Box::new(ScalarType::UInt64),
413                        custom_id: None,
414                    }
415                    .nullable(false),
416                )
417                .with_key(vec![0, 1])
418                .finish(),
419
420            LogVariant::Timely(TimelyLog::Parks) => RelationDesc::builder()
421                .with_column("worker_id", ScalarType::UInt64.nullable(false))
422                .with_column("slept_for_ns", ScalarType::UInt64.nullable(false))
423                .with_column("requested_ns", ScalarType::UInt64.nullable(false))
424                .finish(),
425
426            LogVariant::Timely(TimelyLog::BatchesReceived) => RelationDesc::builder()
427                .with_column("channel_id", ScalarType::UInt64.nullable(false))
428                .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
429                .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
430                .finish(),
431
432            LogVariant::Timely(TimelyLog::BatchesSent) => RelationDesc::builder()
433                .with_column("channel_id", ScalarType::UInt64.nullable(false))
434                .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
435                .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
436                .finish(),
437
438            LogVariant::Timely(TimelyLog::MessagesReceived) => RelationDesc::builder()
439                .with_column("channel_id", ScalarType::UInt64.nullable(false))
440                .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
441                .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
442                .finish(),
443
444            LogVariant::Timely(TimelyLog::MessagesSent) => RelationDesc::builder()
445                .with_column("channel_id", ScalarType::UInt64.nullable(false))
446                .with_column("from_worker_id", ScalarType::UInt64.nullable(false))
447                .with_column("to_worker_id", ScalarType::UInt64.nullable(false))
448                .finish(),
449
450            LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder()
451                .with_column("id", ScalarType::UInt64.nullable(false))
452                .with_column("worker_id", ScalarType::UInt64.nullable(false))
453                .with_column("source", ScalarType::UInt64.nullable(false))
454                .with_column("port", ScalarType::UInt64.nullable(false))
455                .with_column("update_type", ScalarType::String.nullable(false))
456                .with_column("time", ScalarType::MzTimestamp.nullable(true))
457                .finish(),
458
459            LogVariant::Differential(DifferentialLog::ArrangementBatches)
460            | LogVariant::Differential(DifferentialLog::ArrangementRecords)
461            | LogVariant::Differential(DifferentialLog::Sharing)
462            | LogVariant::Differential(DifferentialLog::BatcherRecords)
463            | LogVariant::Differential(DifferentialLog::BatcherSize)
464            | LogVariant::Differential(DifferentialLog::BatcherCapacity)
465            | LogVariant::Differential(DifferentialLog::BatcherAllocations)
466            | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
467            | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
468            | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => {
469                RelationDesc::builder()
470                    .with_column("operator_id", ScalarType::UInt64.nullable(false))
471                    .with_column("worker_id", ScalarType::UInt64.nullable(false))
472                    .finish()
473            }
474
475            LogVariant::Compute(ComputeLog::DataflowCurrent) => RelationDesc::builder()
476                .with_column("export_id", ScalarType::String.nullable(false))
477                .with_column("worker_id", ScalarType::UInt64.nullable(false))
478                .with_column("dataflow_id", ScalarType::UInt64.nullable(false))
479                .with_key(vec![0, 1])
480                .finish(),
481
482            LogVariant::Compute(ComputeLog::FrontierCurrent) => RelationDesc::builder()
483                .with_column("export_id", ScalarType::String.nullable(false))
484                .with_column("worker_id", ScalarType::UInt64.nullable(false))
485                .with_column("time", ScalarType::MzTimestamp.nullable(false))
486                .with_key(vec![0, 1])
487                .finish(),
488
489            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => RelationDesc::builder()
490                .with_column("export_id", ScalarType::String.nullable(false))
491                .with_column("import_id", ScalarType::String.nullable(false))
492                .with_column("worker_id", ScalarType::UInt64.nullable(false))
493                .with_column("time", ScalarType::MzTimestamp.nullable(false))
494                .with_key(vec![0, 1, 2])
495                .finish(),
496
497            LogVariant::Compute(ComputeLog::PeekCurrent) => RelationDesc::builder()
498                .with_column("id", ScalarType::Uuid.nullable(false))
499                .with_column("worker_id", ScalarType::UInt64.nullable(false))
500                .with_column("object_id", ScalarType::String.nullable(false))
501                .with_column("type", ScalarType::String.nullable(false))
502                .with_column("time", ScalarType::MzTimestamp.nullable(false))
503                .with_key(vec![0, 1])
504                .finish(),
505
506            LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::builder()
507                .with_column("worker_id", ScalarType::UInt64.nullable(false))
508                .with_column("type", ScalarType::String.nullable(false))
509                .with_column("duration_ns", ScalarType::UInt64.nullable(false))
510                .finish(),
511
512            LogVariant::Compute(ComputeLog::ShutdownDuration) => RelationDesc::builder()
513                .with_column("worker_id", ScalarType::UInt64.nullable(false))
514                .with_column("duration_ns", ScalarType::UInt64.nullable(false))
515                .finish(),
516
517            LogVariant::Compute(ComputeLog::ErrorCount) => RelationDesc::builder()
518                .with_column("export_id", ScalarType::String.nullable(false))
519                .with_column("worker_id", ScalarType::UInt64.nullable(false))
520                .with_column("count", ScalarType::Int64.nullable(false))
521                .with_key(vec![0, 1])
522                .finish(),
523
524            LogVariant::Compute(ComputeLog::HydrationTime) => RelationDesc::builder()
525                .with_column("export_id", ScalarType::String.nullable(false))
526                .with_column("worker_id", ScalarType::UInt64.nullable(false))
527                .with_column("time_ns", ScalarType::UInt64.nullable(true))
528                .with_key(vec![0, 1])
529                .finish(),
530
531            LogVariant::Compute(ComputeLog::LirMapping) => RelationDesc::builder()
532                .with_column("global_id", ScalarType::String.nullable(false))
533                .with_column("lir_id", ScalarType::UInt64.nullable(false))
534                .with_column("worker_id", ScalarType::UInt64.nullable(false))
535                .with_column("operator", ScalarType::String.nullable(false))
536                .with_column("parent_lir_id", ScalarType::UInt64.nullable(true))
537                .with_column("nesting", ScalarType::UInt16.nullable(false))
538                .with_column("operator_id_start", ScalarType::UInt64.nullable(false))
539                .with_column("operator_id_end", ScalarType::UInt64.nullable(false))
540                .with_key(vec![0, 1, 2])
541                .finish(),
542
543            LogVariant::Compute(ComputeLog::DataflowGlobal) => RelationDesc::builder()
544                .with_column("id", ScalarType::UInt64.nullable(false))
545                .with_column("worker_id", ScalarType::UInt64.nullable(false))
546                .with_column("global_id", ScalarType::String.nullable(false))
547                .with_key(vec![0, 1])
548                .finish(),
549        }
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use mz_ore::assert_ok;
556    use mz_proto::protobuf_roundtrip;
557    use proptest::prelude::*;
558
559    use super::*;
560
561    proptest! {
562        #[mz_ore::test]
563        fn logging_config_protobuf_roundtrip(expect in any::<LoggingConfig>()) {
564            let actual = protobuf_roundtrip::<_, ProtoLoggingConfig>(&expect);
565            assert_ok!(actual);
566            assert_eq!(actual.unwrap(), expect);
567        }
568    }
569}