Skip to main content

mz_catalog/builtin/
mz_introspection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Built-in catalog items for the `mz_introspection` schema.
11
12use std::collections::BTreeMap;
13use std::sync::LazyLock;
14
15use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
16use mz_pgrepr::oid;
17use mz_repr::adt::numeric::NumericMaxScale;
18use mz_repr::namespaces::MZ_INTROSPECTION_SCHEMA;
19use mz_repr::{RelationDesc, SemanticType, SqlScalarType};
20
21use super::{
22    BuiltinLog, BuiltinView, Cardinality, LinkProperties, Ontology, OntologyLink, PUBLIC_SELECT,
23};
24
25pub static MZ_DATAFLOW_OPERATORS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
26    name: "mz_dataflow_operators_per_worker",
27    schema: MZ_INTROSPECTION_SCHEMA,
28    oid: oid::LOG_MZ_DATAFLOW_OPERATORS_PER_WORKER_OID,
29    variant: LogVariant::Timely(TimelyLog::Operates),
30    access: vec![PUBLIC_SELECT],
31    ontology: Some(Ontology {
32        entity_name: "dataflow_operator_per_worker",
33        description: "Timely dataflow operators present on a specific worker.",
34        links: &const { [] },
35        column_semantic_types: &[],
36    }),
37});
38
39pub static MZ_DATAFLOW_ADDRESSES_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
40    name: "mz_dataflow_addresses_per_worker",
41    schema: MZ_INTROSPECTION_SCHEMA,
42    oid: oid::LOG_MZ_DATAFLOW_ADDRESSES_PER_WORKER_OID,
43    variant: LogVariant::Timely(TimelyLog::Addresses),
44    access: vec![PUBLIC_SELECT],
45    ontology: Some(Ontology {
46        entity_name: "dataflow_address_per_worker",
47        description: "Scope address of each Timely operator per worker.",
48        links: &const {
49            [OntologyLink {
50                name: "address_of",
51                target: "dataflow_operator_per_worker",
52                properties: LinkProperties::fk_composite(
53                    "id",
54                    "id",
55                    Cardinality::ManyToOne,
56                    &[("worker_id", "worker_id")],
57                ),
58            }]
59        },
60        column_semantic_types: &[],
61    }),
62});
63
64pub static MZ_DATAFLOW_CHANNELS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
65    name: "mz_dataflow_channels_per_worker",
66    schema: MZ_INTROSPECTION_SCHEMA,
67    oid: oid::LOG_MZ_DATAFLOW_CHANNELS_PER_WORKER_OID,
68    variant: LogVariant::Timely(TimelyLog::Channels),
69    access: vec![PUBLIC_SELECT],
70    ontology: Some(Ontology {
71        entity_name: "dataflow_channel_per_worker",
72        description: "Timely dataflow communication channels per worker.",
73        links: &const {
74            [
75                OntologyLink {
76                    name: "source_operator",
77                    target: "dataflow_operator_per_worker",
78                    properties: LinkProperties::fk_composite(
79                        "from_index",
80                        "id",
81                        Cardinality::ManyToOne,
82                        &[("worker_id", "worker_id")],
83                    ),
84                },
85                OntologyLink {
86                    name: "target_operator",
87                    target: "dataflow_operator_per_worker",
88                    properties: LinkProperties::fk_composite(
89                        "to_index",
90                        "id",
91                        Cardinality::ManyToOne,
92                        &[("worker_id", "worker_id")],
93                    ),
94                },
95            ]
96        },
97        column_semantic_types: &[],
98    }),
99});
100
101pub static MZ_SCHEDULING_ELAPSED_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
102    name: "mz_scheduling_elapsed_raw",
103    schema: MZ_INTROSPECTION_SCHEMA,
104    oid: oid::LOG_MZ_SCHEDULING_ELAPSED_RAW_OID,
105    variant: LogVariant::Timely(TimelyLog::Elapsed),
106    access: vec![PUBLIC_SELECT],
107    ontology: None,
108});
109
110pub static MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_RAW: LazyLock<BuiltinLog> =
111    LazyLock::new(|| BuiltinLog {
112        name: "mz_compute_operator_durations_histogram_raw",
113        schema: MZ_INTROSPECTION_SCHEMA,
114        oid: oid::LOG_MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_RAW_OID,
115        variant: LogVariant::Timely(TimelyLog::Histogram),
116        access: vec![PUBLIC_SELECT],
117        ontology: None,
118    });
119
120pub static MZ_SCHEDULING_PARKS_HISTOGRAM_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
121    name: "mz_scheduling_parks_histogram_raw",
122    schema: MZ_INTROSPECTION_SCHEMA,
123    oid: oid::LOG_MZ_SCHEDULING_PARKS_HISTOGRAM_RAW_OID,
124    variant: LogVariant::Timely(TimelyLog::Parks),
125    access: vec![PUBLIC_SELECT],
126    ontology: None,
127});
128
129pub static MZ_ARRANGEMENT_RECORDS_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
130    name: "mz_arrangement_records_raw",
131    schema: MZ_INTROSPECTION_SCHEMA,
132    oid: oid::LOG_MZ_ARRANGEMENT_RECORDS_RAW_OID,
133    variant: LogVariant::Differential(DifferentialLog::ArrangementRecords),
134    access: vec![PUBLIC_SELECT],
135    ontology: None,
136});
137
138pub static MZ_ARRANGEMENT_BATCHES_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
139    name: "mz_arrangement_batches_raw",
140    schema: MZ_INTROSPECTION_SCHEMA,
141    oid: oid::LOG_MZ_ARRANGEMENT_BATCHES_RAW_OID,
142    variant: LogVariant::Differential(DifferentialLog::ArrangementBatches),
143    access: vec![PUBLIC_SELECT],
144    ontology: None,
145});
146
147pub static MZ_ARRANGEMENT_SHARING_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
148    name: "mz_arrangement_sharing_raw",
149    schema: MZ_INTROSPECTION_SCHEMA,
150    oid: oid::LOG_MZ_ARRANGEMENT_SHARING_RAW_OID,
151    variant: LogVariant::Differential(DifferentialLog::Sharing),
152    access: vec![PUBLIC_SELECT],
153    ontology: None,
154});
155
156pub static MZ_ARRANGEMENT_BATCHER_RECORDS_RAW: LazyLock<BuiltinLog> =
157    LazyLock::new(|| BuiltinLog {
158        name: "mz_arrangement_batcher_records_raw",
159        schema: MZ_INTROSPECTION_SCHEMA,
160        oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_RECORDS_RAW_OID,
161        variant: LogVariant::Differential(DifferentialLog::BatcherRecords),
162        access: vec![PUBLIC_SELECT],
163        ontology: None,
164    });
165
166pub static MZ_ARRANGEMENT_BATCHER_SIZE_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
167    name: "mz_arrangement_batcher_size_raw",
168    schema: MZ_INTROSPECTION_SCHEMA,
169    oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_SIZE_RAW_OID,
170    variant: LogVariant::Differential(DifferentialLog::BatcherSize),
171    access: vec![PUBLIC_SELECT],
172    ontology: None,
173});
174
175pub static MZ_ARRANGEMENT_BATCHER_CAPACITY_RAW: LazyLock<BuiltinLog> =
176    LazyLock::new(|| BuiltinLog {
177        name: "mz_arrangement_batcher_capacity_raw",
178        schema: MZ_INTROSPECTION_SCHEMA,
179        oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_CAPACITY_RAW_OID,
180        variant: LogVariant::Differential(DifferentialLog::BatcherCapacity),
181        access: vec![PUBLIC_SELECT],
182        ontology: None,
183    });
184
185pub static MZ_ARRANGEMENT_BATCHER_ALLOCATIONS_RAW: LazyLock<BuiltinLog> =
186    LazyLock::new(|| BuiltinLog {
187        name: "mz_arrangement_batcher_allocations_raw",
188        schema: MZ_INTROSPECTION_SCHEMA,
189        oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_ALLOCATIONS_RAW_OID,
190        variant: LogVariant::Differential(DifferentialLog::BatcherAllocations),
191        access: vec![PUBLIC_SELECT],
192        ontology: None,
193    });
194
195pub static MZ_COMPUTE_EXPORTS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
196    name: "mz_compute_exports_per_worker",
197    schema: MZ_INTROSPECTION_SCHEMA,
198    oid: oid::LOG_MZ_COMPUTE_EXPORTS_PER_WORKER_OID,
199    variant: LogVariant::Compute(ComputeLog::DataflowCurrent),
200    access: vec![PUBLIC_SELECT],
201    ontology: Some(Ontology {
202        entity_name: "compute_export_per_worker",
203        description: "Active compute exports (dataflows) present per worker.",
204        links: &const { [] },
205        column_semantic_types: &[("export_id", SemanticType::GlobalId)],
206    }),
207});
208
209pub static MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER: LazyLock<BuiltinLog> =
210    LazyLock::new(|| BuiltinLog {
211        name: "mz_compute_dataflow_global_ids_per_worker",
212        schema: MZ_INTROSPECTION_SCHEMA,
213        oid: oid::LOG_MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER_OID,
214        variant: LogVariant::Compute(ComputeLog::DataflowGlobal),
215        access: vec![PUBLIC_SELECT],
216        ontology: Some(Ontology {
217            entity_name: "dataflow_global_id_per_worker",
218            description: "Mapping from internal dataflow IDs to GlobalIds per worker.",
219            links: &const {
220                [OntologyLink {
221                    name: "global_id_of",
222                    target: "compute_export_per_worker",
223                    properties: LinkProperties::MapsTo {
224                        source_column: "global_id",
225                        target_column: "export_id",
226                        via: None,
227                        from_type: Some(SemanticType::GlobalId),
228                        to_type: Some(SemanticType::GlobalId),
229                        note: None,
230                    },
231                }]
232            },
233            column_semantic_types: &[("global_id", SemanticType::GlobalId)],
234        }),
235    });
236
237pub static MZ_CLUSTER_PROMETHEUS_METRICS: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
238    name: "mz_cluster_prometheus_metrics",
239    schema: MZ_INTROSPECTION_SCHEMA,
240    oid: oid::LOG_MZ_CLUSTER_PROMETHEUS_METRICS_OID,
241    variant: LogVariant::Compute(ComputeLog::PrometheusMetrics),
242    access: vec![PUBLIC_SELECT],
243    ontology: Some(Ontology {
244        entity_name: "cluster_prometheus_metric",
245        description: "Prometheus metrics gathered from the cluster's metrics registry.",
246        links: &const { [] },
247        column_semantic_types: &[],
248    }),
249});
250
251pub static MZ_COMPUTE_FRONTIERS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
252    name: "mz_compute_frontiers_per_worker",
253    schema: MZ_INTROSPECTION_SCHEMA,
254    oid: oid::LOG_MZ_COMPUTE_FRONTIERS_PER_WORKER_OID,
255    variant: LogVariant::Compute(ComputeLog::FrontierCurrent),
256    access: vec![PUBLIC_SELECT],
257    ontology: Some(Ontology {
258        entity_name: "compute_frontier_per_worker",
259        description: "Per-worker output frontier timestamps for each compute export.",
260        links: &const {
261            [OntologyLink {
262                name: "frontier_of",
263                target: "compute_export_per_worker",
264                properties: LinkProperties::measures_composite(
265                    "export_id",
266                    "export_id",
267                    "time",
268                    &[("worker_id", "worker_id")],
269                ),
270            }]
271        },
272        column_semantic_types: &[
273            ("export_id", SemanticType::GlobalId),
274            ("time", SemanticType::MzTimestamp),
275        ],
276    }),
277});
278
279pub static MZ_COMPUTE_IMPORT_FRONTIERS_PER_WORKER: LazyLock<BuiltinLog> =
280    LazyLock::new(|| BuiltinLog {
281        name: "mz_compute_import_frontiers_per_worker",
282        schema: MZ_INTROSPECTION_SCHEMA,
283        oid: oid::LOG_MZ_COMPUTE_IMPORT_FRONTIERS_PER_WORKER_OID,
284        variant: LogVariant::Compute(ComputeLog::ImportFrontierCurrent),
285        access: vec![PUBLIC_SELECT],
286        ontology: Some(Ontology {
287            entity_name: "compute_import_frontier_per_worker",
288            description: "Per-worker input frontier timestamps for each compute dataflow.",
289            links: &const {
290                [OntologyLink {
291                    name: "import_frontier_of",
292                    target: "compute_export_per_worker",
293                    properties: LinkProperties::fk_composite(
294                        "export_id",
295                        "export_id",
296                        Cardinality::ManyToOne,
297                        &[("worker_id", "worker_id")],
298                    ),
299                }]
300            },
301            column_semantic_types: &[
302                ("export_id", SemanticType::GlobalId),
303                ("import_id", SemanticType::GlobalId),
304                ("time", SemanticType::MzTimestamp),
305            ],
306        }),
307    });
308
309pub static MZ_COMPUTE_ERROR_COUNTS_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
310    name: "mz_compute_error_counts_raw",
311    schema: MZ_INTROSPECTION_SCHEMA,
312    oid: oid::LOG_MZ_COMPUTE_ERROR_COUNTS_RAW_OID,
313    variant: LogVariant::Compute(ComputeLog::ErrorCount),
314    access: vec![PUBLIC_SELECT],
315    ontology: None,
316});
317
318pub static MZ_COMPUTE_HYDRATION_TIMES_PER_WORKER: LazyLock<BuiltinLog> =
319    LazyLock::new(|| BuiltinLog {
320        name: "mz_compute_hydration_times_per_worker",
321        schema: MZ_INTROSPECTION_SCHEMA,
322        oid: oid::LOG_MZ_COMPUTE_HYDRATION_TIMES_PER_WORKER_OID,
323        variant: LogVariant::Compute(ComputeLog::HydrationTime),
324        access: vec![PUBLIC_SELECT],
325        ontology: Some(Ontology {
326            entity_name: "hydration_time_per_worker",
327            description: "Time in nanoseconds for each compute export to hydrate per worker.",
328            links: &const {
329                [OntologyLink {
330                    name: "hydration_time_of",
331                    target: "compute_export_per_worker",
332                    properties: LinkProperties::measures_composite(
333                        "export_id",
334                        "export_id",
335                        "time_ns",
336                        &[("worker_id", "worker_id")],
337                    ),
338                }]
339            },
340            column_semantic_types: &[("export_id", SemanticType::GlobalId)],
341        }),
342    });
343
344pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER: LazyLock<BuiltinLog> =
345    LazyLock::new(|| BuiltinLog {
346        name: "mz_compute_operator_hydration_statuses_per_worker",
347        schema: MZ_INTROSPECTION_SCHEMA,
348        oid: oid::LOG_MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER_OID,
349        variant: LogVariant::Compute(ComputeLog::OperatorHydrationStatus),
350        access: vec![PUBLIC_SELECT],
351        ontology: Some(Ontology {
352            entity_name: "hydration_status_per_worker",
353            description: "Hydration state for each LIR node per worker.",
354            links: &const {
355                [OntologyLink {
356                    name: "hydration_of",
357                    target: "compute_export_per_worker",
358                    properties: LinkProperties::fk_composite(
359                        "export_id",
360                        "export_id",
361                        Cardinality::ManyToOne,
362                        &[("worker_id", "worker_id")],
363                    ),
364                }]
365            },
366            column_semantic_types: &[("export_id", SemanticType::GlobalId)],
367        }),
368    });
369
370pub static MZ_ACTIVE_PEEKS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
371    name: "mz_active_peeks_per_worker",
372    schema: MZ_INTROSPECTION_SCHEMA,
373    oid: oid::LOG_MZ_ACTIVE_PEEKS_PER_WORKER_OID,
374    variant: LogVariant::Compute(ComputeLog::PeekCurrent),
375    access: vec![PUBLIC_SELECT],
376    ontology: Some(Ontology {
377        entity_name: "active_peek_per_worker",
378        description: "In-flight peek requests currently executing per worker.",
379        links: &const { [] },
380        column_semantic_types: &[
381            ("object_id", SemanticType::GlobalId),
382            ("time", SemanticType::MzTimestamp),
383        ],
384    }),
385});
386
387pub static MZ_COMPUTE_LIR_MAPPING_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
388    name: "mz_compute_lir_mapping_per_worker",
389    schema: MZ_INTROSPECTION_SCHEMA,
390    oid: oid::LOG_MZ_COMPUTE_LIR_MAPPING_PER_WORKER_OID,
391    variant: LogVariant::Compute(ComputeLog::LirMapping),
392    access: vec![PUBLIC_SELECT],
393    ontology: Some(Ontology {
394        entity_name: "lir_mapping_per_worker",
395        description: "Mapping from LIR node IDs to dataflow operator address ranges per worker.",
396        links: &const {
397            [OntologyLink {
398                name: "export_of",
399                target: "compute_export_per_worker",
400                properties: LinkProperties::fk_composite(
401                    "global_id",
402                    "export_id",
403                    Cardinality::ManyToOne,
404                    &[("worker_id", "worker_id")],
405                ),
406            }]
407        },
408        column_semantic_types: &[("global_id", SemanticType::GlobalId)],
409    }),
410});
411
412pub static MZ_PEEK_DURATIONS_HISTOGRAM_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
413    name: "mz_peek_durations_histogram_raw",
414    schema: MZ_INTROSPECTION_SCHEMA,
415    oid: oid::LOG_MZ_PEEK_DURATIONS_HISTOGRAM_RAW_OID,
416    variant: LogVariant::Compute(ComputeLog::PeekDuration),
417    access: vec![PUBLIC_SELECT],
418    ontology: None,
419});
420
421pub static MZ_ARRANGEMENT_HEAP_SIZE_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
422    name: "mz_arrangement_heap_size_raw",
423    schema: MZ_INTROSPECTION_SCHEMA,
424    oid: oid::LOG_MZ_ARRANGEMENT_HEAP_SIZE_RAW_OID,
425    variant: LogVariant::Compute(ComputeLog::ArrangementHeapSize),
426    access: vec![PUBLIC_SELECT],
427    ontology: None,
428});
429
430pub static MZ_ARRANGEMENT_HEAP_CAPACITY_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
431    name: "mz_arrangement_heap_capacity_raw",
432    schema: MZ_INTROSPECTION_SCHEMA,
433    oid: oid::LOG_MZ_ARRANGEMENT_HEAP_CAPACITY_RAW_OID,
434    variant: LogVariant::Compute(ComputeLog::ArrangementHeapCapacity),
435    access: vec![PUBLIC_SELECT],
436    ontology: None,
437});
438
439pub static MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW: LazyLock<BuiltinLog> =
440    LazyLock::new(|| BuiltinLog {
441        name: "mz_arrangement_heap_allocations_raw",
442        schema: MZ_INTROSPECTION_SCHEMA,
443        oid: oid::LOG_MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW_OID,
444        variant: LogVariant::Compute(ComputeLog::ArrangementHeapAllocations),
445        access: vec![PUBLIC_SELECT],
446        ontology: None,
447    });
448
449pub static MZ_MESSAGE_BATCH_COUNTS_RECEIVED_RAW: LazyLock<BuiltinLog> =
450    LazyLock::new(|| BuiltinLog {
451        name: "mz_message_batch_counts_received_raw",
452        schema: MZ_INTROSPECTION_SCHEMA,
453        oid: oid::LOG_MZ_MESSAGE_BATCH_COUNTS_RECEIVED_RAW_OID,
454        variant: LogVariant::Timely(TimelyLog::BatchesReceived),
455        access: vec![PUBLIC_SELECT],
456        ontology: None,
457    });
458
459pub static MZ_MESSAGE_BATCH_COUNTS_SENT_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
460    name: "mz_message_batch_counts_sent_raw",
461    schema: MZ_INTROSPECTION_SCHEMA,
462    oid: oid::LOG_MZ_MESSAGE_BATCH_COUNTS_SENT_RAW_OID,
463    variant: LogVariant::Timely(TimelyLog::BatchesSent),
464    access: vec![PUBLIC_SELECT],
465    ontology: None,
466});
467
468pub static MZ_MESSAGE_COUNTS_RECEIVED_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
469    name: "mz_message_counts_received_raw",
470    schema: MZ_INTROSPECTION_SCHEMA,
471    oid: oid::LOG_MZ_MESSAGE_COUNTS_RECEIVED_RAW_OID,
472    variant: LogVariant::Timely(TimelyLog::MessagesReceived),
473    access: vec![PUBLIC_SELECT],
474    ontology: None,
475});
476
477pub static MZ_MESSAGE_COUNTS_SENT_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
478    name: "mz_message_counts_sent_raw",
479    schema: MZ_INTROSPECTION_SCHEMA,
480    oid: oid::LOG_MZ_MESSAGE_COUNTS_SENT_RAW_OID,
481    variant: LogVariant::Timely(TimelyLog::MessagesSent),
482    access: vec![PUBLIC_SELECT],
483    ontology: None,
484});
485
486pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_RAW: LazyLock<BuiltinLog> =
487    LazyLock::new(|| BuiltinLog {
488        name: "mz_dataflow_operator_reachability_raw",
489        schema: MZ_INTROSPECTION_SCHEMA,
490        oid: oid::LOG_MZ_DATAFLOW_OPERATOR_REACHABILITY_RAW_OID,
491        variant: LogVariant::Timely(TimelyLog::Reachability),
492        access: vec![PUBLIC_SELECT],
493        ontology: None,
494    });
495
496pub static MZ_DATAFLOWS_PER_WORKER: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
497    name: "mz_dataflows_per_worker",
498    schema: MZ_INTROSPECTION_SCHEMA,
499    oid: oid::VIEW_MZ_DATAFLOWS_PER_WORKER_OID,
500    desc: RelationDesc::builder()
501        .with_column("id", SqlScalarType::UInt64.nullable(true))
502        .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
503        .with_column("name", SqlScalarType::String.nullable(false))
504        .finish(),
505    column_comments: BTreeMap::new(),
506    sql: "SELECT
507    addrs.address[1] AS id,
508    ops.worker_id,
509    ops.name
510FROM
511    mz_introspection.mz_dataflow_addresses_per_worker addrs,
512    mz_introspection.mz_dataflow_operators_per_worker ops
513WHERE
514    addrs.id = ops.id AND
515    addrs.worker_id = ops.worker_id AND
516    mz_catalog.list_length(addrs.address) = 1",
517    access: vec![PUBLIC_SELECT],
518    ontology: None,
519});
520
521pub static MZ_DATAFLOWS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
522    name: "mz_dataflows",
523    schema: MZ_INTROSPECTION_SCHEMA,
524    oid: oid::VIEW_MZ_DATAFLOWS_OID,
525    desc: RelationDesc::builder()
526        .with_column("id", SqlScalarType::UInt64.nullable(true))
527        .with_column("name", SqlScalarType::String.nullable(false))
528        .finish(),
529    column_comments: BTreeMap::from_iter([
530        ("id", "The ID of the dataflow."),
531        ("name", "The internal name of the dataflow."),
532    ]),
533    sql: "
534SELECT id, name
535FROM mz_introspection.mz_dataflows_per_worker
536WHERE worker_id = 0::uint8",
537    access: vec![PUBLIC_SELECT],
538    ontology: Some(Ontology {
539        entity_name: "dataflow",
540        description: "Dataflow instances",
541        links: &const { [] },
542        column_semantic_types: &[],
543    }),
544});
545
546pub static MZ_DATAFLOW_ADDRESSES: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
547    name: "mz_dataflow_addresses",
548    schema: MZ_INTROSPECTION_SCHEMA,
549    oid: oid::VIEW_MZ_DATAFLOW_ADDRESSES_OID,
550    desc: RelationDesc::builder()
551        .with_column("id", SqlScalarType::UInt64.nullable(false))
552        .with_column(
553            "address",
554            SqlScalarType::List {
555                element_type: Box::new(SqlScalarType::UInt64),
556                custom_id: None,
557            }
558            .nullable(false),
559        )
560        .with_key(vec![0])
561        .finish(),
562    column_comments: BTreeMap::from_iter([
563        (
564            "id",
565            "The ID of the channel or operator. Corresponds to `mz_dataflow_channels.id` or `mz_dataflow_operators.id`.",
566        ),
567        (
568            "address",
569            "A list of scope-local indexes indicating the path from the root to this channel or operator.",
570        ),
571    ]),
572    sql: "
573SELECT id, address
574FROM mz_introspection.mz_dataflow_addresses_per_worker
575WHERE worker_id = 0::uint8",
576    access: vec![PUBLIC_SELECT],
577    ontology: Some(Ontology {
578        entity_name: "dataflow_address",
579        description: "Address (scope path) of dataflow operators",
580        links: &const {
581            [OntologyLink {
582                name: "address_of_operator",
583                target: "dataflow_operator",
584                properties: LinkProperties::fk("id", "id", Cardinality::OneToOne),
585            }]
586        },
587        column_semantic_types: &[],
588    }),
589});
590
591pub static MZ_DATAFLOW_CHANNELS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
592    name: "mz_dataflow_channels",
593    schema: MZ_INTROSPECTION_SCHEMA,
594    oid: oid::VIEW_MZ_DATAFLOW_CHANNELS_OID,
595    desc: RelationDesc::builder()
596        .with_column("id", SqlScalarType::UInt64.nullable(false))
597        .with_column("from_index", SqlScalarType::UInt64.nullable(false))
598        .with_column("from_port", SqlScalarType::UInt64.nullable(false))
599        .with_column("to_index", SqlScalarType::UInt64.nullable(false))
600        .with_column("to_port", SqlScalarType::UInt64.nullable(false))
601        .with_column("type", SqlScalarType::String.nullable(false))
602        .with_key(vec![0])
603        .finish(),
604    column_comments: BTreeMap::from_iter([
605        ("id", "The ID of the channel."),
606        (
607            "from_index",
608            "The scope-local index of the source operator. Corresponds to `mz_dataflow_addresses.address`.",
609        ),
610        ("from_port", "The source operator's output port."),
611        (
612            "to_index",
613            "The scope-local index of the target operator. Corresponds to `mz_dataflow_addresses.address`.",
614        ),
615        ("to_port", "The target operator's input port."),
616        ("type", "The container type of the channel."),
617    ]),
618    sql: "
619SELECT id, from_index, from_port, to_index, to_port, type
620FROM mz_introspection.mz_dataflow_channels_per_worker
621WHERE worker_id = 0::uint8",
622    access: vec![PUBLIC_SELECT],
623    ontology: Some(Ontology {
624        entity_name: "dataflow_channel",
625        description: "Communication channels between operators",
626        links: &const {
627            [OntologyLink {
628                name: "channel_in_dataflow",
629                target: "dataflow",
630                properties: LinkProperties::MapsTo {
631                    source_column: "from_index",
632                    target_column: "id",
633                    via: Some("mz_introspection.mz_dataflow_operator_dataflows"),
634                    from_type: None,
635                    to_type: None,
636                    note: Some(
637                        "Channels do not have a direct dataflow_id. Use mz_dataflow_addresses to find the parent scope, then correlate with mz_dataflow_operator_dataflows.",
638                    ),
639                },
640            }]
641        },
642        column_semantic_types: &[],
643    }),
644});
645
646pub static MZ_DATAFLOW_OPERATORS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
647    name: "mz_dataflow_operators",
648    schema: MZ_INTROSPECTION_SCHEMA,
649    oid: oid::VIEW_MZ_DATAFLOW_OPERATORS_OID,
650    desc: RelationDesc::builder()
651        .with_column("id", SqlScalarType::UInt64.nullable(false))
652        .with_column("name", SqlScalarType::String.nullable(false))
653        .with_key(vec![0])
654        .finish(),
655    column_comments: BTreeMap::from_iter([
656        ("id", "The ID of the operator."),
657        ("name", "The internal name of the operator."),
658    ]),
659    sql: "
660SELECT id, name
661FROM mz_introspection.mz_dataflow_operators_per_worker
662WHERE worker_id = 0::uint8",
663    access: vec![PUBLIC_SELECT],
664    ontology: Some(Ontology {
665        entity_name: "dataflow_operator",
666        description: "Operators within dataflows",
667        links: &const { [] },
668        column_semantic_types: &[],
669    }),
670});
671
672pub static MZ_DATAFLOW_GLOBAL_IDS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
673    name: "mz_dataflow_global_ids",
674    schema: MZ_INTROSPECTION_SCHEMA,
675    oid: oid::VIEW_MZ_DATAFLOW_GLOBAL_IDS_OID,
676    desc: RelationDesc::builder()
677        .with_column("id", SqlScalarType::UInt64.nullable(false))
678        .with_column("global_id", SqlScalarType::String.nullable(false))
679        .with_key(vec![0, 1])
680        .finish(),
681    column_comments: BTreeMap::from_iter([
682        ("id", "The dataflow ID."),
683        ("global_id", "A global ID associated with that dataflow."),
684    ]),
685    sql: "
686SELECT id, global_id
687FROM mz_introspection.mz_compute_dataflow_global_ids_per_worker
688WHERE worker_id = 0::uint8",
689    access: vec![PUBLIC_SELECT],
690    ontology: None,
691});
692
693pub static MZ_MAPPABLE_OBJECTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
694    name: "mz_mappable_objects",
695    schema: MZ_INTROSPECTION_SCHEMA,
696    oid: oid::VIEW_MZ_MAPPABLE_OBJECTS_OID,
697    desc: RelationDesc::builder()
698        .with_column("name", SqlScalarType::String.nullable(false))
699        .with_column("global_id", SqlScalarType::String.nullable(false))
700        .finish(),
701    column_comments: BTreeMap::from_iter([
702        (
703            "name",
704            "The name of the object. This name is unquoted, and you might need to call `quote_ident` if you want to reference the name shown here.",
705        ),
706        ("global_id", "The global ID of the object."),
707    ]),
708    sql: "
709SELECT COALESCE(md.name || '.', '') || ms.name || '.' || mo.name AS name, mgi.global_id AS global_id
710FROM      mz_catalog.mz_objects mo
711          JOIN mz_introspection.mz_compute_exports mce ON (mo.id = mce.export_id)
712          JOIN mz_catalog.mz_schemas ms ON (mo.schema_id = ms.id)
713          JOIN mz_introspection.mz_dataflow_global_ids mgi ON (mce.dataflow_id = mgi.id)
714     LEFT JOIN mz_catalog.mz_databases md ON (ms.database_id = md.id);",
715    access: vec![PUBLIC_SELECT],
716    ontology: Some(Ontology {
717        entity_name: "mappable_object",
718        description: "Objects that can be mapped to dataflow operators",
719        links: &const { [] },
720        column_semantic_types: &[("global_id", SemanticType::GlobalId)],
721    }),
722});
723
724pub static MZ_LIR_MAPPING: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
725    name: "mz_lir_mapping",
726    schema: MZ_INTROSPECTION_SCHEMA,
727    oid: oid::VIEW_MZ_LIR_MAPPING_OID,
728    desc: RelationDesc::builder()
729        .with_column("global_id", SqlScalarType::String.nullable(false))
730        .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
731        .with_column("operator", SqlScalarType::String.nullable(false))
732        .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
733        .with_column("nesting", SqlScalarType::UInt16.nullable(false))
734        .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
735        .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
736        .with_key(vec![0, 1])
737        .finish(),
738    column_comments: BTreeMap::from_iter([
739        ("global_id", "The global ID."),
740        ("lir_id", "The LIR node ID."),
741        (
742            "operator",
743            "The LIR operator, in the format `OperatorName INPUTS [OPTIONS]`.",
744        ),
745        (
746            "parent_lir_id",
747            "The parent of this LIR node. May be `NULL`.",
748        ),
749        ("nesting", "The nesting level of this LIR node."),
750        (
751            "operator_id_start",
752            "The first dataflow operator ID implementing this LIR operator (inclusive).",
753        ),
754        (
755            "operator_id_end",
756            "The first dataflow operator ID _after_ this LIR operator (exclusive).",
757        ),
758    ]),
759    sql: "
760SELECT global_id, lir_id, operator, parent_lir_id, nesting, operator_id_start, operator_id_end
761FROM mz_introspection.mz_compute_lir_mapping_per_worker
762WHERE worker_id = 0::uint8",
763    access: vec![PUBLIC_SELECT],
764    ontology: Some(Ontology {
765        entity_name: "lir_mapping",
766        description: "LIR (low-level IR) to dataflow operator mapping",
767        links: &const { [] },
768        column_semantic_types: &[("global_id", SemanticType::GlobalId)],
769    }),
770});
771
772pub static MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER: LazyLock<BuiltinView> =
773    LazyLock::new(|| BuiltinView {
774        name: "mz_dataflow_operator_dataflows_per_worker",
775        schema: MZ_INTROSPECTION_SCHEMA,
776        oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER_OID,
777        desc: RelationDesc::builder()
778            .with_column("id", SqlScalarType::UInt64.nullable(false))
779            .with_column("name", SqlScalarType::String.nullable(false))
780            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
781            .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
782            .with_column("dataflow_name", SqlScalarType::String.nullable(false))
783            .finish(),
784        column_comments: BTreeMap::new(),
785        sql: "SELECT
786    ops.id,
787    ops.name,
788    ops.worker_id,
789    dfs.id as dataflow_id,
790    dfs.name as dataflow_name
791FROM
792    mz_introspection.mz_dataflow_operators_per_worker ops,
793    mz_introspection.mz_dataflow_addresses_per_worker addrs,
794    mz_introspection.mz_dataflows_per_worker dfs
795WHERE
796    ops.id = addrs.id AND
797    ops.worker_id = addrs.worker_id AND
798    dfs.id = addrs.address[1] AND
799    dfs.worker_id = addrs.worker_id",
800        access: vec![PUBLIC_SELECT],
801        ontology: None,
802    });
803
804pub static MZ_DATAFLOW_OPERATOR_DATAFLOWS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
805    name: "mz_dataflow_operator_dataflows",
806    schema: MZ_INTROSPECTION_SCHEMA,
807    oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_DATAFLOWS_OID,
808    desc: RelationDesc::builder()
809        .with_column("id", SqlScalarType::UInt64.nullable(false))
810        .with_column("name", SqlScalarType::String.nullable(false))
811        .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
812        .with_column("dataflow_name", SqlScalarType::String.nullable(false))
813        .finish(),
814    column_comments: BTreeMap::from_iter([
815        (
816            "id",
817            "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
818        ),
819        ("name", "The internal name of the operator."),
820        (
821            "dataflow_id",
822            "The ID of the dataflow hosting the operator. Corresponds to `mz_dataflows.id`.",
823        ),
824        (
825            "dataflow_name",
826            "The internal name of the dataflow hosting the operator.",
827        ),
828    ]),
829    sql: "
830SELECT id, name, dataflow_id, dataflow_name
831FROM mz_introspection.mz_dataflow_operator_dataflows_per_worker
832WHERE worker_id = 0::uint8",
833    access: vec![PUBLIC_SELECT],
834    ontology: Some(Ontology {
835        entity_name: "dataflow_operator_dataflow",
836        description: "Mapping of operators to their parent dataflow",
837        links: &const {
838            [OntologyLink {
839                name: "operator_in_dataflow",
840                target: "dataflow",
841                properties: LinkProperties::fk("dataflow_id", "id", Cardinality::ManyToOne),
842            }]
843        },
844        column_semantic_types: &[],
845    }),
846});
847
848pub static MZ_COMPUTE_EXPORTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
849    name: "mz_compute_exports",
850    schema: MZ_INTROSPECTION_SCHEMA,
851    oid: oid::VIEW_MZ_COMPUTE_EXPORTS_OID,
852    desc: RelationDesc::builder()
853        .with_column("export_id", SqlScalarType::String.nullable(false))
854        .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
855        .with_key(vec![0])
856        .finish(),
857    column_comments: BTreeMap::from_iter([
858        (
859            "export_id",
860            "The ID of the index, materialized view, or subscription exported by the dataflow. Corresponds to `mz_catalog.mz_indexes.id`, `mz_catalog.mz_materialized_views.id`, or `mz_internal.mz_subscriptions.id`.",
861        ),
862        (
863            "dataflow_id",
864            "The ID of the dataflow. Corresponds to `mz_dataflows.id`.",
865        ),
866    ]),
867    sql: "
868SELECT export_id, dataflow_id
869FROM mz_introspection.mz_compute_exports_per_worker
870WHERE worker_id = 0::uint8",
871    access: vec![PUBLIC_SELECT],
872    ontology: Some(Ontology {
873        entity_name: "compute_export",
874        description: "Compute exports (maintained collections)",
875        links: &const {
876            [
877                OntologyLink {
878                    name: "export_of",
879                    target: "object",
880                    properties: LinkProperties::fk_mapped(
881                        "export_id",
882                        "id",
883                        Cardinality::ManyToOne,
884                        mz_repr::SemanticType::GlobalId,
885                        "mz_internal.mz_object_global_ids",
886                    ),
887                },
888                OntologyLink {
889                    name: "introspection_uses_global_id",
890                    target: "object_global_id",
891                    properties: LinkProperties::MapsTo {
892                        source_column: "export_id",
893                        target_column: "global_id",
894                        via: None,
895                        from_type: None,
896                        to_type: None,
897                        note: Some(
898                            "mz_introspection tables use GlobalId. To join with mz_catalog tables (which use CatalogItemId), go through mz_internal.mz_object_global_ids.",
899                        ),
900                    },
901                },
902            ]
903        },
904        column_semantic_types: &[("export_id", SemanticType::GlobalId)],
905    }),
906});
907
908pub static MZ_COMPUTE_FRONTIERS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
909    name: "mz_compute_frontiers",
910    schema: MZ_INTROSPECTION_SCHEMA,
911    oid: oid::VIEW_MZ_COMPUTE_FRONTIERS_OID,
912    desc: RelationDesc::builder()
913        .with_column("export_id", SqlScalarType::String.nullable(false))
914        .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
915        .with_key(vec![0])
916        .finish(),
917    column_comments: BTreeMap::from_iter([
918        (
919            "export_id",
920            "The ID of the dataflow export. Corresponds to `mz_compute_exports.export_id`.",
921        ),
922        (
923            "time",
924            "The next timestamp at which the dataflow output may change.",
925        ),
926    ]),
927    sql: "SELECT
928    export_id, pg_catalog.min(time) AS time
929FROM mz_introspection.mz_compute_frontiers_per_worker
930GROUP BY export_id",
931    access: vec![PUBLIC_SELECT],
932    ontology: Some(Ontology {
933        entity_name: "compute_frontier",
934        description: "Per-replica compute frontiers",
935        links: &const {
936            [OntologyLink {
937                name: "compute_frontier_of",
938                target: "object",
939                properties: LinkProperties::fk_mapped(
940                    "export_id",
941                    "id",
942                    Cardinality::ManyToOne,
943                    mz_repr::SemanticType::GlobalId,
944                    "mz_internal.mz_object_global_ids",
945                ),
946            }]
947        },
948        column_semantic_types: &const {
949            [
950                ("export_id", SemanticType::GlobalId),
951                ("time", SemanticType::MzTimestamp),
952            ]
953        },
954    }),
955});
956
957pub static MZ_DATAFLOW_CHANNEL_OPERATORS_PER_WORKER: LazyLock<BuiltinView> =
958    LazyLock::new(|| BuiltinView {
959        name: "mz_dataflow_channel_operators_per_worker",
960        schema: MZ_INTROSPECTION_SCHEMA,
961        oid: oid::VIEW_MZ_DATAFLOW_CHANNEL_OPERATORS_PER_WORKER_OID,
962        desc: RelationDesc::builder()
963            .with_column("id", SqlScalarType::UInt64.nullable(false))
964            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
965            .with_column("from_operator_id", SqlScalarType::UInt64.nullable(true))
966            .with_column(
967                "from_operator_address",
968                SqlScalarType::List {
969                    element_type: Box::new(SqlScalarType::UInt64),
970                    custom_id: None,
971                }
972                .nullable(false),
973            )
974            .with_column("to_operator_id", SqlScalarType::UInt64.nullable(true))
975            .with_column(
976                "to_operator_address",
977                SqlScalarType::List {
978                    element_type: Box::new(SqlScalarType::UInt64),
979                    custom_id: None,
980                }
981                .nullable(false),
982            )
983            .with_column("type", SqlScalarType::String.nullable(false))
984            .finish(),
985        column_comments: BTreeMap::new(),
986        sql: "
987WITH
988channel_addresses(id, worker_id, address, from_index, to_index, type) AS (
989     SELECT id, worker_id, address, from_index, to_index, type
990     FROM mz_introspection.mz_dataflow_channels_per_worker mdc
991     INNER JOIN mz_introspection.mz_dataflow_addresses_per_worker mda
992     USING (id, worker_id)
993),
994channel_operator_addresses(id, worker_id, from_address, to_address, type) AS (
995     SELECT id, worker_id,
996            address || from_index AS from_address,
997            address || to_index AS to_address,
998            type
999     FROM channel_addresses
1000),
1001operator_addresses(id, worker_id, address) AS (
1002     SELECT id, worker_id, address
1003     FROM mz_introspection.mz_dataflow_addresses_per_worker mda
1004     INNER JOIN mz_introspection.mz_dataflow_operators_per_worker mdo
1005     USING (id, worker_id)
1006)
1007SELECT coa.id,
1008       coa.worker_id,
1009       from_ops.id AS from_operator_id,
1010       coa.from_address AS from_operator_address,
1011       to_ops.id AS to_operator_id,
1012       coa.to_address AS to_operator_address,
1013       coa.type
1014FROM channel_operator_addresses coa
1015     LEFT OUTER JOIN operator_addresses from_ops
1016          ON coa.from_address = from_ops.address AND
1017             coa.worker_id = from_ops.worker_id
1018     LEFT OUTER JOIN operator_addresses to_ops
1019          ON coa.to_address = to_ops.address AND
1020             coa.worker_id = to_ops.worker_id
1021",
1022        access: vec![PUBLIC_SELECT],
1023        ontology: None,
1024    });
1025
1026pub static MZ_DATAFLOW_CHANNEL_OPERATORS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1027    name: "mz_dataflow_channel_operators",
1028    schema: MZ_INTROSPECTION_SCHEMA,
1029    oid: oid::VIEW_MZ_DATAFLOW_CHANNEL_OPERATORS_OID,
1030    desc: RelationDesc::builder()
1031        .with_column("id", SqlScalarType::UInt64.nullable(false))
1032        .with_column("from_operator_id", SqlScalarType::UInt64.nullable(true))
1033        .with_column(
1034            "from_operator_address",
1035            SqlScalarType::List {
1036                element_type: Box::new(SqlScalarType::UInt64),
1037                custom_id: None,
1038            }
1039            .nullable(false),
1040        )
1041        .with_column("to_operator_id", SqlScalarType::UInt64.nullable(true))
1042        .with_column(
1043            "to_operator_address",
1044            SqlScalarType::List {
1045                element_type: Box::new(SqlScalarType::UInt64),
1046                custom_id: None,
1047            }
1048            .nullable(false),
1049        )
1050        .with_column("type", SqlScalarType::String.nullable(false))
1051        .finish(),
1052    column_comments: BTreeMap::from_iter([
1053        (
1054            "id",
1055            "The ID of the channel. Corresponds to `mz_dataflow_channels.id`.",
1056        ),
1057        (
1058            "from_operator_id",
1059            "The ID of the source of the channel. Corresponds to `mz_dataflow_operators.id`.",
1060        ),
1061        (
1062            "from_operator_address",
1063            "The address of the source of the channel. Corresponds to `mz_dataflow_addresses.address`.",
1064        ),
1065        (
1066            "to_operator_id",
1067            "The ID of the target of the channel. Corresponds to `mz_dataflow_operators.id`.",
1068        ),
1069        (
1070            "to_operator_address",
1071            "The address of the target of the channel. Corresponds to `mz_dataflow_addresses.address`.",
1072        ),
1073        ("type", "The container type of the channel."),
1074    ]),
1075    sql: "
1076SELECT id, from_operator_id, from_operator_address, to_operator_id, to_operator_address, type
1077FROM mz_introspection.mz_dataflow_channel_operators_per_worker
1078WHERE worker_id = 0::uint8",
1079    access: vec![PUBLIC_SELECT],
1080    ontology: None,
1081});
1082
1083pub static MZ_COMPUTE_IMPORT_FRONTIERS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1084    name: "mz_compute_import_frontiers",
1085    schema: MZ_INTROSPECTION_SCHEMA,
1086    oid: oid::VIEW_MZ_COMPUTE_IMPORT_FRONTIERS_OID,
1087    desc: RelationDesc::builder()
1088        .with_column("export_id", SqlScalarType::String.nullable(false))
1089        .with_column("import_id", SqlScalarType::String.nullable(false))
1090        .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
1091        .with_key(vec![0, 1])
1092        .finish(),
1093    column_comments: BTreeMap::from_iter([
1094        (
1095            "export_id",
1096            "The ID of the dataflow export. Corresponds to `mz_compute_exports.export_id`.",
1097        ),
1098        (
1099            "import_id",
1100            "The ID of the dataflow import. Corresponds to `mz_catalog.mz_sources.id` or `mz_catalog.mz_tables.id` or `mz_compute_exports.export_id`.",
1101        ),
1102        (
1103            "time",
1104            "The next timestamp at which the dataflow input may change.",
1105        ),
1106    ]),
1107    sql: "SELECT
1108    export_id, import_id, pg_catalog.min(time) AS time
1109FROM mz_introspection.mz_compute_import_frontiers_per_worker
1110GROUP BY export_id, import_id",
1111    access: vec![PUBLIC_SELECT],
1112    ontology: Some(Ontology {
1113        entity_name: "compute_import_frontier",
1114        description: "Import frontiers for compute dependencies",
1115        links: &const {
1116            [OntologyLink {
1117                name: "compute_import_frontier_of",
1118                target: "object",
1119                properties: LinkProperties::fk_mapped(
1120                    "export_id",
1121                    "id",
1122                    Cardinality::ManyToOne,
1123                    mz_repr::SemanticType::GlobalId,
1124                    "mz_internal.mz_object_global_ids",
1125                ),
1126            }]
1127        },
1128        column_semantic_types: &const {
1129            [
1130                ("export_id", SemanticType::GlobalId),
1131                ("import_id", SemanticType::GlobalId),
1132                ("time", SemanticType::MzTimestamp),
1133            ]
1134        },
1135    }),
1136});
1137
1138pub static MZ_RECORDS_PER_DATAFLOW_OPERATOR_PER_WORKER: LazyLock<BuiltinView> =
1139    LazyLock::new(|| BuiltinView {
1140        name: "mz_records_per_dataflow_operator_per_worker",
1141        schema: MZ_INTROSPECTION_SCHEMA,
1142        oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_OPERATOR_PER_WORKER_OID,
1143        desc: RelationDesc::builder()
1144            .with_column("id", SqlScalarType::UInt64.nullable(false))
1145            .with_column("name", SqlScalarType::String.nullable(false))
1146            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1147            .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
1148            .with_column("records", SqlScalarType::Int64.nullable(true))
1149            .with_column("batches", SqlScalarType::Int64.nullable(true))
1150            .with_column("size", SqlScalarType::Int64.nullable(true))
1151            .with_column("capacity", SqlScalarType::Int64.nullable(true))
1152            .with_column("allocations", SqlScalarType::Int64.nullable(true))
1153            .finish(),
1154        column_comments: BTreeMap::new(),
1155        sql: "
1156SELECT
1157    dod.id,
1158    dod.name,
1159    dod.worker_id,
1160    dod.dataflow_id,
1161    ar_size.records AS records,
1162    ar_size.batches AS batches,
1163    ar_size.size AS size,
1164    ar_size.capacity AS capacity,
1165    ar_size.allocations AS allocations
1166FROM
1167    mz_introspection.mz_dataflow_operator_dataflows_per_worker dod
1168    LEFT OUTER JOIN mz_introspection.mz_arrangement_sizes_per_worker ar_size ON
1169        dod.id = ar_size.operator_id AND
1170        dod.worker_id = ar_size.worker_id",
1171        access: vec![PUBLIC_SELECT],
1172        ontology: None,
1173    });
1174
1175pub static MZ_RECORDS_PER_DATAFLOW_OPERATOR: LazyLock<BuiltinView> =
1176    LazyLock::new(|| BuiltinView {
1177        name: "mz_records_per_dataflow_operator",
1178        schema: MZ_INTROSPECTION_SCHEMA,
1179        oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_OPERATOR_OID,
1180        desc: RelationDesc::builder()
1181            .with_column("id", SqlScalarType::UInt64.nullable(false))
1182            .with_column("name", SqlScalarType::String.nullable(false))
1183            .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
1184            .with_column("records", SqlScalarType::Int64.nullable(true))
1185            .with_column("batches", SqlScalarType::Int64.nullable(true))
1186            .with_column("size", SqlScalarType::Int64.nullable(true))
1187            .with_column("capacity", SqlScalarType::Int64.nullable(true))
1188            .with_column("allocations", SqlScalarType::Int64.nullable(true))
1189            .with_key(vec![0, 1, 2])
1190            .finish(),
1191        column_comments: BTreeMap::from_iter([
1192            (
1193                "id",
1194                "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
1195            ),
1196            ("name", "The internal name of the operator."),
1197            (
1198                "dataflow_id",
1199                "The ID of the dataflow. Corresponds to `mz_dataflows.id`.",
1200            ),
1201            ("records", "The number of records in the operator."),
1202            ("batches", "The number of batches in the dataflow."),
1203            ("size", "The utilized size in bytes of the arrangement."),
1204            (
1205                "capacity",
1206                "The capacity in bytes of the arrangement. Can be larger than the size.",
1207            ),
1208            (
1209                "allocations",
1210                "The number of separate memory allocations backing the arrangement.",
1211            ),
1212        ]),
1213        sql: "
1214SELECT
1215    id,
1216    name,
1217    dataflow_id,
1218    SUM(records)::int8 AS records,
1219    SUM(batches)::int8 AS batches,
1220    SUM(size)::int8 AS size,
1221    SUM(capacity)::int8 AS capacity,
1222    SUM(allocations)::int8 AS allocations
1223FROM mz_introspection.mz_records_per_dataflow_operator_per_worker
1224GROUP BY id, name, dataflow_id",
1225        access: vec![PUBLIC_SELECT],
1226        ontology: None,
1227    });
1228
1229pub static MZ_RECORDS_PER_DATAFLOW_PER_WORKER: LazyLock<BuiltinView> =
1230    LazyLock::new(|| BuiltinView {
1231        name: "mz_records_per_dataflow_per_worker",
1232        schema: MZ_INTROSPECTION_SCHEMA,
1233        oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_PER_WORKER_OID,
1234        desc: RelationDesc::builder()
1235            .with_column("id", SqlScalarType::UInt64.nullable(false))
1236            .with_column("name", SqlScalarType::String.nullable(false))
1237            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1238            .with_column("records", SqlScalarType::Int64.nullable(true))
1239            .with_column("batches", SqlScalarType::Int64.nullable(true))
1240            .with_column("size", SqlScalarType::Int64.nullable(true))
1241            .with_column("capacity", SqlScalarType::Int64.nullable(true))
1242            .with_column("allocations", SqlScalarType::Int64.nullable(true))
1243            .with_key(vec![0, 1, 2])
1244            .finish(),
1245        column_comments: BTreeMap::new(),
1246        sql: "
1247SELECT
1248    rdo.dataflow_id as id,
1249    dfs.name,
1250    rdo.worker_id,
1251    SUM(rdo.records)::int8 as records,
1252    SUM(rdo.batches)::int8 as batches,
1253    SUM(rdo.size)::int8 as size,
1254    SUM(rdo.capacity)::int8 as capacity,
1255    SUM(rdo.allocations)::int8 as allocations
1256FROM
1257    mz_introspection.mz_records_per_dataflow_operator_per_worker rdo,
1258    mz_introspection.mz_dataflows_per_worker dfs
1259WHERE
1260    rdo.dataflow_id = dfs.id AND
1261    rdo.worker_id = dfs.worker_id
1262GROUP BY
1263    rdo.dataflow_id,
1264    dfs.name,
1265    rdo.worker_id",
1266        access: vec![PUBLIC_SELECT],
1267        ontology: None,
1268    });
1269
1270pub static MZ_RECORDS_PER_DATAFLOW: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1271    name: "mz_records_per_dataflow",
1272    schema: MZ_INTROSPECTION_SCHEMA,
1273    oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_OID,
1274    desc: RelationDesc::builder()
1275        .with_column("id", SqlScalarType::UInt64.nullable(false))
1276        .with_column("name", SqlScalarType::String.nullable(false))
1277        .with_column("records", SqlScalarType::Int64.nullable(true))
1278        .with_column("batches", SqlScalarType::Int64.nullable(true))
1279        .with_column("size", SqlScalarType::Int64.nullable(true))
1280        .with_column("capacity", SqlScalarType::Int64.nullable(true))
1281        .with_column("allocations", SqlScalarType::Int64.nullable(true))
1282        .with_key(vec![0, 1])
1283        .finish(),
1284    column_comments: BTreeMap::from_iter([
1285        (
1286            "id",
1287            "The ID of the dataflow. Corresponds to `mz_dataflows.id`.",
1288        ),
1289        ("name", "The internal name of the dataflow."),
1290        ("records", "The number of records in the dataflow."),
1291        ("batches", "The number of batches in the dataflow."),
1292        ("size", "The utilized size in bytes of the arrangements."),
1293        (
1294            "capacity",
1295            "The capacity in bytes of the arrangements. Can be larger than the size.",
1296        ),
1297        (
1298            "allocations",
1299            "The number of separate memory allocations backing the arrangements.",
1300        ),
1301    ]),
1302    sql: "
1303SELECT
1304    id,
1305    name,
1306    SUM(records)::int8 as records,
1307    SUM(batches)::int8 as batches,
1308    SUM(size)::int8 as size,
1309    SUM(capacity)::int8 as capacity,
1310    SUM(allocations)::int8 as allocations
1311FROM
1312    mz_introspection.mz_records_per_dataflow_per_worker
1313GROUP BY
1314    id,
1315    name",
1316    access: vec![PUBLIC_SELECT],
1317    ontology: Some(Ontology {
1318        entity_name: "records_per_dataflow",
1319        description: "Record counts aggregated per dataflow",
1320        links: &const {
1321            [OntologyLink {
1322                name: "details_of",
1323                target: "dataflow",
1324                properties: LinkProperties::fk("id", "id", Cardinality::OneToOne),
1325            }]
1326        },
1327        column_semantic_types: &[],
1328    }),
1329});
1330
1331pub static MZ_PEEK_DURATIONS_HISTOGRAM_PER_WORKER: LazyLock<BuiltinView> =
1332    LazyLock::new(|| BuiltinView {
1333        name: "mz_peek_durations_histogram_per_worker",
1334        schema: MZ_INTROSPECTION_SCHEMA,
1335        oid: oid::VIEW_MZ_PEEK_DURATIONS_HISTOGRAM_PER_WORKER_OID,
1336        desc: RelationDesc::builder()
1337            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1338            .with_column("type", SqlScalarType::String.nullable(false))
1339            .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1340            .with_column("count", SqlScalarType::Int64.nullable(false))
1341            .with_key(vec![0, 1, 2])
1342            .finish(),
1343        column_comments: BTreeMap::new(),
1344        sql: "SELECT
1345    worker_id, type, duration_ns, pg_catalog.count(*) AS count
1346FROM
1347    mz_introspection.mz_peek_durations_histogram_raw
1348GROUP BY
1349    worker_id, type, duration_ns",
1350        access: vec![PUBLIC_SELECT],
1351        ontology: None,
1352    });
1353
1354pub static MZ_PEEK_DURATIONS_HISTOGRAM: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1355    name: "mz_peek_durations_histogram",
1356    schema: MZ_INTROSPECTION_SCHEMA,
1357    oid: oid::VIEW_MZ_PEEK_DURATIONS_HISTOGRAM_OID,
1358    desc: RelationDesc::builder()
1359        .with_column("type", SqlScalarType::String.nullable(false))
1360        .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1361        .with_column(
1362            "count",
1363            SqlScalarType::Numeric {
1364                max_scale: Some(NumericMaxScale::ZERO),
1365            }
1366            .nullable(false),
1367        )
1368        .with_key(vec![0, 1])
1369        .finish(),
1370    column_comments: BTreeMap::from_iter([
1371        ("type", "The peek variant: `index` or `persist`."),
1372        (
1373            "duration_ns",
1374            "The upper bound of the bucket in nanoseconds.",
1375        ),
1376        (
1377            "count",
1378            "The (noncumulative) count of peeks in this bucket.",
1379        ),
1380    ]),
1381    sql: "
1382SELECT
1383    type, duration_ns,
1384    pg_catalog.sum(count) AS count
1385FROM mz_introspection.mz_peek_durations_histogram_per_worker
1386GROUP BY type, duration_ns",
1387    access: vec![PUBLIC_SELECT],
1388    ontology: Some(Ontology {
1389        entity_name: "peek_duration",
1390        description: "Histogram of SELECT query durations",
1391        links: &const { [] },
1392        column_semantic_types: &[],
1393    }),
1394});
1395
1396pub static MZ_SCHEDULING_ELAPSED_PER_WORKER: LazyLock<BuiltinView> =
1397    LazyLock::new(|| BuiltinView {
1398        name: "mz_scheduling_elapsed_per_worker",
1399        schema: MZ_INTROSPECTION_SCHEMA,
1400        oid: oid::VIEW_MZ_SCHEDULING_ELAPSED_PER_WORKER_OID,
1401        desc: RelationDesc::builder()
1402            .with_column("id", SqlScalarType::UInt64.nullable(false))
1403            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1404            .with_column("elapsed_ns", SqlScalarType::Int64.nullable(false))
1405            .with_key(vec![0, 1])
1406            .finish(),
1407        column_comments: BTreeMap::new(),
1408        sql: "SELECT
1409    id, worker_id, pg_catalog.count(*) AS elapsed_ns
1410FROM
1411    mz_introspection.mz_scheduling_elapsed_raw
1412GROUP BY
1413    id, worker_id",
1414        access: vec![PUBLIC_SELECT],
1415        ontology: None,
1416    });
1417
1418pub static MZ_SCHEDULING_ELAPSED: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1419    name: "mz_scheduling_elapsed",
1420    schema: MZ_INTROSPECTION_SCHEMA,
1421    oid: oid::VIEW_MZ_SCHEDULING_ELAPSED_OID,
1422    desc: RelationDesc::builder()
1423        .with_column("id", SqlScalarType::UInt64.nullable(false))
1424        .with_column(
1425            "elapsed_ns",
1426            SqlScalarType::Numeric {
1427                max_scale: Some(NumericMaxScale::ZERO),
1428            }
1429            .nullable(false),
1430        )
1431        .with_key(vec![0])
1432        .finish(),
1433    column_comments: BTreeMap::from_iter([
1434        (
1435            "id",
1436            "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
1437        ),
1438        (
1439            "elapsed_ns",
1440            "The total elapsed time spent in the operator in nanoseconds.",
1441        ),
1442    ]),
1443    sql: "
1444SELECT
1445    id,
1446    pg_catalog.sum(elapsed_ns) AS elapsed_ns
1447FROM mz_introspection.mz_scheduling_elapsed_per_worker
1448GROUP BY id",
1449    access: vec![PUBLIC_SELECT],
1450    ontology: Some(Ontology {
1451        entity_name: "scheduling_elapsed",
1452        description: "CPU time spent per operator",
1453        links: &const {
1454            [OntologyLink {
1455                name: "elapsed_for_operator",
1456                target: "dataflow_operator",
1457                properties: LinkProperties::measures("id", "id", "cpu_time_ns"),
1458            }]
1459        },
1460        column_semantic_types: &[],
1461    }),
1462});
1463
1464pub static MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_PER_WORKER: LazyLock<BuiltinView> =
1465    LazyLock::new(|| BuiltinView {
1466        name: "mz_compute_operator_durations_histogram_per_worker",
1467        schema: MZ_INTROSPECTION_SCHEMA,
1468        oid: oid::VIEW_MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_PER_WORKER_OID,
1469        desc: RelationDesc::builder()
1470            .with_column("id", SqlScalarType::UInt64.nullable(false))
1471            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1472            .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1473            .with_column("count", SqlScalarType::Int64.nullable(false))
1474            .with_key(vec![0, 1, 2])
1475            .finish(),
1476        column_comments: BTreeMap::new(),
1477        sql: "SELECT
1478    id, worker_id, duration_ns, pg_catalog.count(*) AS count
1479FROM
1480    mz_introspection.mz_compute_operator_durations_histogram_raw
1481GROUP BY
1482    id, worker_id, duration_ns",
1483        access: vec![PUBLIC_SELECT],
1484        ontology: None,
1485    });
1486
1487pub static MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM: LazyLock<BuiltinView> =
1488    LazyLock::new(|| BuiltinView {
1489        name: "mz_compute_operator_durations_histogram",
1490        schema: MZ_INTROSPECTION_SCHEMA,
1491        oid: oid::VIEW_MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_OID,
1492        desc: RelationDesc::builder()
1493            .with_column("id", SqlScalarType::UInt64.nullable(false))
1494            .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1495            .with_column(
1496                "count",
1497                SqlScalarType::Numeric {
1498                    max_scale: Some(NumericMaxScale::ZERO),
1499                }
1500                .nullable(false),
1501            )
1502            .with_key(vec![0, 1])
1503            .finish(),
1504        column_comments: BTreeMap::from_iter([
1505            (
1506                "id",
1507                "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
1508            ),
1509            (
1510                "duration_ns",
1511                "The upper bound of the duration bucket in nanoseconds.",
1512            ),
1513            (
1514                "count",
1515                "The (noncumulative) count of invocations in the bucket.",
1516            ),
1517        ]),
1518        sql: "
1519SELECT
1520    id,
1521    duration_ns,
1522    pg_catalog.sum(count) AS count
1523FROM mz_introspection.mz_compute_operator_durations_histogram_per_worker
1524GROUP BY id, duration_ns",
1525        access: vec![PUBLIC_SELECT],
1526        ontology: None,
1527    });
1528
1529pub static MZ_SCHEDULING_PARKS_HISTOGRAM_PER_WORKER: LazyLock<BuiltinView> =
1530    LazyLock::new(|| BuiltinView {
1531        name: "mz_scheduling_parks_histogram_per_worker",
1532        schema: MZ_INTROSPECTION_SCHEMA,
1533        oid: oid::VIEW_MZ_SCHEDULING_PARKS_HISTOGRAM_PER_WORKER_OID,
1534        desc: RelationDesc::builder()
1535            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1536            .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
1537            .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
1538            .with_column("count", SqlScalarType::Int64.nullable(false))
1539            .with_key(vec![0, 1, 2])
1540            .finish(),
1541        column_comments: BTreeMap::new(),
1542        sql: "SELECT
1543    worker_id, slept_for_ns, requested_ns, pg_catalog.count(*) AS count
1544FROM
1545    mz_introspection.mz_scheduling_parks_histogram_raw
1546GROUP BY
1547    worker_id, slept_for_ns, requested_ns",
1548        access: vec![PUBLIC_SELECT],
1549        ontology: None,
1550    });
1551
1552pub static MZ_SCHEDULING_PARKS_HISTOGRAM: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1553    name: "mz_scheduling_parks_histogram",
1554    schema: MZ_INTROSPECTION_SCHEMA,
1555    oid: oid::VIEW_MZ_SCHEDULING_PARKS_HISTOGRAM_OID,
1556    desc: RelationDesc::builder()
1557        .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
1558        .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
1559        .with_column(
1560            "count",
1561            SqlScalarType::Numeric {
1562                max_scale: Some(NumericMaxScale::ZERO),
1563            }
1564            .nullable(false),
1565        )
1566        .with_key(vec![0, 1])
1567        .finish(),
1568    column_comments: BTreeMap::from_iter([
1569        (
1570            "slept_for_ns",
1571            "The actual length of the park event in nanoseconds.",
1572        ),
1573        (
1574            "requested_ns",
1575            "The requested length of the park event in nanoseconds.",
1576        ),
1577        (
1578            "count",
1579            "The (noncumulative) count of park events in this bucket.",
1580        ),
1581    ]),
1582    sql: "
1583SELECT
1584    slept_for_ns,
1585    requested_ns,
1586    pg_catalog.sum(count) AS count
1587FROM mz_introspection.mz_scheduling_parks_histogram_per_worker
1588GROUP BY slept_for_ns, requested_ns",
1589    access: vec![PUBLIC_SELECT],
1590    ontology: Some(Ontology {
1591        entity_name: "scheduling_parks",
1592        description: "Histogram of operator park durations",
1593        links: &const { [] },
1594        column_semantic_types: &[],
1595    }),
1596});
1597
1598pub static MZ_COMPUTE_ERROR_COUNTS_PER_WORKER: LazyLock<BuiltinView> =
1599    LazyLock::new(|| BuiltinView {
1600        name: "mz_compute_error_counts_per_worker",
1601        schema: MZ_INTROSPECTION_SCHEMA,
1602        oid: oid::VIEW_MZ_COMPUTE_ERROR_COUNTS_PER_WORKER_OID,
1603        desc: RelationDesc::builder()
1604            .with_column("export_id", SqlScalarType::String.nullable(false))
1605            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1606            .with_column("count", SqlScalarType::Int64.nullable(false))
1607            .with_key(vec![0, 1, 2])
1608            .finish(),
1609        column_comments: BTreeMap::new(),
1610        sql: "
1611WITH MUTUALLY RECURSIVE
1612    -- Indexes that reuse existing indexes rather than maintaining separate dataflows.
1613    -- For these we don't log error counts separately, so we need to forward the error counts from
1614    -- their dependencies instead.
1615    index_reuses(reuse_id text, index_id text) AS (
1616        SELECT d.object_id, d.dependency_id
1617        FROM mz_internal.mz_compute_dependencies d
1618        JOIN mz_introspection.mz_compute_exports e ON (e.export_id = d.object_id)
1619        WHERE NOT EXISTS (
1620            SELECT 1 FROM mz_introspection.mz_dataflows
1621            WHERE id = e.dataflow_id
1622        )
1623    ),
1624    -- Error counts that were directly logged on compute exports.
1625    direct_errors(export_id text, worker_id uint8, count int8) AS (
1626        SELECT export_id, worker_id, count
1627        FROM mz_introspection.mz_compute_error_counts_raw
1628    ),
1629    -- Error counts propagated to index reused.
1630    all_errors(export_id text, worker_id uint8, count int8) AS (
1631        SELECT * FROM direct_errors
1632        UNION
1633        SELECT r.reuse_id, e.worker_id, e.count
1634        FROM all_errors e
1635        JOIN index_reuses r ON (r.index_id = e.export_id)
1636    )
1637SELECT * FROM all_errors",
1638        access: vec![PUBLIC_SELECT],
1639        ontology: Some(Ontology {
1640            entity_name: "compute_error_per_worker",
1641            description: "Error counts per compute collection per worker.",
1642            links: &const {
1643                [OntologyLink {
1644                    name: "errors_in",
1645                    target: "compute_export_per_worker",
1646                    properties: LinkProperties::measures_composite(
1647                        "export_id",
1648                        "export_id",
1649                        "count",
1650                        &[("worker_id", "worker_id")],
1651                    ),
1652                }]
1653            },
1654            column_semantic_types: &[("export_id", SemanticType::GlobalId)],
1655        }),
1656    });
1657
1658pub static MZ_COMPUTE_ERROR_COUNTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1659    name: "mz_compute_error_counts",
1660    schema: MZ_INTROSPECTION_SCHEMA,
1661    oid: oid::VIEW_MZ_COMPUTE_ERROR_COUNTS_OID,
1662    desc: RelationDesc::builder()
1663        .with_column("export_id", SqlScalarType::String.nullable(false))
1664        .with_column(
1665            "count",
1666            SqlScalarType::Numeric {
1667                max_scale: Some(NumericMaxScale::ZERO),
1668            }
1669            .nullable(false),
1670        )
1671        .with_key(vec![0])
1672        .finish(),
1673    column_comments: BTreeMap::from_iter([
1674        (
1675            "export_id",
1676            "The ID of the dataflow export. Corresponds to `mz_compute_exports.export_id`.",
1677        ),
1678        (
1679            "count",
1680            "The count of errors present in this dataflow export.",
1681        ),
1682    ]),
1683    sql: "
1684SELECT
1685    export_id,
1686    pg_catalog.sum(count) AS count
1687FROM mz_introspection.mz_compute_error_counts_per_worker
1688GROUP BY export_id
1689HAVING pg_catalog.sum(count) != 0",
1690    access: vec![PUBLIC_SELECT],
1691    ontology: Some(Ontology {
1692        entity_name: "compute_error_count",
1693        description: "Error counts per compute collection",
1694        links: &const {
1695            [OntologyLink {
1696                name: "errors_in",
1697                target: "compute_export",
1698                properties: LinkProperties::fk("export_id", "export_id", Cardinality::OneToOne),
1699            }]
1700        },
1701        column_semantic_types: &[("export_id", SemanticType::GlobalId)],
1702    }),
1703});
1704
1705pub static MZ_MESSAGE_COUNTS_PER_WORKER: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1706    name: "mz_message_counts_per_worker",
1707    schema: MZ_INTROSPECTION_SCHEMA,
1708    oid: oid::VIEW_MZ_MESSAGE_COUNTS_PER_WORKER_OID,
1709    desc: RelationDesc::builder()
1710        .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
1711        .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
1712        .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
1713        .with_column("sent", SqlScalarType::Int64.nullable(false))
1714        .with_column("received", SqlScalarType::Int64.nullable(false))
1715        .with_column("batch_sent", SqlScalarType::Int64.nullable(false))
1716        .with_column("batch_received", SqlScalarType::Int64.nullable(false))
1717        .with_key(vec![0, 1, 2])
1718        .finish(),
1719    column_comments: BTreeMap::new(),
1720    sql: "
1721WITH batch_sent_cte AS (
1722    SELECT
1723        channel_id,
1724        from_worker_id,
1725        to_worker_id,
1726        pg_catalog.count(*) AS sent
1727    FROM
1728        mz_introspection.mz_message_batch_counts_sent_raw
1729    GROUP BY
1730        channel_id, from_worker_id, to_worker_id
1731),
1732batch_received_cte AS (
1733    SELECT
1734        channel_id,
1735        from_worker_id,
1736        to_worker_id,
1737        pg_catalog.count(*) AS received
1738    FROM
1739        mz_introspection.mz_message_batch_counts_received_raw
1740    GROUP BY
1741        channel_id, from_worker_id, to_worker_id
1742),
1743sent_cte AS (
1744    SELECT
1745        channel_id,
1746        from_worker_id,
1747        to_worker_id,
1748        pg_catalog.count(*) AS sent
1749    FROM
1750        mz_introspection.mz_message_counts_sent_raw
1751    GROUP BY
1752        channel_id, from_worker_id, to_worker_id
1753),
1754received_cte AS (
1755    SELECT
1756        channel_id,
1757        from_worker_id,
1758        to_worker_id,
1759        pg_catalog.count(*) AS received
1760    FROM
1761        mz_introspection.mz_message_counts_received_raw
1762    GROUP BY
1763        channel_id, from_worker_id, to_worker_id
1764)
1765SELECT
1766    sent_cte.channel_id,
1767    sent_cte.from_worker_id,
1768    sent_cte.to_worker_id,
1769    sent_cte.sent,
1770    received_cte.received,
1771    batch_sent_cte.sent AS batch_sent,
1772    batch_received_cte.received AS batch_received
1773FROM sent_cte
1774JOIN received_cte USING (channel_id, from_worker_id, to_worker_id)
1775JOIN batch_sent_cte USING (channel_id, from_worker_id, to_worker_id)
1776JOIN batch_received_cte USING (channel_id, from_worker_id, to_worker_id)",
1777    access: vec![PUBLIC_SELECT],
1778    ontology: None,
1779});
1780
1781pub static MZ_MESSAGE_COUNTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1782    name: "mz_message_counts",
1783    schema: MZ_INTROSPECTION_SCHEMA,
1784    oid: oid::VIEW_MZ_MESSAGE_COUNTS_OID,
1785    desc: RelationDesc::builder()
1786        .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
1787        .with_column(
1788            "sent",
1789            SqlScalarType::Numeric {
1790                max_scale: Some(NumericMaxScale::ZERO),
1791            }
1792            .nullable(false),
1793        )
1794        .with_column(
1795            "received",
1796            SqlScalarType::Numeric {
1797                max_scale: Some(NumericMaxScale::ZERO),
1798            }
1799            .nullable(false),
1800        )
1801        .with_column(
1802            "batch_sent",
1803            SqlScalarType::Numeric {
1804                max_scale: Some(NumericMaxScale::ZERO),
1805            }
1806            .nullable(false),
1807        )
1808        .with_column(
1809            "batch_received",
1810            SqlScalarType::Numeric {
1811                max_scale: Some(NumericMaxScale::ZERO),
1812            }
1813            .nullable(false),
1814        )
1815        .with_key(vec![0])
1816        .finish(),
1817    column_comments: BTreeMap::from_iter([
1818        (
1819            "channel_id",
1820            "The ID of the channel. Corresponds to `mz_dataflow_channels.id`.",
1821        ),
1822        ("sent", "The number of messages sent."),
1823        ("received", "The number of messages received."),
1824        ("batch_sent", "The number of batches sent."),
1825        ("batch_received", "The number of batches received."),
1826    ]),
1827    sql: "
1828SELECT
1829    channel_id,
1830    pg_catalog.sum(sent) AS sent,
1831    pg_catalog.sum(received) AS received,
1832    pg_catalog.sum(batch_sent) AS batch_sent,
1833    pg_catalog.sum(batch_received) AS batch_received
1834FROM mz_introspection.mz_message_counts_per_worker
1835GROUP BY channel_id",
1836    access: vec![PUBLIC_SELECT],
1837    ontology: Some(Ontology {
1838        entity_name: "message_count",
1839        description: "Inter-worker message counts",
1840        links: &const {
1841            [OntologyLink {
1842                name: "counts_for",
1843                target: "dataflow_channel",
1844                properties: LinkProperties::fk("channel_id", "id", Cardinality::OneToOne),
1845            }]
1846        },
1847        column_semantic_types: &[],
1848    }),
1849});
1850
1851pub static MZ_ACTIVE_PEEKS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1852    name: "mz_active_peeks",
1853    schema: MZ_INTROSPECTION_SCHEMA,
1854    oid: oid::VIEW_MZ_ACTIVE_PEEKS_OID,
1855    desc: RelationDesc::builder()
1856        .with_column("id", SqlScalarType::Uuid.nullable(false))
1857        .with_column("object_id", SqlScalarType::String.nullable(false))
1858        .with_column("type", SqlScalarType::String.nullable(false))
1859        .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
1860        .with_key(vec![0])
1861        .finish(),
1862    column_comments: BTreeMap::from_iter([
1863        ("id", "The ID of the peek request."),
1864        (
1865            "object_id",
1866            "The ID of the collection the peek is targeting. Corresponds to `mz_catalog.mz_indexes.id`, `mz_catalog.mz_materialized_views.id`, `mz_catalog.mz_sources.id`, or `mz_catalog.mz_tables.id`.",
1867        ),
1868        (
1869            "type",
1870            "The type of the corresponding peek: `index` if targeting an index or temporary dataflow; `persist` for a source, materialized view, or table.",
1871        ),
1872        ("time", "The timestamp the peek has requested."),
1873    ]),
1874    sql: "
1875SELECT id, object_id, type, time
1876FROM mz_introspection.mz_active_peeks_per_worker
1877WHERE worker_id = 0::uint8",
1878    access: vec![PUBLIC_SELECT],
1879    ontology: Some(Ontology {
1880        entity_name: "active_peek",
1881        description: "Currently executing SELECT queries",
1882        links: &const { [] },
1883        column_semantic_types: &const {
1884            [
1885                ("object_id", SemanticType::GlobalId),
1886                ("time", SemanticType::MzTimestamp),
1887            ]
1888        },
1889    }),
1890});
1891
1892pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER: LazyLock<BuiltinView> =
1893    LazyLock::new(|| BuiltinView {
1894        name: "mz_dataflow_operator_reachability_per_worker",
1895        schema: MZ_INTROSPECTION_SCHEMA,
1896        oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER_OID,
1897        desc: RelationDesc::builder()
1898            .with_column("id", SqlScalarType::UInt64.nullable(false))
1899            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1900            .with_column("port", SqlScalarType::UInt64.nullable(false))
1901            .with_column("update_type", SqlScalarType::String.nullable(false))
1902            .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
1903            .with_column("count", SqlScalarType::Int64.nullable(false))
1904            .with_key(vec![0, 1, 2, 3, 4])
1905            .finish(),
1906        column_comments: BTreeMap::new(),
1907        sql: "SELECT
1908    addr2.id,
1909    reachability.worker_id,
1910    port,
1911    update_type,
1912    time,
1913    pg_catalog.count(*) as count
1914FROM
1915    mz_introspection.mz_dataflow_operator_reachability_raw reachability,
1916    mz_introspection.mz_dataflow_addresses_per_worker addr1,
1917    mz_introspection.mz_dataflow_addresses_per_worker addr2
1918WHERE
1919    addr2.address =
1920    CASE
1921        WHEN source = 0 THEN addr1.address
1922        ELSE addr1.address || reachability.source
1923    END
1924    AND addr1.id = reachability.id
1925    AND addr1.worker_id = reachability.worker_id
1926    AND addr2.worker_id = reachability.worker_id
1927GROUP BY addr2.id, reachability.worker_id, port, update_type, time",
1928        access: vec![PUBLIC_SELECT],
1929        ontology: None,
1930    });
1931
1932pub static MZ_DATAFLOW_OPERATOR_REACHABILITY: LazyLock<BuiltinView> =
1933    LazyLock::new(|| BuiltinView {
1934        name: "mz_dataflow_operator_reachability",
1935        schema: MZ_INTROSPECTION_SCHEMA,
1936        oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_OID,
1937        desc: RelationDesc::builder()
1938            .with_column("id", SqlScalarType::UInt64.nullable(false))
1939            .with_column("port", SqlScalarType::UInt64.nullable(false))
1940            .with_column("update_type", SqlScalarType::String.nullable(false))
1941            .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
1942            .with_column(
1943                "count",
1944                SqlScalarType::Numeric {
1945                    max_scale: Some(NumericMaxScale::ZERO),
1946                }
1947                .nullable(false),
1948            )
1949            .with_key(vec![0, 1, 2, 3])
1950            .finish(),
1951        column_comments: BTreeMap::new(),
1952        sql: "
1953SELECT
1954    id,
1955    port,
1956    update_type,
1957    time,
1958    pg_catalog.sum(count) as count
1959FROM mz_introspection.mz_dataflow_operator_reachability_per_worker
1960GROUP BY id, port, update_type, time",
1961        access: vec![PUBLIC_SELECT],
1962        ontology: None,
1963    });
1964
1965pub static MZ_ARRANGEMENT_SIZES_PER_WORKER: LazyLock<BuiltinView> = LazyLock::new(|| {
1966    BuiltinView {
1967        name: "mz_arrangement_sizes_per_worker",
1968        schema: MZ_INTROSPECTION_SCHEMA,
1969        oid: oid::VIEW_MZ_ARRANGEMENT_SIZES_PER_WORKER_OID,
1970        desc: RelationDesc::builder()
1971            .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
1972            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1973            .with_column("records", SqlScalarType::Int64.nullable(true))
1974            .with_column("batches", SqlScalarType::Int64.nullable(true))
1975            .with_column("size", SqlScalarType::Int64.nullable(true))
1976            .with_column("capacity", SqlScalarType::Int64.nullable(true))
1977            .with_column("allocations", SqlScalarType::Int64.nullable(true))
1978            .finish(),
1979        column_comments: BTreeMap::new(),
1980        sql: "
1981WITH operators_per_worker_cte AS (
1982    SELECT
1983        id AS operator_id,
1984        worker_id
1985    FROM
1986        mz_introspection.mz_dataflow_operators_per_worker
1987),
1988batches_cte AS (
1989    SELECT
1990        operator_id,
1991        worker_id,
1992        COUNT(*) AS batches
1993    FROM
1994        mz_introspection.mz_arrangement_batches_raw
1995    GROUP BY
1996        operator_id, worker_id
1997),
1998records_cte AS (
1999    SELECT
2000        operator_id,
2001        worker_id,
2002        COUNT(*) AS records
2003    FROM
2004        mz_introspection.mz_arrangement_records_raw
2005    GROUP BY
2006        operator_id, worker_id
2007),
2008heap_size_cte AS (
2009    SELECT
2010        operator_id,
2011        worker_id,
2012        COUNT(*) AS size
2013    FROM
2014        mz_introspection.mz_arrangement_heap_size_raw
2015    GROUP BY
2016        operator_id, worker_id
2017),
2018heap_capacity_cte AS (
2019    SELECT
2020        operator_id,
2021        worker_id,
2022        COUNT(*) AS capacity
2023    FROM
2024        mz_introspection.mz_arrangement_heap_capacity_raw
2025    GROUP BY
2026        operator_id, worker_id
2027),
2028heap_allocations_cte AS (
2029    SELECT
2030        operator_id,
2031        worker_id,
2032        COUNT(*) AS allocations
2033    FROM
2034        mz_introspection.mz_arrangement_heap_allocations_raw
2035    GROUP BY
2036        operator_id, worker_id
2037),
2038batcher_records_cte AS (
2039    SELECT
2040        operator_id,
2041        worker_id,
2042        COUNT(*) AS records
2043    FROM
2044        mz_introspection.mz_arrangement_batcher_records_raw
2045    GROUP BY
2046        operator_id, worker_id
2047),
2048batcher_size_cte AS (
2049    SELECT
2050        operator_id,
2051        worker_id,
2052        COUNT(*) AS size
2053    FROM
2054        mz_introspection.mz_arrangement_batcher_size_raw
2055    GROUP BY
2056        operator_id, worker_id
2057),
2058batcher_capacity_cte AS (
2059    SELECT
2060        operator_id,
2061        worker_id,
2062        COUNT(*) AS capacity
2063    FROM
2064        mz_introspection.mz_arrangement_batcher_capacity_raw
2065    GROUP BY
2066        operator_id, worker_id
2067),
2068batcher_allocations_cte AS (
2069    SELECT
2070        operator_id,
2071        worker_id,
2072        COUNT(*) AS allocations
2073    FROM
2074        mz_introspection.mz_arrangement_batcher_allocations_raw
2075    GROUP BY
2076        operator_id, worker_id
2077),
2078combined AS (
2079    SELECT
2080        opw.operator_id,
2081        opw.worker_id,
2082        CASE
2083            WHEN records_cte.records IS NULL AND batcher_records_cte.records IS NULL THEN NULL
2084            ELSE COALESCE(records_cte.records, 0) + COALESCE(batcher_records_cte.records, 0)
2085        END AS records,
2086        batches_cte.batches AS batches,
2087        CASE
2088            WHEN heap_size_cte.size IS NULL AND batcher_size_cte.size IS NULL THEN NULL
2089            ELSE COALESCE(heap_size_cte.size, 0) + COALESCE(batcher_size_cte.size, 0)
2090        END AS size,
2091        CASE
2092            WHEN heap_capacity_cte.capacity IS NULL AND batcher_capacity_cte.capacity IS NULL THEN NULL
2093            ELSE COALESCE(heap_capacity_cte.capacity, 0) + COALESCE(batcher_capacity_cte.capacity, 0)
2094        END AS capacity,
2095        CASE
2096            WHEN heap_allocations_cte.allocations IS NULL AND batcher_allocations_cte.allocations IS NULL THEN NULL
2097            ELSE COALESCE(heap_allocations_cte.allocations, 0) + COALESCE(batcher_allocations_cte.allocations, 0)
2098        END AS allocations
2099    FROM
2100                    operators_per_worker_cte opw
2101    LEFT OUTER JOIN batches_cte USING (operator_id, worker_id)
2102    LEFT OUTER JOIN records_cte USING (operator_id, worker_id)
2103    LEFT OUTER JOIN heap_size_cte USING (operator_id, worker_id)
2104    LEFT OUTER JOIN heap_capacity_cte USING (operator_id, worker_id)
2105    LEFT OUTER JOIN heap_allocations_cte USING (operator_id, worker_id)
2106    LEFT OUTER JOIN batcher_records_cte USING (operator_id, worker_id)
2107    LEFT OUTER JOIN batcher_size_cte USING (operator_id, worker_id)
2108    LEFT OUTER JOIN batcher_capacity_cte USING (operator_id, worker_id)
2109    LEFT OUTER JOIN batcher_allocations_cte USING (operator_id, worker_id)
2110)
2111SELECT
2112    operator_id, worker_id, records, batches, size, capacity, allocations
2113FROM combined
2114WHERE
2115       records     IS NOT NULL
2116    OR batches     IS NOT NULL
2117    OR size        IS NOT NULL
2118    OR capacity    IS NOT NULL
2119    OR allocations IS NOT NULL
2120",
2121        access: vec![PUBLIC_SELECT],
2122        ontology: None,
2123    }
2124});
2125
2126pub static MZ_ARRANGEMENT_SIZES: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2127    name: "mz_arrangement_sizes",
2128    schema: MZ_INTROSPECTION_SCHEMA,
2129    oid: oid::VIEW_MZ_ARRANGEMENT_SIZES_OID,
2130    desc: RelationDesc::builder()
2131        .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
2132        .with_column("records", SqlScalarType::Int64.nullable(true))
2133        .with_column("batches", SqlScalarType::Int64.nullable(true))
2134        .with_column("size", SqlScalarType::Int64.nullable(true))
2135        .with_column("capacity", SqlScalarType::Int64.nullable(true))
2136        .with_column("allocations", SqlScalarType::Int64.nullable(true))
2137        .with_key(vec![0])
2138        .finish(),
2139    column_comments: BTreeMap::from_iter([
2140        (
2141            "operator_id",
2142            "The ID of the operator that created the arrangement. Corresponds to `mz_dataflow_operators.id`.",
2143        ),
2144        ("records", "The number of records in the arrangement."),
2145        ("batches", "The number of batches in the arrangement."),
2146        ("size", "The utilized size in bytes of the arrangement."),
2147        (
2148            "capacity",
2149            "The capacity in bytes of the arrangement. Can be larger than the size.",
2150        ),
2151        (
2152            "allocations",
2153            "The number of separate memory allocations backing the arrangement.",
2154        ),
2155    ]),
2156    sql: "
2157SELECT
2158    operator_id,
2159    SUM(records)::int8 AS records,
2160    SUM(batches)::int8 AS batches,
2161    SUM(size)::int8 AS size,
2162    SUM(capacity)::int8 AS capacity,
2163    SUM(allocations)::int8 AS allocations
2164FROM mz_introspection.mz_arrangement_sizes_per_worker
2165GROUP BY operator_id",
2166    access: vec![PUBLIC_SELECT],
2167    ontology: Some(Ontology {
2168        entity_name: "arrangement_size",
2169        description: "Aggregated arrangement sizes (records, batches, bytes)",
2170        links: &const {
2171            [OntologyLink {
2172                name: "arrangement_of_operator",
2173                target: "dataflow_operator",
2174                properties: LinkProperties::Measures {
2175                    source_column: "operator_id",
2176                    target_column: "id",
2177                    metric: "arrangement_size",
2178                    source_id_type: None,
2179                    requires_mapping: None,
2180                    note: Some(
2181                        "Both IDs are local uint64 operator IDs within a dataflow, not GlobalIds.",
2182                    ),
2183                    extra_key_columns: None,
2184                },
2185            }]
2186        },
2187        column_semantic_types: &[],
2188    }),
2189});
2190
2191pub static MZ_ARRANGEMENT_SHARING_PER_WORKER: LazyLock<BuiltinView> =
2192    LazyLock::new(|| BuiltinView {
2193        name: "mz_arrangement_sharing_per_worker",
2194        schema: MZ_INTROSPECTION_SCHEMA,
2195        oid: oid::VIEW_MZ_ARRANGEMENT_SHARING_PER_WORKER_OID,
2196        desc: RelationDesc::builder()
2197            .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
2198            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
2199            .with_column("count", SqlScalarType::Int64.nullable(false))
2200            .with_key(vec![0, 1])
2201            .finish(),
2202        column_comments: BTreeMap::new(),
2203        sql: "
2204SELECT
2205    operator_id,
2206    worker_id,
2207    pg_catalog.count(*) AS count
2208FROM mz_introspection.mz_arrangement_sharing_raw
2209GROUP BY operator_id, worker_id",
2210        access: vec![PUBLIC_SELECT],
2211        ontology: None,
2212    });
2213
2214pub static MZ_ARRANGEMENT_SHARING: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2215    name: "mz_arrangement_sharing",
2216    schema: MZ_INTROSPECTION_SCHEMA,
2217    oid: oid::VIEW_MZ_ARRANGEMENT_SHARING_OID,
2218    desc: RelationDesc::builder()
2219        .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
2220        .with_column("count", SqlScalarType::Int64.nullable(false))
2221        .with_key(vec![0])
2222        .finish(),
2223    column_comments: BTreeMap::from_iter([
2224        (
2225            "operator_id",
2226            "The ID of the operator that created the arrangement. Corresponds to `mz_dataflow_operators.id`.",
2227        ),
2228        (
2229            "count",
2230            "The number of operators that share the arrangement.",
2231        ),
2232    ]),
2233    sql: "
2234SELECT operator_id, count
2235FROM mz_introspection.mz_arrangement_sharing_per_worker
2236WHERE worker_id = 0::uint8",
2237    access: vec![PUBLIC_SELECT],
2238    ontology: Some(Ontology {
2239        entity_name: "arrangement_sharing",
2240        description: "Arrangement sharing between operators",
2241        links: &const {
2242            [OntologyLink {
2243                name: "shared_by",
2244                target: "dataflow_operator",
2245                properties: LinkProperties::fk("operator_id", "id", Cardinality::OneToOne),
2246            }]
2247        },
2248        column_semantic_types: &[],
2249    }),
2250});
2251
2252pub static MZ_DATAFLOW_OPERATOR_PARENTS_PER_WORKER: LazyLock<BuiltinView> =
2253    LazyLock::new(|| BuiltinView {
2254        name: "mz_dataflow_operator_parents_per_worker",
2255        schema: MZ_INTROSPECTION_SCHEMA,
2256        oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_PARENTS_PER_WORKER_OID,
2257        desc: RelationDesc::builder()
2258            .with_column("id", SqlScalarType::UInt64.nullable(false))
2259            .with_column("parent_id", SqlScalarType::UInt64.nullable(false))
2260            .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
2261            .finish(),
2262        column_comments: BTreeMap::new(),
2263        sql: "
2264WITH operator_addrs AS(
2265    SELECT
2266        id, address, worker_id
2267    FROM mz_introspection.mz_dataflow_addresses_per_worker
2268        INNER JOIN mz_introspection.mz_dataflow_operators_per_worker
2269            USING (id, worker_id)
2270),
2271parent_addrs AS (
2272    SELECT
2273        id,
2274        address[1:list_length(address) - 1] AS parent_address,
2275        worker_id
2276    FROM operator_addrs
2277)
2278SELECT pa.id, oa.id AS parent_id, pa.worker_id
2279FROM parent_addrs AS pa
2280    INNER JOIN operator_addrs AS oa
2281        ON pa.parent_address = oa.address
2282        AND pa.worker_id = oa.worker_id",
2283        access: vec![PUBLIC_SELECT],
2284        ontology: None,
2285    });
2286
2287pub static MZ_DATAFLOW_OPERATOR_PARENTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2288    name: "mz_dataflow_operator_parents",
2289    schema: MZ_INTROSPECTION_SCHEMA,
2290    oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_PARENTS_OID,
2291    desc: RelationDesc::builder()
2292        .with_column("id", SqlScalarType::UInt64.nullable(false))
2293        .with_column("parent_id", SqlScalarType::UInt64.nullable(false))
2294        .finish(),
2295    column_comments: BTreeMap::from_iter([
2296        (
2297            "id",
2298            "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
2299        ),
2300        (
2301            "parent_id",
2302            "The ID of the operator's parent operator. Corresponds to `mz_dataflow_operators.id`.",
2303        ),
2304    ]),
2305    sql: "
2306SELECT id, parent_id
2307FROM mz_introspection.mz_dataflow_operator_parents_per_worker
2308WHERE worker_id = 0::uint8",
2309    access: vec![PUBLIC_SELECT],
2310    ontology: None,
2311});
2312
2313pub static MZ_DATAFLOW_ARRANGEMENT_SIZES: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2314    name: "mz_dataflow_arrangement_sizes",
2315    schema: MZ_INTROSPECTION_SCHEMA,
2316    oid: oid::VIEW_MZ_DATAFLOW_ARRANGEMENT_SIZES_OID,
2317    desc: RelationDesc::builder()
2318        .with_column("id", SqlScalarType::UInt64.nullable(false))
2319        .with_column("name", SqlScalarType::String.nullable(false))
2320        .with_column("records", SqlScalarType::Int64.nullable(true))
2321        .with_column("batches", SqlScalarType::Int64.nullable(true))
2322        .with_column("size", SqlScalarType::Int64.nullable(true))
2323        .with_column("capacity", SqlScalarType::Int64.nullable(true))
2324        .with_column("allocations", SqlScalarType::Int64.nullable(true))
2325        .with_key(vec![0, 1])
2326        .finish(),
2327    column_comments: BTreeMap::from_iter([
2328        (
2329            "id",
2330            "The ID of the [dataflow]. Corresponds to `mz_dataflows.id`.",
2331        ),
2332        ("name", "The name of the [dataflow]."),
2333        (
2334            "records",
2335            "The number of records in all arrangements in the dataflow.",
2336        ),
2337        (
2338            "batches",
2339            "The number of batches in all arrangements in the dataflow.",
2340        ),
2341        ("size", "The utilized size in bytes of the arrangements."),
2342        (
2343            "capacity",
2344            "The capacity in bytes of the arrangements. Can be larger than the size.",
2345        ),
2346        (
2347            "allocations",
2348            "The number of separate memory allocations backing the arrangements.",
2349        ),
2350    ]),
2351    sql: "
2352SELECT
2353    mdod.dataflow_id AS id,
2354    mdod.dataflow_name AS name,
2355    SUM(mas.records)::int8 AS records,
2356    SUM(mas.batches)::int8 AS batches,
2357    SUM(mas.size)::int8 AS size,
2358    SUM(mas.capacity)::int8 AS capacity,
2359    SUM(mas.allocations)::int8 AS allocations
2360FROM mz_introspection.mz_dataflow_operator_dataflows AS mdod
2361LEFT JOIN mz_introspection.mz_arrangement_sizes AS mas
2362    ON mdod.id = mas.operator_id
2363GROUP BY mdod.dataflow_id, mdod.dataflow_name",
2364    access: vec![PUBLIC_SELECT],
2365    ontology: None,
2366});
2367
2368pub static MZ_EXPECTED_GROUP_SIZE_ADVICE: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2369    name: "mz_expected_group_size_advice",
2370    schema: MZ_INTROSPECTION_SCHEMA,
2371    oid: oid::VIEW_MZ_EXPECTED_GROUP_SIZE_ADVICE_OID,
2372    desc: RelationDesc::builder()
2373        .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
2374        .with_column("dataflow_name", SqlScalarType::String.nullable(false))
2375        .with_column("region_id", SqlScalarType::UInt64.nullable(false))
2376        .with_column("region_name", SqlScalarType::String.nullable(false))
2377        .with_column("levels", SqlScalarType::Int64.nullable(false))
2378        .with_column("to_cut", SqlScalarType::Int64.nullable(false))
2379        .with_column(
2380            "savings",
2381            SqlScalarType::Numeric {
2382                max_scale: Some(NumericMaxScale::ZERO),
2383            }
2384            .nullable(true),
2385        )
2386        .with_column("hint", SqlScalarType::Float64.nullable(false))
2387        .finish(),
2388    column_comments: BTreeMap::from_iter([
2389        (
2390            "dataflow_id",
2391            "The ID of the [dataflow]. Corresponds to `mz_dataflows.id`.",
2392        ),
2393        (
2394            "dataflow_name",
2395            "The internal name of the dataflow hosting the min/max aggregation or Top K.",
2396        ),
2397        (
2398            "region_id",
2399            "The ID of the root operator scope. Corresponds to `mz_dataflow_operators.id`.",
2400        ),
2401        (
2402            "region_name",
2403            "The internal name of the root operator scope for the min/max aggregation or Top K.",
2404        ),
2405        (
2406            "levels",
2407            "The number of levels in the hierarchical scheme implemented by the region.",
2408        ),
2409        (
2410            "to_cut",
2411            "The number of levels that can be eliminated (cut) from the region's hierarchy.",
2412        ),
2413        (
2414            "savings",
2415            "A conservative estimate of the amount of memory in bytes to be saved by applying the hint.",
2416        ),
2417        (
2418            "hint",
2419            "The hint value that will eliminate `to_cut` levels from the region's hierarchy.",
2420        ),
2421    ]),
2422    sql: "
2423        -- The mz_expected_group_size_advice view provides tuning suggestions for the GROUP SIZE
2424        -- query hints. This tuning hint is effective for min/max/top-k patterns, where a stack
2425        -- of arrangements must be built. For each dataflow and region corresponding to one
2426        -- such pattern, we look for how many levels can be eliminated without hitting a level
2427        -- that actually substantially filters the input. The advice is constructed so that
2428        -- setting the hint for the affected region will eliminate these redundant levels of
2429        -- the hierarchical rendering.
2430        --
2431        -- A number of helper CTEs are used for the view definition. The first one, operators,
2432        -- looks for operator names that comprise arrangements of inputs to each level of a
2433        -- min/max/top-k hierarchy.
2434        WITH operators AS (
2435            SELECT
2436                dod.dataflow_id,
2437                dor.id AS region_id,
2438                dod.id,
2439                ars.records,
2440                ars.size
2441            FROM
2442                mz_introspection.mz_dataflow_operator_dataflows dod
2443                JOIN mz_introspection.mz_dataflow_addresses doa
2444                    ON dod.id = doa.id
2445                JOIN mz_introspection.mz_dataflow_addresses dra
2446                    ON dra.address = doa.address[:list_length(doa.address) - 1]
2447                JOIN mz_introspection.mz_dataflow_operators dor
2448                    ON dor.id = dra.id
2449                JOIN mz_introspection.mz_arrangement_sizes ars
2450                    ON ars.operator_id = dod.id
2451            WHERE
2452                dod.name = 'Arranged TopK input'
2453                OR dod.name = 'Arranged MinsMaxesHierarchical input'
2454                OR dod.name = 'Arrange ReduceMinsMaxes'
2455            ),
2456        -- The second CTE, levels, simply computes the heights of the min/max/top-k hierarchies
2457        -- identified in operators above.
2458        levels AS (
2459            SELECT o.dataflow_id, o.region_id, COUNT(*) AS levels
2460            FROM operators o
2461            GROUP BY o.dataflow_id, o.region_id
2462        ),
2463        -- The third CTE, pivot, determines for each min/max/top-k hierarchy, the first input
2464        -- operator. This operator is crucially important, as it records the number of records
2465        -- that was given as input to the gadget as a whole.
2466        pivot AS (
2467            SELECT
2468                o1.dataflow_id,
2469                o1.region_id,
2470                o1.id,
2471                o1.records
2472            FROM operators o1
2473            WHERE
2474                o1.id = (
2475                    SELECT MIN(o2.id)
2476                    FROM operators o2
2477                    WHERE
2478                        o2.dataflow_id = o1.dataflow_id
2479                        AND o2.region_id = o1.region_id
2480                    OPTIONS (AGGREGATE INPUT GROUP SIZE = 8)
2481                )
2482        ),
2483        -- The fourth CTE, candidates, will look for operators where the number of records
2484        -- maintained is not significantly different from the number at the pivot (excluding
2485        -- the pivot itself). These are the candidates for being cut from the dataflow region
2486        -- by adjusting the hint. The query includes a constant, heuristically tuned on TPC-H
2487        -- load generator data, to give some room for small deviations in number of records.
2488        -- The intuition for allowing for this deviation is that we are looking for a strongly
2489        -- reducing point in the hierarchy. To see why one such operator ought to exist in an
2490        -- untuned hierarchy, consider that at each level, we use hashing to distribute rows
2491        -- among groups where the min/max/top-k computation is (partially) applied. If the
2492        -- hierarchy has too many levels, the first-level (pivot) groups will be such that many
2493        -- groups might be empty or contain only one row. Each subsequent level will have a number
2494        -- of groups that is reduced exponentially. So at some point, we will find the level where
2495        -- we actually start having a few rows per group. That's where we will see the row counts
2496        -- significantly drop off.
2497        candidates AS (
2498            SELECT
2499                o.dataflow_id,
2500                o.region_id,
2501                o.id,
2502                o.records,
2503                o.size
2504            FROM
2505                operators o
2506                JOIN pivot p
2507                    ON o.dataflow_id = p.dataflow_id
2508                        AND o.region_id = p.region_id
2509                        AND o.id <> p.id
2510            WHERE o.records >= p.records * (1 - 0.15)
2511        ),
2512        -- The fifth CTE, cuts, computes for each relevant dataflow region, the number of
2513        -- candidate levels that should be cut. We only return here dataflow regions where at
2514        -- least one level must be cut. Note that once we hit a point where the hierarchy starts
2515        -- to have a filtering effect, i.e., after the last candidate, it is dangerous to suggest
2516        -- cutting the height of the hierarchy further. This is because we will have way less
2517        -- groups in the next level, so there should be even further reduction happening or there
2518        -- is some substantial skew in the data. But if the latter is the case, then we should not
2519        -- tune the GROUP SIZE hints down anyway to avoid hurting latency upon updates directed
2520        -- at these unusually large groups. In addition to selecting the levels to cut, we also
2521        -- compute a conservative estimate of the memory savings in bytes that will result from
2522        -- cutting these levels from the hierarchy. The estimate is based on the sizes of the
2523        -- input arrangements for each level to be cut. These arrangements should dominate the
2524        -- size of each level that can be cut, since the reduction gadget internal to the level
2525        -- does not remove much data at these levels.
2526        cuts AS (
2527            SELECT c.dataflow_id, c.region_id, COUNT(*) AS to_cut, SUM(c.size) AS savings
2528            FROM candidates c
2529            GROUP BY c.dataflow_id, c.region_id
2530            HAVING COUNT(*) > 0
2531        )
2532        -- Finally, we compute the hint suggestion for each dataflow region based on the number of
2533        -- levels and the number of candidates to be cut. The hint is computed taking into account
2534        -- the fan-in used in rendering for the hash partitioning and reduction of the groups,
2535        -- currently equal to 16.
2536        SELECT
2537            dod.dataflow_id,
2538            dod.dataflow_name,
2539            dod.id AS region_id,
2540            dod.name AS region_name,
2541            l.levels,
2542            c.to_cut,
2543            c.savings,
2544            pow(16, l.levels - c.to_cut) - 1 AS hint
2545        FROM cuts c
2546            JOIN levels l
2547                ON c.dataflow_id = l.dataflow_id AND c.region_id = l.region_id
2548            JOIN mz_introspection.mz_dataflow_operator_dataflows dod
2549                ON dod.dataflow_id = c.dataflow_id AND dod.id = c.region_id",
2550    access: vec![PUBLIC_SELECT],
2551    ontology: Some(Ontology {
2552        entity_name: "group_size_advice",
2553        description: "Advice on expected group sizes for reduce operators",
2554        links: &const { [] },
2555        column_semantic_types: &[],
2556    }),
2557});