1use std::collections::BTreeMap;
13use std::sync::LazyLock;
14
15use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
16use mz_pgrepr::oid;
17use mz_repr::adt::numeric::NumericMaxScale;
18use mz_repr::namespaces::MZ_INTROSPECTION_SCHEMA;
19use mz_repr::{RelationDesc, SemanticType, SqlScalarType};
20
21use super::{
22 BuiltinLog, BuiltinView, Cardinality, LinkProperties, Ontology, OntologyLink, PUBLIC_SELECT,
23};
24
25pub static MZ_DATAFLOW_OPERATORS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
26 name: "mz_dataflow_operators_per_worker",
27 schema: MZ_INTROSPECTION_SCHEMA,
28 oid: oid::LOG_MZ_DATAFLOW_OPERATORS_PER_WORKER_OID,
29 variant: LogVariant::Timely(TimelyLog::Operates),
30 access: vec![PUBLIC_SELECT],
31 ontology: Some(Ontology {
32 entity_name: "dataflow_operator_per_worker",
33 description: "Timely dataflow operators present on a specific worker.",
34 links: &const { [] },
35 column_semantic_types: &[],
36 }),
37});
38
39pub static MZ_DATAFLOW_ADDRESSES_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
40 name: "mz_dataflow_addresses_per_worker",
41 schema: MZ_INTROSPECTION_SCHEMA,
42 oid: oid::LOG_MZ_DATAFLOW_ADDRESSES_PER_WORKER_OID,
43 variant: LogVariant::Timely(TimelyLog::Addresses),
44 access: vec![PUBLIC_SELECT],
45 ontology: Some(Ontology {
46 entity_name: "dataflow_address_per_worker",
47 description: "Scope address of each Timely operator per worker.",
48 links: &const {
49 [OntologyLink {
50 name: "address_of",
51 target: "dataflow_operator_per_worker",
52 properties: LinkProperties::fk_composite(
53 "id",
54 "id",
55 Cardinality::ManyToOne,
56 &[("worker_id", "worker_id")],
57 ),
58 }]
59 },
60 column_semantic_types: &[],
61 }),
62});
63
64pub static MZ_DATAFLOW_CHANNELS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
65 name: "mz_dataflow_channels_per_worker",
66 schema: MZ_INTROSPECTION_SCHEMA,
67 oid: oid::LOG_MZ_DATAFLOW_CHANNELS_PER_WORKER_OID,
68 variant: LogVariant::Timely(TimelyLog::Channels),
69 access: vec![PUBLIC_SELECT],
70 ontology: Some(Ontology {
71 entity_name: "dataflow_channel_per_worker",
72 description: "Timely dataflow communication channels per worker.",
73 links: &const {
74 [
75 OntologyLink {
76 name: "source_operator",
77 target: "dataflow_operator_per_worker",
78 properties: LinkProperties::fk_composite(
79 "from_index",
80 "id",
81 Cardinality::ManyToOne,
82 &[("worker_id", "worker_id")],
83 ),
84 },
85 OntologyLink {
86 name: "target_operator",
87 target: "dataflow_operator_per_worker",
88 properties: LinkProperties::fk_composite(
89 "to_index",
90 "id",
91 Cardinality::ManyToOne,
92 &[("worker_id", "worker_id")],
93 ),
94 },
95 ]
96 },
97 column_semantic_types: &[],
98 }),
99});
100
101pub static MZ_SCHEDULING_ELAPSED_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
102 name: "mz_scheduling_elapsed_raw",
103 schema: MZ_INTROSPECTION_SCHEMA,
104 oid: oid::LOG_MZ_SCHEDULING_ELAPSED_RAW_OID,
105 variant: LogVariant::Timely(TimelyLog::Elapsed),
106 access: vec![PUBLIC_SELECT],
107 ontology: None,
108});
109
110pub static MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_RAW: LazyLock<BuiltinLog> =
111 LazyLock::new(|| BuiltinLog {
112 name: "mz_compute_operator_durations_histogram_raw",
113 schema: MZ_INTROSPECTION_SCHEMA,
114 oid: oid::LOG_MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_RAW_OID,
115 variant: LogVariant::Timely(TimelyLog::Histogram),
116 access: vec![PUBLIC_SELECT],
117 ontology: None,
118 });
119
120pub static MZ_SCHEDULING_PARKS_HISTOGRAM_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
121 name: "mz_scheduling_parks_histogram_raw",
122 schema: MZ_INTROSPECTION_SCHEMA,
123 oid: oid::LOG_MZ_SCHEDULING_PARKS_HISTOGRAM_RAW_OID,
124 variant: LogVariant::Timely(TimelyLog::Parks),
125 access: vec![PUBLIC_SELECT],
126 ontology: None,
127});
128
129pub static MZ_ARRANGEMENT_RECORDS_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
130 name: "mz_arrangement_records_raw",
131 schema: MZ_INTROSPECTION_SCHEMA,
132 oid: oid::LOG_MZ_ARRANGEMENT_RECORDS_RAW_OID,
133 variant: LogVariant::Differential(DifferentialLog::ArrangementRecords),
134 access: vec![PUBLIC_SELECT],
135 ontology: None,
136});
137
138pub static MZ_ARRANGEMENT_BATCHES_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
139 name: "mz_arrangement_batches_raw",
140 schema: MZ_INTROSPECTION_SCHEMA,
141 oid: oid::LOG_MZ_ARRANGEMENT_BATCHES_RAW_OID,
142 variant: LogVariant::Differential(DifferentialLog::ArrangementBatches),
143 access: vec![PUBLIC_SELECT],
144 ontology: None,
145});
146
147pub static MZ_ARRANGEMENT_SHARING_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
148 name: "mz_arrangement_sharing_raw",
149 schema: MZ_INTROSPECTION_SCHEMA,
150 oid: oid::LOG_MZ_ARRANGEMENT_SHARING_RAW_OID,
151 variant: LogVariant::Differential(DifferentialLog::Sharing),
152 access: vec![PUBLIC_SELECT],
153 ontology: None,
154});
155
156pub static MZ_ARRANGEMENT_BATCHER_RECORDS_RAW: LazyLock<BuiltinLog> =
157 LazyLock::new(|| BuiltinLog {
158 name: "mz_arrangement_batcher_records_raw",
159 schema: MZ_INTROSPECTION_SCHEMA,
160 oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_RECORDS_RAW_OID,
161 variant: LogVariant::Differential(DifferentialLog::BatcherRecords),
162 access: vec![PUBLIC_SELECT],
163 ontology: None,
164 });
165
166pub static MZ_ARRANGEMENT_BATCHER_SIZE_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
167 name: "mz_arrangement_batcher_size_raw",
168 schema: MZ_INTROSPECTION_SCHEMA,
169 oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_SIZE_RAW_OID,
170 variant: LogVariant::Differential(DifferentialLog::BatcherSize),
171 access: vec![PUBLIC_SELECT],
172 ontology: None,
173});
174
175pub static MZ_ARRANGEMENT_BATCHER_CAPACITY_RAW: LazyLock<BuiltinLog> =
176 LazyLock::new(|| BuiltinLog {
177 name: "mz_arrangement_batcher_capacity_raw",
178 schema: MZ_INTROSPECTION_SCHEMA,
179 oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_CAPACITY_RAW_OID,
180 variant: LogVariant::Differential(DifferentialLog::BatcherCapacity),
181 access: vec![PUBLIC_SELECT],
182 ontology: None,
183 });
184
185pub static MZ_ARRANGEMENT_BATCHER_ALLOCATIONS_RAW: LazyLock<BuiltinLog> =
186 LazyLock::new(|| BuiltinLog {
187 name: "mz_arrangement_batcher_allocations_raw",
188 schema: MZ_INTROSPECTION_SCHEMA,
189 oid: oid::LOG_MZ_ARRANGEMENT_BATCHER_ALLOCATIONS_RAW_OID,
190 variant: LogVariant::Differential(DifferentialLog::BatcherAllocations),
191 access: vec![PUBLIC_SELECT],
192 ontology: None,
193 });
194
195pub static MZ_COMPUTE_EXPORTS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
196 name: "mz_compute_exports_per_worker",
197 schema: MZ_INTROSPECTION_SCHEMA,
198 oid: oid::LOG_MZ_COMPUTE_EXPORTS_PER_WORKER_OID,
199 variant: LogVariant::Compute(ComputeLog::DataflowCurrent),
200 access: vec![PUBLIC_SELECT],
201 ontology: Some(Ontology {
202 entity_name: "compute_export_per_worker",
203 description: "Active compute exports (dataflows) present per worker.",
204 links: &const { [] },
205 column_semantic_types: &[("export_id", SemanticType::GlobalId)],
206 }),
207});
208
209pub static MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER: LazyLock<BuiltinLog> =
210 LazyLock::new(|| BuiltinLog {
211 name: "mz_compute_dataflow_global_ids_per_worker",
212 schema: MZ_INTROSPECTION_SCHEMA,
213 oid: oid::LOG_MZ_COMPUTE_DATAFLOW_GLOBAL_IDS_PER_WORKER_OID,
214 variant: LogVariant::Compute(ComputeLog::DataflowGlobal),
215 access: vec![PUBLIC_SELECT],
216 ontology: Some(Ontology {
217 entity_name: "dataflow_global_id_per_worker",
218 description: "Mapping from internal dataflow IDs to GlobalIds per worker.",
219 links: &const {
220 [OntologyLink {
221 name: "global_id_of",
222 target: "compute_export_per_worker",
223 properties: LinkProperties::MapsTo {
224 source_column: "global_id",
225 target_column: "export_id",
226 via: None,
227 from_type: Some(SemanticType::GlobalId),
228 to_type: Some(SemanticType::GlobalId),
229 note: None,
230 },
231 }]
232 },
233 column_semantic_types: &[("global_id", SemanticType::GlobalId)],
234 }),
235 });
236
237pub static MZ_CLUSTER_PROMETHEUS_METRICS: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
238 name: "mz_cluster_prometheus_metrics",
239 schema: MZ_INTROSPECTION_SCHEMA,
240 oid: oid::LOG_MZ_CLUSTER_PROMETHEUS_METRICS_OID,
241 variant: LogVariant::Compute(ComputeLog::PrometheusMetrics),
242 access: vec![PUBLIC_SELECT],
243 ontology: Some(Ontology {
244 entity_name: "cluster_prometheus_metric",
245 description: "Prometheus metrics gathered from the cluster's metrics registry.",
246 links: &const { [] },
247 column_semantic_types: &[],
248 }),
249});
250
251pub static MZ_COMPUTE_FRONTIERS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
252 name: "mz_compute_frontiers_per_worker",
253 schema: MZ_INTROSPECTION_SCHEMA,
254 oid: oid::LOG_MZ_COMPUTE_FRONTIERS_PER_WORKER_OID,
255 variant: LogVariant::Compute(ComputeLog::FrontierCurrent),
256 access: vec![PUBLIC_SELECT],
257 ontology: Some(Ontology {
258 entity_name: "compute_frontier_per_worker",
259 description: "Per-worker output frontier timestamps for each compute export.",
260 links: &const {
261 [OntologyLink {
262 name: "frontier_of",
263 target: "compute_export_per_worker",
264 properties: LinkProperties::measures_composite(
265 "export_id",
266 "export_id",
267 "time",
268 &[("worker_id", "worker_id")],
269 ),
270 }]
271 },
272 column_semantic_types: &[
273 ("export_id", SemanticType::GlobalId),
274 ("time", SemanticType::MzTimestamp),
275 ],
276 }),
277});
278
279pub static MZ_COMPUTE_IMPORT_FRONTIERS_PER_WORKER: LazyLock<BuiltinLog> =
280 LazyLock::new(|| BuiltinLog {
281 name: "mz_compute_import_frontiers_per_worker",
282 schema: MZ_INTROSPECTION_SCHEMA,
283 oid: oid::LOG_MZ_COMPUTE_IMPORT_FRONTIERS_PER_WORKER_OID,
284 variant: LogVariant::Compute(ComputeLog::ImportFrontierCurrent),
285 access: vec![PUBLIC_SELECT],
286 ontology: Some(Ontology {
287 entity_name: "compute_import_frontier_per_worker",
288 description: "Per-worker input frontier timestamps for each compute dataflow.",
289 links: &const {
290 [OntologyLink {
291 name: "import_frontier_of",
292 target: "compute_export_per_worker",
293 properties: LinkProperties::fk_composite(
294 "export_id",
295 "export_id",
296 Cardinality::ManyToOne,
297 &[("worker_id", "worker_id")],
298 ),
299 }]
300 },
301 column_semantic_types: &[
302 ("export_id", SemanticType::GlobalId),
303 ("import_id", SemanticType::GlobalId),
304 ("time", SemanticType::MzTimestamp),
305 ],
306 }),
307 });
308
309pub static MZ_COMPUTE_ERROR_COUNTS_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
310 name: "mz_compute_error_counts_raw",
311 schema: MZ_INTROSPECTION_SCHEMA,
312 oid: oid::LOG_MZ_COMPUTE_ERROR_COUNTS_RAW_OID,
313 variant: LogVariant::Compute(ComputeLog::ErrorCount),
314 access: vec![PUBLIC_SELECT],
315 ontology: None,
316});
317
318pub static MZ_COMPUTE_HYDRATION_TIMES_PER_WORKER: LazyLock<BuiltinLog> =
319 LazyLock::new(|| BuiltinLog {
320 name: "mz_compute_hydration_times_per_worker",
321 schema: MZ_INTROSPECTION_SCHEMA,
322 oid: oid::LOG_MZ_COMPUTE_HYDRATION_TIMES_PER_WORKER_OID,
323 variant: LogVariant::Compute(ComputeLog::HydrationTime),
324 access: vec![PUBLIC_SELECT],
325 ontology: Some(Ontology {
326 entity_name: "hydration_time_per_worker",
327 description: "Time in nanoseconds for each compute export to hydrate per worker.",
328 links: &const {
329 [OntologyLink {
330 name: "hydration_time_of",
331 target: "compute_export_per_worker",
332 properties: LinkProperties::measures_composite(
333 "export_id",
334 "export_id",
335 "time_ns",
336 &[("worker_id", "worker_id")],
337 ),
338 }]
339 },
340 column_semantic_types: &[("export_id", SemanticType::GlobalId)],
341 }),
342 });
343
344pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER: LazyLock<BuiltinLog> =
345 LazyLock::new(|| BuiltinLog {
346 name: "mz_compute_operator_hydration_statuses_per_worker",
347 schema: MZ_INTROSPECTION_SCHEMA,
348 oid: oid::LOG_MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER_OID,
349 variant: LogVariant::Compute(ComputeLog::OperatorHydrationStatus),
350 access: vec![PUBLIC_SELECT],
351 ontology: Some(Ontology {
352 entity_name: "hydration_status_per_worker",
353 description: "Hydration state for each LIR node per worker.",
354 links: &const {
355 [OntologyLink {
356 name: "hydration_of",
357 target: "compute_export_per_worker",
358 properties: LinkProperties::fk_composite(
359 "export_id",
360 "export_id",
361 Cardinality::ManyToOne,
362 &[("worker_id", "worker_id")],
363 ),
364 }]
365 },
366 column_semantic_types: &[("export_id", SemanticType::GlobalId)],
367 }),
368 });
369
370pub static MZ_ACTIVE_PEEKS_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
371 name: "mz_active_peeks_per_worker",
372 schema: MZ_INTROSPECTION_SCHEMA,
373 oid: oid::LOG_MZ_ACTIVE_PEEKS_PER_WORKER_OID,
374 variant: LogVariant::Compute(ComputeLog::PeekCurrent),
375 access: vec![PUBLIC_SELECT],
376 ontology: Some(Ontology {
377 entity_name: "active_peek_per_worker",
378 description: "In-flight peek requests currently executing per worker.",
379 links: &const { [] },
380 column_semantic_types: &[
381 ("object_id", SemanticType::GlobalId),
382 ("time", SemanticType::MzTimestamp),
383 ],
384 }),
385});
386
387pub static MZ_COMPUTE_LIR_MAPPING_PER_WORKER: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
388 name: "mz_compute_lir_mapping_per_worker",
389 schema: MZ_INTROSPECTION_SCHEMA,
390 oid: oid::LOG_MZ_COMPUTE_LIR_MAPPING_PER_WORKER_OID,
391 variant: LogVariant::Compute(ComputeLog::LirMapping),
392 access: vec![PUBLIC_SELECT],
393 ontology: Some(Ontology {
394 entity_name: "lir_mapping_per_worker",
395 description: "Mapping from LIR node IDs to dataflow operator address ranges per worker.",
396 links: &const {
397 [OntologyLink {
398 name: "export_of",
399 target: "compute_export_per_worker",
400 properties: LinkProperties::fk_composite(
401 "global_id",
402 "export_id",
403 Cardinality::ManyToOne,
404 &[("worker_id", "worker_id")],
405 ),
406 }]
407 },
408 column_semantic_types: &[("global_id", SemanticType::GlobalId)],
409 }),
410});
411
412pub static MZ_PEEK_DURATIONS_HISTOGRAM_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
413 name: "mz_peek_durations_histogram_raw",
414 schema: MZ_INTROSPECTION_SCHEMA,
415 oid: oid::LOG_MZ_PEEK_DURATIONS_HISTOGRAM_RAW_OID,
416 variant: LogVariant::Compute(ComputeLog::PeekDuration),
417 access: vec![PUBLIC_SELECT],
418 ontology: None,
419});
420
421pub static MZ_ARRANGEMENT_HEAP_SIZE_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
422 name: "mz_arrangement_heap_size_raw",
423 schema: MZ_INTROSPECTION_SCHEMA,
424 oid: oid::LOG_MZ_ARRANGEMENT_HEAP_SIZE_RAW_OID,
425 variant: LogVariant::Compute(ComputeLog::ArrangementHeapSize),
426 access: vec![PUBLIC_SELECT],
427 ontology: None,
428});
429
430pub static MZ_ARRANGEMENT_HEAP_CAPACITY_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
431 name: "mz_arrangement_heap_capacity_raw",
432 schema: MZ_INTROSPECTION_SCHEMA,
433 oid: oid::LOG_MZ_ARRANGEMENT_HEAP_CAPACITY_RAW_OID,
434 variant: LogVariant::Compute(ComputeLog::ArrangementHeapCapacity),
435 access: vec![PUBLIC_SELECT],
436 ontology: None,
437});
438
439pub static MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW: LazyLock<BuiltinLog> =
440 LazyLock::new(|| BuiltinLog {
441 name: "mz_arrangement_heap_allocations_raw",
442 schema: MZ_INTROSPECTION_SCHEMA,
443 oid: oid::LOG_MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW_OID,
444 variant: LogVariant::Compute(ComputeLog::ArrangementHeapAllocations),
445 access: vec![PUBLIC_SELECT],
446 ontology: None,
447 });
448
449pub static MZ_MESSAGE_BATCH_COUNTS_RECEIVED_RAW: LazyLock<BuiltinLog> =
450 LazyLock::new(|| BuiltinLog {
451 name: "mz_message_batch_counts_received_raw",
452 schema: MZ_INTROSPECTION_SCHEMA,
453 oid: oid::LOG_MZ_MESSAGE_BATCH_COUNTS_RECEIVED_RAW_OID,
454 variant: LogVariant::Timely(TimelyLog::BatchesReceived),
455 access: vec![PUBLIC_SELECT],
456 ontology: None,
457 });
458
459pub static MZ_MESSAGE_BATCH_COUNTS_SENT_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
460 name: "mz_message_batch_counts_sent_raw",
461 schema: MZ_INTROSPECTION_SCHEMA,
462 oid: oid::LOG_MZ_MESSAGE_BATCH_COUNTS_SENT_RAW_OID,
463 variant: LogVariant::Timely(TimelyLog::BatchesSent),
464 access: vec![PUBLIC_SELECT],
465 ontology: None,
466});
467
468pub static MZ_MESSAGE_COUNTS_RECEIVED_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
469 name: "mz_message_counts_received_raw",
470 schema: MZ_INTROSPECTION_SCHEMA,
471 oid: oid::LOG_MZ_MESSAGE_COUNTS_RECEIVED_RAW_OID,
472 variant: LogVariant::Timely(TimelyLog::MessagesReceived),
473 access: vec![PUBLIC_SELECT],
474 ontology: None,
475});
476
477pub static MZ_MESSAGE_COUNTS_SENT_RAW: LazyLock<BuiltinLog> = LazyLock::new(|| BuiltinLog {
478 name: "mz_message_counts_sent_raw",
479 schema: MZ_INTROSPECTION_SCHEMA,
480 oid: oid::LOG_MZ_MESSAGE_COUNTS_SENT_RAW_OID,
481 variant: LogVariant::Timely(TimelyLog::MessagesSent),
482 access: vec![PUBLIC_SELECT],
483 ontology: None,
484});
485
486pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_RAW: LazyLock<BuiltinLog> =
487 LazyLock::new(|| BuiltinLog {
488 name: "mz_dataflow_operator_reachability_raw",
489 schema: MZ_INTROSPECTION_SCHEMA,
490 oid: oid::LOG_MZ_DATAFLOW_OPERATOR_REACHABILITY_RAW_OID,
491 variant: LogVariant::Timely(TimelyLog::Reachability),
492 access: vec![PUBLIC_SELECT],
493 ontology: None,
494 });
495
496pub static MZ_DATAFLOWS_PER_WORKER: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
497 name: "mz_dataflows_per_worker",
498 schema: MZ_INTROSPECTION_SCHEMA,
499 oid: oid::VIEW_MZ_DATAFLOWS_PER_WORKER_OID,
500 desc: RelationDesc::builder()
501 .with_column("id", SqlScalarType::UInt64.nullable(true))
502 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
503 .with_column("name", SqlScalarType::String.nullable(false))
504 .finish(),
505 column_comments: BTreeMap::new(),
506 sql: "SELECT
507 addrs.address[1] AS id,
508 ops.worker_id,
509 ops.name
510FROM
511 mz_introspection.mz_dataflow_addresses_per_worker addrs,
512 mz_introspection.mz_dataflow_operators_per_worker ops
513WHERE
514 addrs.id = ops.id AND
515 addrs.worker_id = ops.worker_id AND
516 mz_catalog.list_length(addrs.address) = 1",
517 access: vec![PUBLIC_SELECT],
518 ontology: None,
519});
520
521pub static MZ_DATAFLOWS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
522 name: "mz_dataflows",
523 schema: MZ_INTROSPECTION_SCHEMA,
524 oid: oid::VIEW_MZ_DATAFLOWS_OID,
525 desc: RelationDesc::builder()
526 .with_column("id", SqlScalarType::UInt64.nullable(true))
527 .with_column("name", SqlScalarType::String.nullable(false))
528 .finish(),
529 column_comments: BTreeMap::from_iter([
530 ("id", "The ID of the dataflow."),
531 ("name", "The internal name of the dataflow."),
532 ]),
533 sql: "
534SELECT id, name
535FROM mz_introspection.mz_dataflows_per_worker
536WHERE worker_id = 0::uint8",
537 access: vec![PUBLIC_SELECT],
538 ontology: Some(Ontology {
539 entity_name: "dataflow",
540 description: "Dataflow instances",
541 links: &const { [] },
542 column_semantic_types: &[],
543 }),
544});
545
546pub static MZ_DATAFLOW_ADDRESSES: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
547 name: "mz_dataflow_addresses",
548 schema: MZ_INTROSPECTION_SCHEMA,
549 oid: oid::VIEW_MZ_DATAFLOW_ADDRESSES_OID,
550 desc: RelationDesc::builder()
551 .with_column("id", SqlScalarType::UInt64.nullable(false))
552 .with_column(
553 "address",
554 SqlScalarType::List {
555 element_type: Box::new(SqlScalarType::UInt64),
556 custom_id: None,
557 }
558 .nullable(false),
559 )
560 .with_key(vec![0])
561 .finish(),
562 column_comments: BTreeMap::from_iter([
563 (
564 "id",
565 "The ID of the channel or operator. Corresponds to `mz_dataflow_channels.id` or `mz_dataflow_operators.id`.",
566 ),
567 (
568 "address",
569 "A list of scope-local indexes indicating the path from the root to this channel or operator.",
570 ),
571 ]),
572 sql: "
573SELECT id, address
574FROM mz_introspection.mz_dataflow_addresses_per_worker
575WHERE worker_id = 0::uint8",
576 access: vec![PUBLIC_SELECT],
577 ontology: Some(Ontology {
578 entity_name: "dataflow_address",
579 description: "Address (scope path) of dataflow operators",
580 links: &const {
581 [OntologyLink {
582 name: "address_of_operator",
583 target: "dataflow_operator",
584 properties: LinkProperties::fk("id", "id", Cardinality::OneToOne),
585 }]
586 },
587 column_semantic_types: &[],
588 }),
589});
590
591pub static MZ_DATAFLOW_CHANNELS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
592 name: "mz_dataflow_channels",
593 schema: MZ_INTROSPECTION_SCHEMA,
594 oid: oid::VIEW_MZ_DATAFLOW_CHANNELS_OID,
595 desc: RelationDesc::builder()
596 .with_column("id", SqlScalarType::UInt64.nullable(false))
597 .with_column("from_index", SqlScalarType::UInt64.nullable(false))
598 .with_column("from_port", SqlScalarType::UInt64.nullable(false))
599 .with_column("to_index", SqlScalarType::UInt64.nullable(false))
600 .with_column("to_port", SqlScalarType::UInt64.nullable(false))
601 .with_column("type", SqlScalarType::String.nullable(false))
602 .with_key(vec![0])
603 .finish(),
604 column_comments: BTreeMap::from_iter([
605 ("id", "The ID of the channel."),
606 (
607 "from_index",
608 "The scope-local index of the source operator. Corresponds to `mz_dataflow_addresses.address`.",
609 ),
610 ("from_port", "The source operator's output port."),
611 (
612 "to_index",
613 "The scope-local index of the target operator. Corresponds to `mz_dataflow_addresses.address`.",
614 ),
615 ("to_port", "The target operator's input port."),
616 ("type", "The container type of the channel."),
617 ]),
618 sql: "
619SELECT id, from_index, from_port, to_index, to_port, type
620FROM mz_introspection.mz_dataflow_channels_per_worker
621WHERE worker_id = 0::uint8",
622 access: vec![PUBLIC_SELECT],
623 ontology: Some(Ontology {
624 entity_name: "dataflow_channel",
625 description: "Communication channels between operators",
626 links: &const {
627 [OntologyLink {
628 name: "channel_in_dataflow",
629 target: "dataflow",
630 properties: LinkProperties::MapsTo {
631 source_column: "from_index",
632 target_column: "id",
633 via: Some("mz_introspection.mz_dataflow_operator_dataflows"),
634 from_type: None,
635 to_type: None,
636 note: Some(
637 "Channels do not have a direct dataflow_id. Use mz_dataflow_addresses to find the parent scope, then correlate with mz_dataflow_operator_dataflows.",
638 ),
639 },
640 }]
641 },
642 column_semantic_types: &[],
643 }),
644});
645
646pub static MZ_DATAFLOW_OPERATORS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
647 name: "mz_dataflow_operators",
648 schema: MZ_INTROSPECTION_SCHEMA,
649 oid: oid::VIEW_MZ_DATAFLOW_OPERATORS_OID,
650 desc: RelationDesc::builder()
651 .with_column("id", SqlScalarType::UInt64.nullable(false))
652 .with_column("name", SqlScalarType::String.nullable(false))
653 .with_key(vec![0])
654 .finish(),
655 column_comments: BTreeMap::from_iter([
656 ("id", "The ID of the operator."),
657 ("name", "The internal name of the operator."),
658 ]),
659 sql: "
660SELECT id, name
661FROM mz_introspection.mz_dataflow_operators_per_worker
662WHERE worker_id = 0::uint8",
663 access: vec![PUBLIC_SELECT],
664 ontology: Some(Ontology {
665 entity_name: "dataflow_operator",
666 description: "Operators within dataflows",
667 links: &const { [] },
668 column_semantic_types: &[],
669 }),
670});
671
672pub static MZ_DATAFLOW_GLOBAL_IDS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
673 name: "mz_dataflow_global_ids",
674 schema: MZ_INTROSPECTION_SCHEMA,
675 oid: oid::VIEW_MZ_DATAFLOW_GLOBAL_IDS_OID,
676 desc: RelationDesc::builder()
677 .with_column("id", SqlScalarType::UInt64.nullable(false))
678 .with_column("global_id", SqlScalarType::String.nullable(false))
679 .with_key(vec![0, 1])
680 .finish(),
681 column_comments: BTreeMap::from_iter([
682 ("id", "The dataflow ID."),
683 ("global_id", "A global ID associated with that dataflow."),
684 ]),
685 sql: "
686SELECT id, global_id
687FROM mz_introspection.mz_compute_dataflow_global_ids_per_worker
688WHERE worker_id = 0::uint8",
689 access: vec![PUBLIC_SELECT],
690 ontology: None,
691});
692
693pub static MZ_MAPPABLE_OBJECTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
694 name: "mz_mappable_objects",
695 schema: MZ_INTROSPECTION_SCHEMA,
696 oid: oid::VIEW_MZ_MAPPABLE_OBJECTS_OID,
697 desc: RelationDesc::builder()
698 .with_column("name", SqlScalarType::String.nullable(false))
699 .with_column("global_id", SqlScalarType::String.nullable(false))
700 .finish(),
701 column_comments: BTreeMap::from_iter([
702 (
703 "name",
704 "The name of the object. This name is unquoted, and you might need to call `quote_ident` if you want to reference the name shown here.",
705 ),
706 ("global_id", "The global ID of the object."),
707 ]),
708 sql: "
709SELECT COALESCE(md.name || '.', '') || ms.name || '.' || mo.name AS name, mgi.global_id AS global_id
710FROM mz_catalog.mz_objects mo
711 JOIN mz_introspection.mz_compute_exports mce ON (mo.id = mce.export_id)
712 JOIN mz_catalog.mz_schemas ms ON (mo.schema_id = ms.id)
713 JOIN mz_introspection.mz_dataflow_global_ids mgi ON (mce.dataflow_id = mgi.id)
714 LEFT JOIN mz_catalog.mz_databases md ON (ms.database_id = md.id);",
715 access: vec![PUBLIC_SELECT],
716 ontology: Some(Ontology {
717 entity_name: "mappable_object",
718 description: "Objects that can be mapped to dataflow operators",
719 links: &const { [] },
720 column_semantic_types: &[("global_id", SemanticType::GlobalId)],
721 }),
722});
723
724pub static MZ_LIR_MAPPING: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
725 name: "mz_lir_mapping",
726 schema: MZ_INTROSPECTION_SCHEMA,
727 oid: oid::VIEW_MZ_LIR_MAPPING_OID,
728 desc: RelationDesc::builder()
729 .with_column("global_id", SqlScalarType::String.nullable(false))
730 .with_column("lir_id", SqlScalarType::UInt64.nullable(false))
731 .with_column("operator", SqlScalarType::String.nullable(false))
732 .with_column("parent_lir_id", SqlScalarType::UInt64.nullable(true))
733 .with_column("nesting", SqlScalarType::UInt16.nullable(false))
734 .with_column("operator_id_start", SqlScalarType::UInt64.nullable(false))
735 .with_column("operator_id_end", SqlScalarType::UInt64.nullable(false))
736 .with_key(vec![0, 1])
737 .finish(),
738 column_comments: BTreeMap::from_iter([
739 ("global_id", "The global ID."),
740 ("lir_id", "The LIR node ID."),
741 (
742 "operator",
743 "The LIR operator, in the format `OperatorName INPUTS [OPTIONS]`.",
744 ),
745 (
746 "parent_lir_id",
747 "The parent of this LIR node. May be `NULL`.",
748 ),
749 ("nesting", "The nesting level of this LIR node."),
750 (
751 "operator_id_start",
752 "The first dataflow operator ID implementing this LIR operator (inclusive).",
753 ),
754 (
755 "operator_id_end",
756 "The first dataflow operator ID _after_ this LIR operator (exclusive).",
757 ),
758 ]),
759 sql: "
760SELECT global_id, lir_id, operator, parent_lir_id, nesting, operator_id_start, operator_id_end
761FROM mz_introspection.mz_compute_lir_mapping_per_worker
762WHERE worker_id = 0::uint8",
763 access: vec![PUBLIC_SELECT],
764 ontology: Some(Ontology {
765 entity_name: "lir_mapping",
766 description: "LIR (low-level IR) to dataflow operator mapping",
767 links: &const { [] },
768 column_semantic_types: &[("global_id", SemanticType::GlobalId)],
769 }),
770});
771
772pub static MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER: LazyLock<BuiltinView> =
773 LazyLock::new(|| BuiltinView {
774 name: "mz_dataflow_operator_dataflows_per_worker",
775 schema: MZ_INTROSPECTION_SCHEMA,
776 oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_DATAFLOWS_PER_WORKER_OID,
777 desc: RelationDesc::builder()
778 .with_column("id", SqlScalarType::UInt64.nullable(false))
779 .with_column("name", SqlScalarType::String.nullable(false))
780 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
781 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
782 .with_column("dataflow_name", SqlScalarType::String.nullable(false))
783 .finish(),
784 column_comments: BTreeMap::new(),
785 sql: "SELECT
786 ops.id,
787 ops.name,
788 ops.worker_id,
789 dfs.id as dataflow_id,
790 dfs.name as dataflow_name
791FROM
792 mz_introspection.mz_dataflow_operators_per_worker ops,
793 mz_introspection.mz_dataflow_addresses_per_worker addrs,
794 mz_introspection.mz_dataflows_per_worker dfs
795WHERE
796 ops.id = addrs.id AND
797 ops.worker_id = addrs.worker_id AND
798 dfs.id = addrs.address[1] AND
799 dfs.worker_id = addrs.worker_id",
800 access: vec![PUBLIC_SELECT],
801 ontology: None,
802 });
803
804pub static MZ_DATAFLOW_OPERATOR_DATAFLOWS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
805 name: "mz_dataflow_operator_dataflows",
806 schema: MZ_INTROSPECTION_SCHEMA,
807 oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_DATAFLOWS_OID,
808 desc: RelationDesc::builder()
809 .with_column("id", SqlScalarType::UInt64.nullable(false))
810 .with_column("name", SqlScalarType::String.nullable(false))
811 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
812 .with_column("dataflow_name", SqlScalarType::String.nullable(false))
813 .finish(),
814 column_comments: BTreeMap::from_iter([
815 (
816 "id",
817 "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
818 ),
819 ("name", "The internal name of the operator."),
820 (
821 "dataflow_id",
822 "The ID of the dataflow hosting the operator. Corresponds to `mz_dataflows.id`.",
823 ),
824 (
825 "dataflow_name",
826 "The internal name of the dataflow hosting the operator.",
827 ),
828 ]),
829 sql: "
830SELECT id, name, dataflow_id, dataflow_name
831FROM mz_introspection.mz_dataflow_operator_dataflows_per_worker
832WHERE worker_id = 0::uint8",
833 access: vec![PUBLIC_SELECT],
834 ontology: Some(Ontology {
835 entity_name: "dataflow_operator_dataflow",
836 description: "Mapping of operators to their parent dataflow",
837 links: &const {
838 [OntologyLink {
839 name: "operator_in_dataflow",
840 target: "dataflow",
841 properties: LinkProperties::fk("dataflow_id", "id", Cardinality::ManyToOne),
842 }]
843 },
844 column_semantic_types: &[],
845 }),
846});
847
848pub static MZ_COMPUTE_EXPORTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
849 name: "mz_compute_exports",
850 schema: MZ_INTROSPECTION_SCHEMA,
851 oid: oid::VIEW_MZ_COMPUTE_EXPORTS_OID,
852 desc: RelationDesc::builder()
853 .with_column("export_id", SqlScalarType::String.nullable(false))
854 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
855 .with_key(vec![0])
856 .finish(),
857 column_comments: BTreeMap::from_iter([
858 (
859 "export_id",
860 "The ID of the index, materialized view, or subscription exported by the dataflow. Corresponds to `mz_catalog.mz_indexes.id`, `mz_catalog.mz_materialized_views.id`, or `mz_internal.mz_subscriptions.id`.",
861 ),
862 (
863 "dataflow_id",
864 "The ID of the dataflow. Corresponds to `mz_dataflows.id`.",
865 ),
866 ]),
867 sql: "
868SELECT export_id, dataflow_id
869FROM mz_introspection.mz_compute_exports_per_worker
870WHERE worker_id = 0::uint8",
871 access: vec![PUBLIC_SELECT],
872 ontology: Some(Ontology {
873 entity_name: "compute_export",
874 description: "Compute exports (maintained collections)",
875 links: &const {
876 [
877 OntologyLink {
878 name: "export_of",
879 target: "object",
880 properties: LinkProperties::fk_mapped(
881 "export_id",
882 "id",
883 Cardinality::ManyToOne,
884 mz_repr::SemanticType::GlobalId,
885 "mz_internal.mz_object_global_ids",
886 ),
887 },
888 OntologyLink {
889 name: "introspection_uses_global_id",
890 target: "object_global_id",
891 properties: LinkProperties::MapsTo {
892 source_column: "export_id",
893 target_column: "global_id",
894 via: None,
895 from_type: None,
896 to_type: None,
897 note: Some(
898 "mz_introspection tables use GlobalId. To join with mz_catalog tables (which use CatalogItemId), go through mz_internal.mz_object_global_ids.",
899 ),
900 },
901 },
902 ]
903 },
904 column_semantic_types: &[("export_id", SemanticType::GlobalId)],
905 }),
906});
907
908pub static MZ_COMPUTE_FRONTIERS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
909 name: "mz_compute_frontiers",
910 schema: MZ_INTROSPECTION_SCHEMA,
911 oid: oid::VIEW_MZ_COMPUTE_FRONTIERS_OID,
912 desc: RelationDesc::builder()
913 .with_column("export_id", SqlScalarType::String.nullable(false))
914 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
915 .with_key(vec![0])
916 .finish(),
917 column_comments: BTreeMap::from_iter([
918 (
919 "export_id",
920 "The ID of the dataflow export. Corresponds to `mz_compute_exports.export_id`.",
921 ),
922 (
923 "time",
924 "The next timestamp at which the dataflow output may change.",
925 ),
926 ]),
927 sql: "SELECT
928 export_id, pg_catalog.min(time) AS time
929FROM mz_introspection.mz_compute_frontiers_per_worker
930GROUP BY export_id",
931 access: vec![PUBLIC_SELECT],
932 ontology: Some(Ontology {
933 entity_name: "compute_frontier",
934 description: "Per-replica compute frontiers",
935 links: &const {
936 [OntologyLink {
937 name: "compute_frontier_of",
938 target: "object",
939 properties: LinkProperties::fk_mapped(
940 "export_id",
941 "id",
942 Cardinality::ManyToOne,
943 mz_repr::SemanticType::GlobalId,
944 "mz_internal.mz_object_global_ids",
945 ),
946 }]
947 },
948 column_semantic_types: &const {
949 [
950 ("export_id", SemanticType::GlobalId),
951 ("time", SemanticType::MzTimestamp),
952 ]
953 },
954 }),
955});
956
957pub static MZ_DATAFLOW_CHANNEL_OPERATORS_PER_WORKER: LazyLock<BuiltinView> =
958 LazyLock::new(|| BuiltinView {
959 name: "mz_dataflow_channel_operators_per_worker",
960 schema: MZ_INTROSPECTION_SCHEMA,
961 oid: oid::VIEW_MZ_DATAFLOW_CHANNEL_OPERATORS_PER_WORKER_OID,
962 desc: RelationDesc::builder()
963 .with_column("id", SqlScalarType::UInt64.nullable(false))
964 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
965 .with_column("from_operator_id", SqlScalarType::UInt64.nullable(true))
966 .with_column(
967 "from_operator_address",
968 SqlScalarType::List {
969 element_type: Box::new(SqlScalarType::UInt64),
970 custom_id: None,
971 }
972 .nullable(false),
973 )
974 .with_column("to_operator_id", SqlScalarType::UInt64.nullable(true))
975 .with_column(
976 "to_operator_address",
977 SqlScalarType::List {
978 element_type: Box::new(SqlScalarType::UInt64),
979 custom_id: None,
980 }
981 .nullable(false),
982 )
983 .with_column("type", SqlScalarType::String.nullable(false))
984 .finish(),
985 column_comments: BTreeMap::new(),
986 sql: "
987WITH
988channel_addresses(id, worker_id, address, from_index, to_index, type) AS (
989 SELECT id, worker_id, address, from_index, to_index, type
990 FROM mz_introspection.mz_dataflow_channels_per_worker mdc
991 INNER JOIN mz_introspection.mz_dataflow_addresses_per_worker mda
992 USING (id, worker_id)
993),
994channel_operator_addresses(id, worker_id, from_address, to_address, type) AS (
995 SELECT id, worker_id,
996 address || from_index AS from_address,
997 address || to_index AS to_address,
998 type
999 FROM channel_addresses
1000),
1001operator_addresses(id, worker_id, address) AS (
1002 SELECT id, worker_id, address
1003 FROM mz_introspection.mz_dataflow_addresses_per_worker mda
1004 INNER JOIN mz_introspection.mz_dataflow_operators_per_worker mdo
1005 USING (id, worker_id)
1006)
1007SELECT coa.id,
1008 coa.worker_id,
1009 from_ops.id AS from_operator_id,
1010 coa.from_address AS from_operator_address,
1011 to_ops.id AS to_operator_id,
1012 coa.to_address AS to_operator_address,
1013 coa.type
1014FROM channel_operator_addresses coa
1015 LEFT OUTER JOIN operator_addresses from_ops
1016 ON coa.from_address = from_ops.address AND
1017 coa.worker_id = from_ops.worker_id
1018 LEFT OUTER JOIN operator_addresses to_ops
1019 ON coa.to_address = to_ops.address AND
1020 coa.worker_id = to_ops.worker_id
1021",
1022 access: vec![PUBLIC_SELECT],
1023 ontology: None,
1024 });
1025
1026pub static MZ_DATAFLOW_CHANNEL_OPERATORS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1027 name: "mz_dataflow_channel_operators",
1028 schema: MZ_INTROSPECTION_SCHEMA,
1029 oid: oid::VIEW_MZ_DATAFLOW_CHANNEL_OPERATORS_OID,
1030 desc: RelationDesc::builder()
1031 .with_column("id", SqlScalarType::UInt64.nullable(false))
1032 .with_column("from_operator_id", SqlScalarType::UInt64.nullable(true))
1033 .with_column(
1034 "from_operator_address",
1035 SqlScalarType::List {
1036 element_type: Box::new(SqlScalarType::UInt64),
1037 custom_id: None,
1038 }
1039 .nullable(false),
1040 )
1041 .with_column("to_operator_id", SqlScalarType::UInt64.nullable(true))
1042 .with_column(
1043 "to_operator_address",
1044 SqlScalarType::List {
1045 element_type: Box::new(SqlScalarType::UInt64),
1046 custom_id: None,
1047 }
1048 .nullable(false),
1049 )
1050 .with_column("type", SqlScalarType::String.nullable(false))
1051 .finish(),
1052 column_comments: BTreeMap::from_iter([
1053 (
1054 "id",
1055 "The ID of the channel. Corresponds to `mz_dataflow_channels.id`.",
1056 ),
1057 (
1058 "from_operator_id",
1059 "The ID of the source of the channel. Corresponds to `mz_dataflow_operators.id`.",
1060 ),
1061 (
1062 "from_operator_address",
1063 "The address of the source of the channel. Corresponds to `mz_dataflow_addresses.address`.",
1064 ),
1065 (
1066 "to_operator_id",
1067 "The ID of the target of the channel. Corresponds to `mz_dataflow_operators.id`.",
1068 ),
1069 (
1070 "to_operator_address",
1071 "The address of the target of the channel. Corresponds to `mz_dataflow_addresses.address`.",
1072 ),
1073 ("type", "The container type of the channel."),
1074 ]),
1075 sql: "
1076SELECT id, from_operator_id, from_operator_address, to_operator_id, to_operator_address, type
1077FROM mz_introspection.mz_dataflow_channel_operators_per_worker
1078WHERE worker_id = 0::uint8",
1079 access: vec![PUBLIC_SELECT],
1080 ontology: None,
1081});
1082
1083pub static MZ_COMPUTE_IMPORT_FRONTIERS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1084 name: "mz_compute_import_frontiers",
1085 schema: MZ_INTROSPECTION_SCHEMA,
1086 oid: oid::VIEW_MZ_COMPUTE_IMPORT_FRONTIERS_OID,
1087 desc: RelationDesc::builder()
1088 .with_column("export_id", SqlScalarType::String.nullable(false))
1089 .with_column("import_id", SqlScalarType::String.nullable(false))
1090 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
1091 .with_key(vec![0, 1])
1092 .finish(),
1093 column_comments: BTreeMap::from_iter([
1094 (
1095 "export_id",
1096 "The ID of the dataflow export. Corresponds to `mz_compute_exports.export_id`.",
1097 ),
1098 (
1099 "import_id",
1100 "The ID of the dataflow import. Corresponds to `mz_catalog.mz_sources.id` or `mz_catalog.mz_tables.id` or `mz_compute_exports.export_id`.",
1101 ),
1102 (
1103 "time",
1104 "The next timestamp at which the dataflow input may change.",
1105 ),
1106 ]),
1107 sql: "SELECT
1108 export_id, import_id, pg_catalog.min(time) AS time
1109FROM mz_introspection.mz_compute_import_frontiers_per_worker
1110GROUP BY export_id, import_id",
1111 access: vec![PUBLIC_SELECT],
1112 ontology: Some(Ontology {
1113 entity_name: "compute_import_frontier",
1114 description: "Import frontiers for compute dependencies",
1115 links: &const {
1116 [OntologyLink {
1117 name: "compute_import_frontier_of",
1118 target: "object",
1119 properties: LinkProperties::fk_mapped(
1120 "export_id",
1121 "id",
1122 Cardinality::ManyToOne,
1123 mz_repr::SemanticType::GlobalId,
1124 "mz_internal.mz_object_global_ids",
1125 ),
1126 }]
1127 },
1128 column_semantic_types: &const {
1129 [
1130 ("export_id", SemanticType::GlobalId),
1131 ("import_id", SemanticType::GlobalId),
1132 ("time", SemanticType::MzTimestamp),
1133 ]
1134 },
1135 }),
1136});
1137
1138pub static MZ_RECORDS_PER_DATAFLOW_OPERATOR_PER_WORKER: LazyLock<BuiltinView> =
1139 LazyLock::new(|| BuiltinView {
1140 name: "mz_records_per_dataflow_operator_per_worker",
1141 schema: MZ_INTROSPECTION_SCHEMA,
1142 oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_OPERATOR_PER_WORKER_OID,
1143 desc: RelationDesc::builder()
1144 .with_column("id", SqlScalarType::UInt64.nullable(false))
1145 .with_column("name", SqlScalarType::String.nullable(false))
1146 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1147 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
1148 .with_column("records", SqlScalarType::Int64.nullable(true))
1149 .with_column("batches", SqlScalarType::Int64.nullable(true))
1150 .with_column("size", SqlScalarType::Int64.nullable(true))
1151 .with_column("capacity", SqlScalarType::Int64.nullable(true))
1152 .with_column("allocations", SqlScalarType::Int64.nullable(true))
1153 .finish(),
1154 column_comments: BTreeMap::new(),
1155 sql: "
1156SELECT
1157 dod.id,
1158 dod.name,
1159 dod.worker_id,
1160 dod.dataflow_id,
1161 ar_size.records AS records,
1162 ar_size.batches AS batches,
1163 ar_size.size AS size,
1164 ar_size.capacity AS capacity,
1165 ar_size.allocations AS allocations
1166FROM
1167 mz_introspection.mz_dataflow_operator_dataflows_per_worker dod
1168 LEFT OUTER JOIN mz_introspection.mz_arrangement_sizes_per_worker ar_size ON
1169 dod.id = ar_size.operator_id AND
1170 dod.worker_id = ar_size.worker_id",
1171 access: vec![PUBLIC_SELECT],
1172 ontology: None,
1173 });
1174
1175pub static MZ_RECORDS_PER_DATAFLOW_OPERATOR: LazyLock<BuiltinView> =
1176 LazyLock::new(|| BuiltinView {
1177 name: "mz_records_per_dataflow_operator",
1178 schema: MZ_INTROSPECTION_SCHEMA,
1179 oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_OPERATOR_OID,
1180 desc: RelationDesc::builder()
1181 .with_column("id", SqlScalarType::UInt64.nullable(false))
1182 .with_column("name", SqlScalarType::String.nullable(false))
1183 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
1184 .with_column("records", SqlScalarType::Int64.nullable(true))
1185 .with_column("batches", SqlScalarType::Int64.nullable(true))
1186 .with_column("size", SqlScalarType::Int64.nullable(true))
1187 .with_column("capacity", SqlScalarType::Int64.nullable(true))
1188 .with_column("allocations", SqlScalarType::Int64.nullable(true))
1189 .with_key(vec![0, 1, 2])
1190 .finish(),
1191 column_comments: BTreeMap::from_iter([
1192 (
1193 "id",
1194 "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
1195 ),
1196 ("name", "The internal name of the operator."),
1197 (
1198 "dataflow_id",
1199 "The ID of the dataflow. Corresponds to `mz_dataflows.id`.",
1200 ),
1201 ("records", "The number of records in the operator."),
1202 ("batches", "The number of batches in the dataflow."),
1203 ("size", "The utilized size in bytes of the arrangement."),
1204 (
1205 "capacity",
1206 "The capacity in bytes of the arrangement. Can be larger than the size.",
1207 ),
1208 (
1209 "allocations",
1210 "The number of separate memory allocations backing the arrangement.",
1211 ),
1212 ]),
1213 sql: "
1214SELECT
1215 id,
1216 name,
1217 dataflow_id,
1218 SUM(records)::int8 AS records,
1219 SUM(batches)::int8 AS batches,
1220 SUM(size)::int8 AS size,
1221 SUM(capacity)::int8 AS capacity,
1222 SUM(allocations)::int8 AS allocations
1223FROM mz_introspection.mz_records_per_dataflow_operator_per_worker
1224GROUP BY id, name, dataflow_id",
1225 access: vec![PUBLIC_SELECT],
1226 ontology: None,
1227 });
1228
1229pub static MZ_RECORDS_PER_DATAFLOW_PER_WORKER: LazyLock<BuiltinView> =
1230 LazyLock::new(|| BuiltinView {
1231 name: "mz_records_per_dataflow_per_worker",
1232 schema: MZ_INTROSPECTION_SCHEMA,
1233 oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_PER_WORKER_OID,
1234 desc: RelationDesc::builder()
1235 .with_column("id", SqlScalarType::UInt64.nullable(false))
1236 .with_column("name", SqlScalarType::String.nullable(false))
1237 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1238 .with_column("records", SqlScalarType::Int64.nullable(true))
1239 .with_column("batches", SqlScalarType::Int64.nullable(true))
1240 .with_column("size", SqlScalarType::Int64.nullable(true))
1241 .with_column("capacity", SqlScalarType::Int64.nullable(true))
1242 .with_column("allocations", SqlScalarType::Int64.nullable(true))
1243 .with_key(vec![0, 1, 2])
1244 .finish(),
1245 column_comments: BTreeMap::new(),
1246 sql: "
1247SELECT
1248 rdo.dataflow_id as id,
1249 dfs.name,
1250 rdo.worker_id,
1251 SUM(rdo.records)::int8 as records,
1252 SUM(rdo.batches)::int8 as batches,
1253 SUM(rdo.size)::int8 as size,
1254 SUM(rdo.capacity)::int8 as capacity,
1255 SUM(rdo.allocations)::int8 as allocations
1256FROM
1257 mz_introspection.mz_records_per_dataflow_operator_per_worker rdo,
1258 mz_introspection.mz_dataflows_per_worker dfs
1259WHERE
1260 rdo.dataflow_id = dfs.id AND
1261 rdo.worker_id = dfs.worker_id
1262GROUP BY
1263 rdo.dataflow_id,
1264 dfs.name,
1265 rdo.worker_id",
1266 access: vec![PUBLIC_SELECT],
1267 ontology: None,
1268 });
1269
1270pub static MZ_RECORDS_PER_DATAFLOW: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1271 name: "mz_records_per_dataflow",
1272 schema: MZ_INTROSPECTION_SCHEMA,
1273 oid: oid::VIEW_MZ_RECORDS_PER_DATAFLOW_OID,
1274 desc: RelationDesc::builder()
1275 .with_column("id", SqlScalarType::UInt64.nullable(false))
1276 .with_column("name", SqlScalarType::String.nullable(false))
1277 .with_column("records", SqlScalarType::Int64.nullable(true))
1278 .with_column("batches", SqlScalarType::Int64.nullable(true))
1279 .with_column("size", SqlScalarType::Int64.nullable(true))
1280 .with_column("capacity", SqlScalarType::Int64.nullable(true))
1281 .with_column("allocations", SqlScalarType::Int64.nullable(true))
1282 .with_key(vec![0, 1])
1283 .finish(),
1284 column_comments: BTreeMap::from_iter([
1285 (
1286 "id",
1287 "The ID of the dataflow. Corresponds to `mz_dataflows.id`.",
1288 ),
1289 ("name", "The internal name of the dataflow."),
1290 ("records", "The number of records in the dataflow."),
1291 ("batches", "The number of batches in the dataflow."),
1292 ("size", "The utilized size in bytes of the arrangements."),
1293 (
1294 "capacity",
1295 "The capacity in bytes of the arrangements. Can be larger than the size.",
1296 ),
1297 (
1298 "allocations",
1299 "The number of separate memory allocations backing the arrangements.",
1300 ),
1301 ]),
1302 sql: "
1303SELECT
1304 id,
1305 name,
1306 SUM(records)::int8 as records,
1307 SUM(batches)::int8 as batches,
1308 SUM(size)::int8 as size,
1309 SUM(capacity)::int8 as capacity,
1310 SUM(allocations)::int8 as allocations
1311FROM
1312 mz_introspection.mz_records_per_dataflow_per_worker
1313GROUP BY
1314 id,
1315 name",
1316 access: vec![PUBLIC_SELECT],
1317 ontology: Some(Ontology {
1318 entity_name: "records_per_dataflow",
1319 description: "Record counts aggregated per dataflow",
1320 links: &const {
1321 [OntologyLink {
1322 name: "details_of",
1323 target: "dataflow",
1324 properties: LinkProperties::fk("id", "id", Cardinality::OneToOne),
1325 }]
1326 },
1327 column_semantic_types: &[],
1328 }),
1329});
1330
1331pub static MZ_PEEK_DURATIONS_HISTOGRAM_PER_WORKER: LazyLock<BuiltinView> =
1332 LazyLock::new(|| BuiltinView {
1333 name: "mz_peek_durations_histogram_per_worker",
1334 schema: MZ_INTROSPECTION_SCHEMA,
1335 oid: oid::VIEW_MZ_PEEK_DURATIONS_HISTOGRAM_PER_WORKER_OID,
1336 desc: RelationDesc::builder()
1337 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1338 .with_column("type", SqlScalarType::String.nullable(false))
1339 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1340 .with_column("count", SqlScalarType::Int64.nullable(false))
1341 .with_key(vec![0, 1, 2])
1342 .finish(),
1343 column_comments: BTreeMap::new(),
1344 sql: "SELECT
1345 worker_id, type, duration_ns, pg_catalog.count(*) AS count
1346FROM
1347 mz_introspection.mz_peek_durations_histogram_raw
1348GROUP BY
1349 worker_id, type, duration_ns",
1350 access: vec![PUBLIC_SELECT],
1351 ontology: None,
1352 });
1353
1354pub static MZ_PEEK_DURATIONS_HISTOGRAM: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1355 name: "mz_peek_durations_histogram",
1356 schema: MZ_INTROSPECTION_SCHEMA,
1357 oid: oid::VIEW_MZ_PEEK_DURATIONS_HISTOGRAM_OID,
1358 desc: RelationDesc::builder()
1359 .with_column("type", SqlScalarType::String.nullable(false))
1360 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1361 .with_column(
1362 "count",
1363 SqlScalarType::Numeric {
1364 max_scale: Some(NumericMaxScale::ZERO),
1365 }
1366 .nullable(false),
1367 )
1368 .with_key(vec![0, 1])
1369 .finish(),
1370 column_comments: BTreeMap::from_iter([
1371 ("type", "The peek variant: `index` or `persist`."),
1372 (
1373 "duration_ns",
1374 "The upper bound of the bucket in nanoseconds.",
1375 ),
1376 (
1377 "count",
1378 "The (noncumulative) count of peeks in this bucket.",
1379 ),
1380 ]),
1381 sql: "
1382SELECT
1383 type, duration_ns,
1384 pg_catalog.sum(count) AS count
1385FROM mz_introspection.mz_peek_durations_histogram_per_worker
1386GROUP BY type, duration_ns",
1387 access: vec![PUBLIC_SELECT],
1388 ontology: Some(Ontology {
1389 entity_name: "peek_duration",
1390 description: "Histogram of SELECT query durations",
1391 links: &const { [] },
1392 column_semantic_types: &[],
1393 }),
1394});
1395
1396pub static MZ_SCHEDULING_ELAPSED_PER_WORKER: LazyLock<BuiltinView> =
1397 LazyLock::new(|| BuiltinView {
1398 name: "mz_scheduling_elapsed_per_worker",
1399 schema: MZ_INTROSPECTION_SCHEMA,
1400 oid: oid::VIEW_MZ_SCHEDULING_ELAPSED_PER_WORKER_OID,
1401 desc: RelationDesc::builder()
1402 .with_column("id", SqlScalarType::UInt64.nullable(false))
1403 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1404 .with_column("elapsed_ns", SqlScalarType::Int64.nullable(false))
1405 .with_key(vec![0, 1])
1406 .finish(),
1407 column_comments: BTreeMap::new(),
1408 sql: "SELECT
1409 id, worker_id, pg_catalog.count(*) AS elapsed_ns
1410FROM
1411 mz_introspection.mz_scheduling_elapsed_raw
1412GROUP BY
1413 id, worker_id",
1414 access: vec![PUBLIC_SELECT],
1415 ontology: None,
1416 });
1417
1418pub static MZ_SCHEDULING_ELAPSED: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1419 name: "mz_scheduling_elapsed",
1420 schema: MZ_INTROSPECTION_SCHEMA,
1421 oid: oid::VIEW_MZ_SCHEDULING_ELAPSED_OID,
1422 desc: RelationDesc::builder()
1423 .with_column("id", SqlScalarType::UInt64.nullable(false))
1424 .with_column(
1425 "elapsed_ns",
1426 SqlScalarType::Numeric {
1427 max_scale: Some(NumericMaxScale::ZERO),
1428 }
1429 .nullable(false),
1430 )
1431 .with_key(vec![0])
1432 .finish(),
1433 column_comments: BTreeMap::from_iter([
1434 (
1435 "id",
1436 "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
1437 ),
1438 (
1439 "elapsed_ns",
1440 "The total elapsed time spent in the operator in nanoseconds.",
1441 ),
1442 ]),
1443 sql: "
1444SELECT
1445 id,
1446 pg_catalog.sum(elapsed_ns) AS elapsed_ns
1447FROM mz_introspection.mz_scheduling_elapsed_per_worker
1448GROUP BY id",
1449 access: vec![PUBLIC_SELECT],
1450 ontology: Some(Ontology {
1451 entity_name: "scheduling_elapsed",
1452 description: "CPU time spent per operator",
1453 links: &const {
1454 [OntologyLink {
1455 name: "elapsed_for_operator",
1456 target: "dataflow_operator",
1457 properties: LinkProperties::measures("id", "id", "cpu_time_ns"),
1458 }]
1459 },
1460 column_semantic_types: &[],
1461 }),
1462});
1463
1464pub static MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_PER_WORKER: LazyLock<BuiltinView> =
1465 LazyLock::new(|| BuiltinView {
1466 name: "mz_compute_operator_durations_histogram_per_worker",
1467 schema: MZ_INTROSPECTION_SCHEMA,
1468 oid: oid::VIEW_MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_PER_WORKER_OID,
1469 desc: RelationDesc::builder()
1470 .with_column("id", SqlScalarType::UInt64.nullable(false))
1471 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1472 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1473 .with_column("count", SqlScalarType::Int64.nullable(false))
1474 .with_key(vec![0, 1, 2])
1475 .finish(),
1476 column_comments: BTreeMap::new(),
1477 sql: "SELECT
1478 id, worker_id, duration_ns, pg_catalog.count(*) AS count
1479FROM
1480 mz_introspection.mz_compute_operator_durations_histogram_raw
1481GROUP BY
1482 id, worker_id, duration_ns",
1483 access: vec![PUBLIC_SELECT],
1484 ontology: None,
1485 });
1486
1487pub static MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM: LazyLock<BuiltinView> =
1488 LazyLock::new(|| BuiltinView {
1489 name: "mz_compute_operator_durations_histogram",
1490 schema: MZ_INTROSPECTION_SCHEMA,
1491 oid: oid::VIEW_MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_OID,
1492 desc: RelationDesc::builder()
1493 .with_column("id", SqlScalarType::UInt64.nullable(false))
1494 .with_column("duration_ns", SqlScalarType::UInt64.nullable(false))
1495 .with_column(
1496 "count",
1497 SqlScalarType::Numeric {
1498 max_scale: Some(NumericMaxScale::ZERO),
1499 }
1500 .nullable(false),
1501 )
1502 .with_key(vec![0, 1])
1503 .finish(),
1504 column_comments: BTreeMap::from_iter([
1505 (
1506 "id",
1507 "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
1508 ),
1509 (
1510 "duration_ns",
1511 "The upper bound of the duration bucket in nanoseconds.",
1512 ),
1513 (
1514 "count",
1515 "The (noncumulative) count of invocations in the bucket.",
1516 ),
1517 ]),
1518 sql: "
1519SELECT
1520 id,
1521 duration_ns,
1522 pg_catalog.sum(count) AS count
1523FROM mz_introspection.mz_compute_operator_durations_histogram_per_worker
1524GROUP BY id, duration_ns",
1525 access: vec![PUBLIC_SELECT],
1526 ontology: None,
1527 });
1528
1529pub static MZ_SCHEDULING_PARKS_HISTOGRAM_PER_WORKER: LazyLock<BuiltinView> =
1530 LazyLock::new(|| BuiltinView {
1531 name: "mz_scheduling_parks_histogram_per_worker",
1532 schema: MZ_INTROSPECTION_SCHEMA,
1533 oid: oid::VIEW_MZ_SCHEDULING_PARKS_HISTOGRAM_PER_WORKER_OID,
1534 desc: RelationDesc::builder()
1535 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1536 .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
1537 .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
1538 .with_column("count", SqlScalarType::Int64.nullable(false))
1539 .with_key(vec![0, 1, 2])
1540 .finish(),
1541 column_comments: BTreeMap::new(),
1542 sql: "SELECT
1543 worker_id, slept_for_ns, requested_ns, pg_catalog.count(*) AS count
1544FROM
1545 mz_introspection.mz_scheduling_parks_histogram_raw
1546GROUP BY
1547 worker_id, slept_for_ns, requested_ns",
1548 access: vec![PUBLIC_SELECT],
1549 ontology: None,
1550 });
1551
1552pub static MZ_SCHEDULING_PARKS_HISTOGRAM: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1553 name: "mz_scheduling_parks_histogram",
1554 schema: MZ_INTROSPECTION_SCHEMA,
1555 oid: oid::VIEW_MZ_SCHEDULING_PARKS_HISTOGRAM_OID,
1556 desc: RelationDesc::builder()
1557 .with_column("slept_for_ns", SqlScalarType::UInt64.nullable(false))
1558 .with_column("requested_ns", SqlScalarType::UInt64.nullable(false))
1559 .with_column(
1560 "count",
1561 SqlScalarType::Numeric {
1562 max_scale: Some(NumericMaxScale::ZERO),
1563 }
1564 .nullable(false),
1565 )
1566 .with_key(vec![0, 1])
1567 .finish(),
1568 column_comments: BTreeMap::from_iter([
1569 (
1570 "slept_for_ns",
1571 "The actual length of the park event in nanoseconds.",
1572 ),
1573 (
1574 "requested_ns",
1575 "The requested length of the park event in nanoseconds.",
1576 ),
1577 (
1578 "count",
1579 "The (noncumulative) count of park events in this bucket.",
1580 ),
1581 ]),
1582 sql: "
1583SELECT
1584 slept_for_ns,
1585 requested_ns,
1586 pg_catalog.sum(count) AS count
1587FROM mz_introspection.mz_scheduling_parks_histogram_per_worker
1588GROUP BY slept_for_ns, requested_ns",
1589 access: vec![PUBLIC_SELECT],
1590 ontology: Some(Ontology {
1591 entity_name: "scheduling_parks",
1592 description: "Histogram of operator park durations",
1593 links: &const { [] },
1594 column_semantic_types: &[],
1595 }),
1596});
1597
1598pub static MZ_COMPUTE_ERROR_COUNTS_PER_WORKER: LazyLock<BuiltinView> =
1599 LazyLock::new(|| BuiltinView {
1600 name: "mz_compute_error_counts_per_worker",
1601 schema: MZ_INTROSPECTION_SCHEMA,
1602 oid: oid::VIEW_MZ_COMPUTE_ERROR_COUNTS_PER_WORKER_OID,
1603 desc: RelationDesc::builder()
1604 .with_column("export_id", SqlScalarType::String.nullable(false))
1605 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1606 .with_column("count", SqlScalarType::Int64.nullable(false))
1607 .with_key(vec![0, 1, 2])
1608 .finish(),
1609 column_comments: BTreeMap::new(),
1610 sql: "
1611WITH MUTUALLY RECURSIVE
1612 -- Indexes that reuse existing indexes rather than maintaining separate dataflows.
1613 -- For these we don't log error counts separately, so we need to forward the error counts from
1614 -- their dependencies instead.
1615 index_reuses(reuse_id text, index_id text) AS (
1616 SELECT d.object_id, d.dependency_id
1617 FROM mz_internal.mz_compute_dependencies d
1618 JOIN mz_introspection.mz_compute_exports e ON (e.export_id = d.object_id)
1619 WHERE NOT EXISTS (
1620 SELECT 1 FROM mz_introspection.mz_dataflows
1621 WHERE id = e.dataflow_id
1622 )
1623 ),
1624 -- Error counts that were directly logged on compute exports.
1625 direct_errors(export_id text, worker_id uint8, count int8) AS (
1626 SELECT export_id, worker_id, count
1627 FROM mz_introspection.mz_compute_error_counts_raw
1628 ),
1629 -- Error counts propagated to index reused.
1630 all_errors(export_id text, worker_id uint8, count int8) AS (
1631 SELECT * FROM direct_errors
1632 UNION
1633 SELECT r.reuse_id, e.worker_id, e.count
1634 FROM all_errors e
1635 JOIN index_reuses r ON (r.index_id = e.export_id)
1636 )
1637SELECT * FROM all_errors",
1638 access: vec![PUBLIC_SELECT],
1639 ontology: Some(Ontology {
1640 entity_name: "compute_error_per_worker",
1641 description: "Error counts per compute collection per worker.",
1642 links: &const {
1643 [OntologyLink {
1644 name: "errors_in",
1645 target: "compute_export_per_worker",
1646 properties: LinkProperties::measures_composite(
1647 "export_id",
1648 "export_id",
1649 "count",
1650 &[("worker_id", "worker_id")],
1651 ),
1652 }]
1653 },
1654 column_semantic_types: &[("export_id", SemanticType::GlobalId)],
1655 }),
1656 });
1657
1658pub static MZ_COMPUTE_ERROR_COUNTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1659 name: "mz_compute_error_counts",
1660 schema: MZ_INTROSPECTION_SCHEMA,
1661 oid: oid::VIEW_MZ_COMPUTE_ERROR_COUNTS_OID,
1662 desc: RelationDesc::builder()
1663 .with_column("export_id", SqlScalarType::String.nullable(false))
1664 .with_column(
1665 "count",
1666 SqlScalarType::Numeric {
1667 max_scale: Some(NumericMaxScale::ZERO),
1668 }
1669 .nullable(false),
1670 )
1671 .with_key(vec![0])
1672 .finish(),
1673 column_comments: BTreeMap::from_iter([
1674 (
1675 "export_id",
1676 "The ID of the dataflow export. Corresponds to `mz_compute_exports.export_id`.",
1677 ),
1678 (
1679 "count",
1680 "The count of errors present in this dataflow export.",
1681 ),
1682 ]),
1683 sql: "
1684SELECT
1685 export_id,
1686 pg_catalog.sum(count) AS count
1687FROM mz_introspection.mz_compute_error_counts_per_worker
1688GROUP BY export_id
1689HAVING pg_catalog.sum(count) != 0",
1690 access: vec![PUBLIC_SELECT],
1691 ontology: Some(Ontology {
1692 entity_name: "compute_error_count",
1693 description: "Error counts per compute collection",
1694 links: &const {
1695 [OntologyLink {
1696 name: "errors_in",
1697 target: "compute_export",
1698 properties: LinkProperties::fk("export_id", "export_id", Cardinality::OneToOne),
1699 }]
1700 },
1701 column_semantic_types: &[("export_id", SemanticType::GlobalId)],
1702 }),
1703});
1704
1705pub static MZ_MESSAGE_COUNTS_PER_WORKER: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1706 name: "mz_message_counts_per_worker",
1707 schema: MZ_INTROSPECTION_SCHEMA,
1708 oid: oid::VIEW_MZ_MESSAGE_COUNTS_PER_WORKER_OID,
1709 desc: RelationDesc::builder()
1710 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
1711 .with_column("from_worker_id", SqlScalarType::UInt64.nullable(false))
1712 .with_column("to_worker_id", SqlScalarType::UInt64.nullable(false))
1713 .with_column("sent", SqlScalarType::Int64.nullable(false))
1714 .with_column("received", SqlScalarType::Int64.nullable(false))
1715 .with_column("batch_sent", SqlScalarType::Int64.nullable(false))
1716 .with_column("batch_received", SqlScalarType::Int64.nullable(false))
1717 .with_key(vec![0, 1, 2])
1718 .finish(),
1719 column_comments: BTreeMap::new(),
1720 sql: "
1721WITH batch_sent_cte AS (
1722 SELECT
1723 channel_id,
1724 from_worker_id,
1725 to_worker_id,
1726 pg_catalog.count(*) AS sent
1727 FROM
1728 mz_introspection.mz_message_batch_counts_sent_raw
1729 GROUP BY
1730 channel_id, from_worker_id, to_worker_id
1731),
1732batch_received_cte AS (
1733 SELECT
1734 channel_id,
1735 from_worker_id,
1736 to_worker_id,
1737 pg_catalog.count(*) AS received
1738 FROM
1739 mz_introspection.mz_message_batch_counts_received_raw
1740 GROUP BY
1741 channel_id, from_worker_id, to_worker_id
1742),
1743sent_cte AS (
1744 SELECT
1745 channel_id,
1746 from_worker_id,
1747 to_worker_id,
1748 pg_catalog.count(*) AS sent
1749 FROM
1750 mz_introspection.mz_message_counts_sent_raw
1751 GROUP BY
1752 channel_id, from_worker_id, to_worker_id
1753),
1754received_cte AS (
1755 SELECT
1756 channel_id,
1757 from_worker_id,
1758 to_worker_id,
1759 pg_catalog.count(*) AS received
1760 FROM
1761 mz_introspection.mz_message_counts_received_raw
1762 GROUP BY
1763 channel_id, from_worker_id, to_worker_id
1764)
1765SELECT
1766 sent_cte.channel_id,
1767 sent_cte.from_worker_id,
1768 sent_cte.to_worker_id,
1769 sent_cte.sent,
1770 received_cte.received,
1771 batch_sent_cte.sent AS batch_sent,
1772 batch_received_cte.received AS batch_received
1773FROM sent_cte
1774JOIN received_cte USING (channel_id, from_worker_id, to_worker_id)
1775JOIN batch_sent_cte USING (channel_id, from_worker_id, to_worker_id)
1776JOIN batch_received_cte USING (channel_id, from_worker_id, to_worker_id)",
1777 access: vec![PUBLIC_SELECT],
1778 ontology: None,
1779});
1780
1781pub static MZ_MESSAGE_COUNTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1782 name: "mz_message_counts",
1783 schema: MZ_INTROSPECTION_SCHEMA,
1784 oid: oid::VIEW_MZ_MESSAGE_COUNTS_OID,
1785 desc: RelationDesc::builder()
1786 .with_column("channel_id", SqlScalarType::UInt64.nullable(false))
1787 .with_column(
1788 "sent",
1789 SqlScalarType::Numeric {
1790 max_scale: Some(NumericMaxScale::ZERO),
1791 }
1792 .nullable(false),
1793 )
1794 .with_column(
1795 "received",
1796 SqlScalarType::Numeric {
1797 max_scale: Some(NumericMaxScale::ZERO),
1798 }
1799 .nullable(false),
1800 )
1801 .with_column(
1802 "batch_sent",
1803 SqlScalarType::Numeric {
1804 max_scale: Some(NumericMaxScale::ZERO),
1805 }
1806 .nullable(false),
1807 )
1808 .with_column(
1809 "batch_received",
1810 SqlScalarType::Numeric {
1811 max_scale: Some(NumericMaxScale::ZERO),
1812 }
1813 .nullable(false),
1814 )
1815 .with_key(vec![0])
1816 .finish(),
1817 column_comments: BTreeMap::from_iter([
1818 (
1819 "channel_id",
1820 "The ID of the channel. Corresponds to `mz_dataflow_channels.id`.",
1821 ),
1822 ("sent", "The number of messages sent."),
1823 ("received", "The number of messages received."),
1824 ("batch_sent", "The number of batches sent."),
1825 ("batch_received", "The number of batches received."),
1826 ]),
1827 sql: "
1828SELECT
1829 channel_id,
1830 pg_catalog.sum(sent) AS sent,
1831 pg_catalog.sum(received) AS received,
1832 pg_catalog.sum(batch_sent) AS batch_sent,
1833 pg_catalog.sum(batch_received) AS batch_received
1834FROM mz_introspection.mz_message_counts_per_worker
1835GROUP BY channel_id",
1836 access: vec![PUBLIC_SELECT],
1837 ontology: Some(Ontology {
1838 entity_name: "message_count",
1839 description: "Inter-worker message counts",
1840 links: &const {
1841 [OntologyLink {
1842 name: "counts_for",
1843 target: "dataflow_channel",
1844 properties: LinkProperties::fk("channel_id", "id", Cardinality::OneToOne),
1845 }]
1846 },
1847 column_semantic_types: &[],
1848 }),
1849});
1850
1851pub static MZ_ACTIVE_PEEKS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
1852 name: "mz_active_peeks",
1853 schema: MZ_INTROSPECTION_SCHEMA,
1854 oid: oid::VIEW_MZ_ACTIVE_PEEKS_OID,
1855 desc: RelationDesc::builder()
1856 .with_column("id", SqlScalarType::Uuid.nullable(false))
1857 .with_column("object_id", SqlScalarType::String.nullable(false))
1858 .with_column("type", SqlScalarType::String.nullable(false))
1859 .with_column("time", SqlScalarType::MzTimestamp.nullable(false))
1860 .with_key(vec![0])
1861 .finish(),
1862 column_comments: BTreeMap::from_iter([
1863 ("id", "The ID of the peek request."),
1864 (
1865 "object_id",
1866 "The ID of the collection the peek is targeting. Corresponds to `mz_catalog.mz_indexes.id`, `mz_catalog.mz_materialized_views.id`, `mz_catalog.mz_sources.id`, or `mz_catalog.mz_tables.id`.",
1867 ),
1868 (
1869 "type",
1870 "The type of the corresponding peek: `index` if targeting an index or temporary dataflow; `persist` for a source, materialized view, or table.",
1871 ),
1872 ("time", "The timestamp the peek has requested."),
1873 ]),
1874 sql: "
1875SELECT id, object_id, type, time
1876FROM mz_introspection.mz_active_peeks_per_worker
1877WHERE worker_id = 0::uint8",
1878 access: vec![PUBLIC_SELECT],
1879 ontology: Some(Ontology {
1880 entity_name: "active_peek",
1881 description: "Currently executing SELECT queries",
1882 links: &const { [] },
1883 column_semantic_types: &const {
1884 [
1885 ("object_id", SemanticType::GlobalId),
1886 ("time", SemanticType::MzTimestamp),
1887 ]
1888 },
1889 }),
1890});
1891
1892pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER: LazyLock<BuiltinView> =
1893 LazyLock::new(|| BuiltinView {
1894 name: "mz_dataflow_operator_reachability_per_worker",
1895 schema: MZ_INTROSPECTION_SCHEMA,
1896 oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER_OID,
1897 desc: RelationDesc::builder()
1898 .with_column("id", SqlScalarType::UInt64.nullable(false))
1899 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1900 .with_column("port", SqlScalarType::UInt64.nullable(false))
1901 .with_column("update_type", SqlScalarType::String.nullable(false))
1902 .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
1903 .with_column("count", SqlScalarType::Int64.nullable(false))
1904 .with_key(vec![0, 1, 2, 3, 4])
1905 .finish(),
1906 column_comments: BTreeMap::new(),
1907 sql: "SELECT
1908 addr2.id,
1909 reachability.worker_id,
1910 port,
1911 update_type,
1912 time,
1913 pg_catalog.count(*) as count
1914FROM
1915 mz_introspection.mz_dataflow_operator_reachability_raw reachability,
1916 mz_introspection.mz_dataflow_addresses_per_worker addr1,
1917 mz_introspection.mz_dataflow_addresses_per_worker addr2
1918WHERE
1919 addr2.address =
1920 CASE
1921 WHEN source = 0 THEN addr1.address
1922 ELSE addr1.address || reachability.source
1923 END
1924 AND addr1.id = reachability.id
1925 AND addr1.worker_id = reachability.worker_id
1926 AND addr2.worker_id = reachability.worker_id
1927GROUP BY addr2.id, reachability.worker_id, port, update_type, time",
1928 access: vec![PUBLIC_SELECT],
1929 ontology: None,
1930 });
1931
1932pub static MZ_DATAFLOW_OPERATOR_REACHABILITY: LazyLock<BuiltinView> =
1933 LazyLock::new(|| BuiltinView {
1934 name: "mz_dataflow_operator_reachability",
1935 schema: MZ_INTROSPECTION_SCHEMA,
1936 oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_OID,
1937 desc: RelationDesc::builder()
1938 .with_column("id", SqlScalarType::UInt64.nullable(false))
1939 .with_column("port", SqlScalarType::UInt64.nullable(false))
1940 .with_column("update_type", SqlScalarType::String.nullable(false))
1941 .with_column("time", SqlScalarType::MzTimestamp.nullable(true))
1942 .with_column(
1943 "count",
1944 SqlScalarType::Numeric {
1945 max_scale: Some(NumericMaxScale::ZERO),
1946 }
1947 .nullable(false),
1948 )
1949 .with_key(vec![0, 1, 2, 3])
1950 .finish(),
1951 column_comments: BTreeMap::new(),
1952 sql: "
1953SELECT
1954 id,
1955 port,
1956 update_type,
1957 time,
1958 pg_catalog.sum(count) as count
1959FROM mz_introspection.mz_dataflow_operator_reachability_per_worker
1960GROUP BY id, port, update_type, time",
1961 access: vec![PUBLIC_SELECT],
1962 ontology: None,
1963 });
1964
1965pub static MZ_ARRANGEMENT_SIZES_PER_WORKER: LazyLock<BuiltinView> = LazyLock::new(|| {
1966 BuiltinView {
1967 name: "mz_arrangement_sizes_per_worker",
1968 schema: MZ_INTROSPECTION_SCHEMA,
1969 oid: oid::VIEW_MZ_ARRANGEMENT_SIZES_PER_WORKER_OID,
1970 desc: RelationDesc::builder()
1971 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
1972 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
1973 .with_column("records", SqlScalarType::Int64.nullable(true))
1974 .with_column("batches", SqlScalarType::Int64.nullable(true))
1975 .with_column("size", SqlScalarType::Int64.nullable(true))
1976 .with_column("capacity", SqlScalarType::Int64.nullable(true))
1977 .with_column("allocations", SqlScalarType::Int64.nullable(true))
1978 .finish(),
1979 column_comments: BTreeMap::new(),
1980 sql: "
1981WITH operators_per_worker_cte AS (
1982 SELECT
1983 id AS operator_id,
1984 worker_id
1985 FROM
1986 mz_introspection.mz_dataflow_operators_per_worker
1987),
1988batches_cte AS (
1989 SELECT
1990 operator_id,
1991 worker_id,
1992 COUNT(*) AS batches
1993 FROM
1994 mz_introspection.mz_arrangement_batches_raw
1995 GROUP BY
1996 operator_id, worker_id
1997),
1998records_cte AS (
1999 SELECT
2000 operator_id,
2001 worker_id,
2002 COUNT(*) AS records
2003 FROM
2004 mz_introspection.mz_arrangement_records_raw
2005 GROUP BY
2006 operator_id, worker_id
2007),
2008heap_size_cte AS (
2009 SELECT
2010 operator_id,
2011 worker_id,
2012 COUNT(*) AS size
2013 FROM
2014 mz_introspection.mz_arrangement_heap_size_raw
2015 GROUP BY
2016 operator_id, worker_id
2017),
2018heap_capacity_cte AS (
2019 SELECT
2020 operator_id,
2021 worker_id,
2022 COUNT(*) AS capacity
2023 FROM
2024 mz_introspection.mz_arrangement_heap_capacity_raw
2025 GROUP BY
2026 operator_id, worker_id
2027),
2028heap_allocations_cte AS (
2029 SELECT
2030 operator_id,
2031 worker_id,
2032 COUNT(*) AS allocations
2033 FROM
2034 mz_introspection.mz_arrangement_heap_allocations_raw
2035 GROUP BY
2036 operator_id, worker_id
2037),
2038batcher_records_cte AS (
2039 SELECT
2040 operator_id,
2041 worker_id,
2042 COUNT(*) AS records
2043 FROM
2044 mz_introspection.mz_arrangement_batcher_records_raw
2045 GROUP BY
2046 operator_id, worker_id
2047),
2048batcher_size_cte AS (
2049 SELECT
2050 operator_id,
2051 worker_id,
2052 COUNT(*) AS size
2053 FROM
2054 mz_introspection.mz_arrangement_batcher_size_raw
2055 GROUP BY
2056 operator_id, worker_id
2057),
2058batcher_capacity_cte AS (
2059 SELECT
2060 operator_id,
2061 worker_id,
2062 COUNT(*) AS capacity
2063 FROM
2064 mz_introspection.mz_arrangement_batcher_capacity_raw
2065 GROUP BY
2066 operator_id, worker_id
2067),
2068batcher_allocations_cte AS (
2069 SELECT
2070 operator_id,
2071 worker_id,
2072 COUNT(*) AS allocations
2073 FROM
2074 mz_introspection.mz_arrangement_batcher_allocations_raw
2075 GROUP BY
2076 operator_id, worker_id
2077),
2078combined AS (
2079 SELECT
2080 opw.operator_id,
2081 opw.worker_id,
2082 CASE
2083 WHEN records_cte.records IS NULL AND batcher_records_cte.records IS NULL THEN NULL
2084 ELSE COALESCE(records_cte.records, 0) + COALESCE(batcher_records_cte.records, 0)
2085 END AS records,
2086 batches_cte.batches AS batches,
2087 CASE
2088 WHEN heap_size_cte.size IS NULL AND batcher_size_cte.size IS NULL THEN NULL
2089 ELSE COALESCE(heap_size_cte.size, 0) + COALESCE(batcher_size_cte.size, 0)
2090 END AS size,
2091 CASE
2092 WHEN heap_capacity_cte.capacity IS NULL AND batcher_capacity_cte.capacity IS NULL THEN NULL
2093 ELSE COALESCE(heap_capacity_cte.capacity, 0) + COALESCE(batcher_capacity_cte.capacity, 0)
2094 END AS capacity,
2095 CASE
2096 WHEN heap_allocations_cte.allocations IS NULL AND batcher_allocations_cte.allocations IS NULL THEN NULL
2097 ELSE COALESCE(heap_allocations_cte.allocations, 0) + COALESCE(batcher_allocations_cte.allocations, 0)
2098 END AS allocations
2099 FROM
2100 operators_per_worker_cte opw
2101 LEFT OUTER JOIN batches_cte USING (operator_id, worker_id)
2102 LEFT OUTER JOIN records_cte USING (operator_id, worker_id)
2103 LEFT OUTER JOIN heap_size_cte USING (operator_id, worker_id)
2104 LEFT OUTER JOIN heap_capacity_cte USING (operator_id, worker_id)
2105 LEFT OUTER JOIN heap_allocations_cte USING (operator_id, worker_id)
2106 LEFT OUTER JOIN batcher_records_cte USING (operator_id, worker_id)
2107 LEFT OUTER JOIN batcher_size_cte USING (operator_id, worker_id)
2108 LEFT OUTER JOIN batcher_capacity_cte USING (operator_id, worker_id)
2109 LEFT OUTER JOIN batcher_allocations_cte USING (operator_id, worker_id)
2110)
2111SELECT
2112 operator_id, worker_id, records, batches, size, capacity, allocations
2113FROM combined
2114WHERE
2115 records IS NOT NULL
2116 OR batches IS NOT NULL
2117 OR size IS NOT NULL
2118 OR capacity IS NOT NULL
2119 OR allocations IS NOT NULL
2120",
2121 access: vec![PUBLIC_SELECT],
2122 ontology: None,
2123 }
2124});
2125
2126pub static MZ_ARRANGEMENT_SIZES: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2127 name: "mz_arrangement_sizes",
2128 schema: MZ_INTROSPECTION_SCHEMA,
2129 oid: oid::VIEW_MZ_ARRANGEMENT_SIZES_OID,
2130 desc: RelationDesc::builder()
2131 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
2132 .with_column("records", SqlScalarType::Int64.nullable(true))
2133 .with_column("batches", SqlScalarType::Int64.nullable(true))
2134 .with_column("size", SqlScalarType::Int64.nullable(true))
2135 .with_column("capacity", SqlScalarType::Int64.nullable(true))
2136 .with_column("allocations", SqlScalarType::Int64.nullable(true))
2137 .with_key(vec![0])
2138 .finish(),
2139 column_comments: BTreeMap::from_iter([
2140 (
2141 "operator_id",
2142 "The ID of the operator that created the arrangement. Corresponds to `mz_dataflow_operators.id`.",
2143 ),
2144 ("records", "The number of records in the arrangement."),
2145 ("batches", "The number of batches in the arrangement."),
2146 ("size", "The utilized size in bytes of the arrangement."),
2147 (
2148 "capacity",
2149 "The capacity in bytes of the arrangement. Can be larger than the size.",
2150 ),
2151 (
2152 "allocations",
2153 "The number of separate memory allocations backing the arrangement.",
2154 ),
2155 ]),
2156 sql: "
2157SELECT
2158 operator_id,
2159 SUM(records)::int8 AS records,
2160 SUM(batches)::int8 AS batches,
2161 SUM(size)::int8 AS size,
2162 SUM(capacity)::int8 AS capacity,
2163 SUM(allocations)::int8 AS allocations
2164FROM mz_introspection.mz_arrangement_sizes_per_worker
2165GROUP BY operator_id",
2166 access: vec![PUBLIC_SELECT],
2167 ontology: Some(Ontology {
2168 entity_name: "arrangement_size",
2169 description: "Aggregated arrangement sizes (records, batches, bytes)",
2170 links: &const {
2171 [OntologyLink {
2172 name: "arrangement_of_operator",
2173 target: "dataflow_operator",
2174 properties: LinkProperties::Measures {
2175 source_column: "operator_id",
2176 target_column: "id",
2177 metric: "arrangement_size",
2178 source_id_type: None,
2179 requires_mapping: None,
2180 note: Some(
2181 "Both IDs are local uint64 operator IDs within a dataflow, not GlobalIds.",
2182 ),
2183 extra_key_columns: None,
2184 },
2185 }]
2186 },
2187 column_semantic_types: &[],
2188 }),
2189});
2190
2191pub static MZ_ARRANGEMENT_SHARING_PER_WORKER: LazyLock<BuiltinView> =
2192 LazyLock::new(|| BuiltinView {
2193 name: "mz_arrangement_sharing_per_worker",
2194 schema: MZ_INTROSPECTION_SCHEMA,
2195 oid: oid::VIEW_MZ_ARRANGEMENT_SHARING_PER_WORKER_OID,
2196 desc: RelationDesc::builder()
2197 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
2198 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
2199 .with_column("count", SqlScalarType::Int64.nullable(false))
2200 .with_key(vec![0, 1])
2201 .finish(),
2202 column_comments: BTreeMap::new(),
2203 sql: "
2204SELECT
2205 operator_id,
2206 worker_id,
2207 pg_catalog.count(*) AS count
2208FROM mz_introspection.mz_arrangement_sharing_raw
2209GROUP BY operator_id, worker_id",
2210 access: vec![PUBLIC_SELECT],
2211 ontology: None,
2212 });
2213
2214pub static MZ_ARRANGEMENT_SHARING: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2215 name: "mz_arrangement_sharing",
2216 schema: MZ_INTROSPECTION_SCHEMA,
2217 oid: oid::VIEW_MZ_ARRANGEMENT_SHARING_OID,
2218 desc: RelationDesc::builder()
2219 .with_column("operator_id", SqlScalarType::UInt64.nullable(false))
2220 .with_column("count", SqlScalarType::Int64.nullable(false))
2221 .with_key(vec![0])
2222 .finish(),
2223 column_comments: BTreeMap::from_iter([
2224 (
2225 "operator_id",
2226 "The ID of the operator that created the arrangement. Corresponds to `mz_dataflow_operators.id`.",
2227 ),
2228 (
2229 "count",
2230 "The number of operators that share the arrangement.",
2231 ),
2232 ]),
2233 sql: "
2234SELECT operator_id, count
2235FROM mz_introspection.mz_arrangement_sharing_per_worker
2236WHERE worker_id = 0::uint8",
2237 access: vec![PUBLIC_SELECT],
2238 ontology: Some(Ontology {
2239 entity_name: "arrangement_sharing",
2240 description: "Arrangement sharing between operators",
2241 links: &const {
2242 [OntologyLink {
2243 name: "shared_by",
2244 target: "dataflow_operator",
2245 properties: LinkProperties::fk("operator_id", "id", Cardinality::OneToOne),
2246 }]
2247 },
2248 column_semantic_types: &[],
2249 }),
2250});
2251
2252pub static MZ_DATAFLOW_OPERATOR_PARENTS_PER_WORKER: LazyLock<BuiltinView> =
2253 LazyLock::new(|| BuiltinView {
2254 name: "mz_dataflow_operator_parents_per_worker",
2255 schema: MZ_INTROSPECTION_SCHEMA,
2256 oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_PARENTS_PER_WORKER_OID,
2257 desc: RelationDesc::builder()
2258 .with_column("id", SqlScalarType::UInt64.nullable(false))
2259 .with_column("parent_id", SqlScalarType::UInt64.nullable(false))
2260 .with_column("worker_id", SqlScalarType::UInt64.nullable(false))
2261 .finish(),
2262 column_comments: BTreeMap::new(),
2263 sql: "
2264WITH operator_addrs AS(
2265 SELECT
2266 id, address, worker_id
2267 FROM mz_introspection.mz_dataflow_addresses_per_worker
2268 INNER JOIN mz_introspection.mz_dataflow_operators_per_worker
2269 USING (id, worker_id)
2270),
2271parent_addrs AS (
2272 SELECT
2273 id,
2274 address[1:list_length(address) - 1] AS parent_address,
2275 worker_id
2276 FROM operator_addrs
2277)
2278SELECT pa.id, oa.id AS parent_id, pa.worker_id
2279FROM parent_addrs AS pa
2280 INNER JOIN operator_addrs AS oa
2281 ON pa.parent_address = oa.address
2282 AND pa.worker_id = oa.worker_id",
2283 access: vec![PUBLIC_SELECT],
2284 ontology: None,
2285 });
2286
2287pub static MZ_DATAFLOW_OPERATOR_PARENTS: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2288 name: "mz_dataflow_operator_parents",
2289 schema: MZ_INTROSPECTION_SCHEMA,
2290 oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_PARENTS_OID,
2291 desc: RelationDesc::builder()
2292 .with_column("id", SqlScalarType::UInt64.nullable(false))
2293 .with_column("parent_id", SqlScalarType::UInt64.nullable(false))
2294 .finish(),
2295 column_comments: BTreeMap::from_iter([
2296 (
2297 "id",
2298 "The ID of the operator. Corresponds to `mz_dataflow_operators.id`.",
2299 ),
2300 (
2301 "parent_id",
2302 "The ID of the operator's parent operator. Corresponds to `mz_dataflow_operators.id`.",
2303 ),
2304 ]),
2305 sql: "
2306SELECT id, parent_id
2307FROM mz_introspection.mz_dataflow_operator_parents_per_worker
2308WHERE worker_id = 0::uint8",
2309 access: vec![PUBLIC_SELECT],
2310 ontology: None,
2311});
2312
2313pub static MZ_DATAFLOW_ARRANGEMENT_SIZES: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2314 name: "mz_dataflow_arrangement_sizes",
2315 schema: MZ_INTROSPECTION_SCHEMA,
2316 oid: oid::VIEW_MZ_DATAFLOW_ARRANGEMENT_SIZES_OID,
2317 desc: RelationDesc::builder()
2318 .with_column("id", SqlScalarType::UInt64.nullable(false))
2319 .with_column("name", SqlScalarType::String.nullable(false))
2320 .with_column("records", SqlScalarType::Int64.nullable(true))
2321 .with_column("batches", SqlScalarType::Int64.nullable(true))
2322 .with_column("size", SqlScalarType::Int64.nullable(true))
2323 .with_column("capacity", SqlScalarType::Int64.nullable(true))
2324 .with_column("allocations", SqlScalarType::Int64.nullable(true))
2325 .with_key(vec![0, 1])
2326 .finish(),
2327 column_comments: BTreeMap::from_iter([
2328 (
2329 "id",
2330 "The ID of the [dataflow]. Corresponds to `mz_dataflows.id`.",
2331 ),
2332 ("name", "The name of the [dataflow]."),
2333 (
2334 "records",
2335 "The number of records in all arrangements in the dataflow.",
2336 ),
2337 (
2338 "batches",
2339 "The number of batches in all arrangements in the dataflow.",
2340 ),
2341 ("size", "The utilized size in bytes of the arrangements."),
2342 (
2343 "capacity",
2344 "The capacity in bytes of the arrangements. Can be larger than the size.",
2345 ),
2346 (
2347 "allocations",
2348 "The number of separate memory allocations backing the arrangements.",
2349 ),
2350 ]),
2351 sql: "
2352SELECT
2353 mdod.dataflow_id AS id,
2354 mdod.dataflow_name AS name,
2355 SUM(mas.records)::int8 AS records,
2356 SUM(mas.batches)::int8 AS batches,
2357 SUM(mas.size)::int8 AS size,
2358 SUM(mas.capacity)::int8 AS capacity,
2359 SUM(mas.allocations)::int8 AS allocations
2360FROM mz_introspection.mz_dataflow_operator_dataflows AS mdod
2361LEFT JOIN mz_introspection.mz_arrangement_sizes AS mas
2362 ON mdod.id = mas.operator_id
2363GROUP BY mdod.dataflow_id, mdod.dataflow_name",
2364 access: vec![PUBLIC_SELECT],
2365 ontology: None,
2366});
2367
2368pub static MZ_EXPECTED_GROUP_SIZE_ADVICE: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
2369 name: "mz_expected_group_size_advice",
2370 schema: MZ_INTROSPECTION_SCHEMA,
2371 oid: oid::VIEW_MZ_EXPECTED_GROUP_SIZE_ADVICE_OID,
2372 desc: RelationDesc::builder()
2373 .with_column("dataflow_id", SqlScalarType::UInt64.nullable(false))
2374 .with_column("dataflow_name", SqlScalarType::String.nullable(false))
2375 .with_column("region_id", SqlScalarType::UInt64.nullable(false))
2376 .with_column("region_name", SqlScalarType::String.nullable(false))
2377 .with_column("levels", SqlScalarType::Int64.nullable(false))
2378 .with_column("to_cut", SqlScalarType::Int64.nullable(false))
2379 .with_column(
2380 "savings",
2381 SqlScalarType::Numeric {
2382 max_scale: Some(NumericMaxScale::ZERO),
2383 }
2384 .nullable(true),
2385 )
2386 .with_column("hint", SqlScalarType::Float64.nullable(false))
2387 .finish(),
2388 column_comments: BTreeMap::from_iter([
2389 (
2390 "dataflow_id",
2391 "The ID of the [dataflow]. Corresponds to `mz_dataflows.id`.",
2392 ),
2393 (
2394 "dataflow_name",
2395 "The internal name of the dataflow hosting the min/max aggregation or Top K.",
2396 ),
2397 (
2398 "region_id",
2399 "The ID of the root operator scope. Corresponds to `mz_dataflow_operators.id`.",
2400 ),
2401 (
2402 "region_name",
2403 "The internal name of the root operator scope for the min/max aggregation or Top K.",
2404 ),
2405 (
2406 "levels",
2407 "The number of levels in the hierarchical scheme implemented by the region.",
2408 ),
2409 (
2410 "to_cut",
2411 "The number of levels that can be eliminated (cut) from the region's hierarchy.",
2412 ),
2413 (
2414 "savings",
2415 "A conservative estimate of the amount of memory in bytes to be saved by applying the hint.",
2416 ),
2417 (
2418 "hint",
2419 "The hint value that will eliminate `to_cut` levels from the region's hierarchy.",
2420 ),
2421 ]),
2422 sql: "
2423 -- The mz_expected_group_size_advice view provides tuning suggestions for the GROUP SIZE
2424 -- query hints. This tuning hint is effective for min/max/top-k patterns, where a stack
2425 -- of arrangements must be built. For each dataflow and region corresponding to one
2426 -- such pattern, we look for how many levels can be eliminated without hitting a level
2427 -- that actually substantially filters the input. The advice is constructed so that
2428 -- setting the hint for the affected region will eliminate these redundant levels of
2429 -- the hierarchical rendering.
2430 --
2431 -- A number of helper CTEs are used for the view definition. The first one, operators,
2432 -- looks for operator names that comprise arrangements of inputs to each level of a
2433 -- min/max/top-k hierarchy.
2434 WITH operators AS (
2435 SELECT
2436 dod.dataflow_id,
2437 dor.id AS region_id,
2438 dod.id,
2439 ars.records,
2440 ars.size
2441 FROM
2442 mz_introspection.mz_dataflow_operator_dataflows dod
2443 JOIN mz_introspection.mz_dataflow_addresses doa
2444 ON dod.id = doa.id
2445 JOIN mz_introspection.mz_dataflow_addresses dra
2446 ON dra.address = doa.address[:list_length(doa.address) - 1]
2447 JOIN mz_introspection.mz_dataflow_operators dor
2448 ON dor.id = dra.id
2449 JOIN mz_introspection.mz_arrangement_sizes ars
2450 ON ars.operator_id = dod.id
2451 WHERE
2452 dod.name = 'Arranged TopK input'
2453 OR dod.name = 'Arranged MinsMaxesHierarchical input'
2454 OR dod.name = 'Arrange ReduceMinsMaxes'
2455 ),
2456 -- The second CTE, levels, simply computes the heights of the min/max/top-k hierarchies
2457 -- identified in operators above.
2458 levels AS (
2459 SELECT o.dataflow_id, o.region_id, COUNT(*) AS levels
2460 FROM operators o
2461 GROUP BY o.dataflow_id, o.region_id
2462 ),
2463 -- The third CTE, pivot, determines for each min/max/top-k hierarchy, the first input
2464 -- operator. This operator is crucially important, as it records the number of records
2465 -- that was given as input to the gadget as a whole.
2466 pivot AS (
2467 SELECT
2468 o1.dataflow_id,
2469 o1.region_id,
2470 o1.id,
2471 o1.records
2472 FROM operators o1
2473 WHERE
2474 o1.id = (
2475 SELECT MIN(o2.id)
2476 FROM operators o2
2477 WHERE
2478 o2.dataflow_id = o1.dataflow_id
2479 AND o2.region_id = o1.region_id
2480 OPTIONS (AGGREGATE INPUT GROUP SIZE = 8)
2481 )
2482 ),
2483 -- The fourth CTE, candidates, will look for operators where the number of records
2484 -- maintained is not significantly different from the number at the pivot (excluding
2485 -- the pivot itself). These are the candidates for being cut from the dataflow region
2486 -- by adjusting the hint. The query includes a constant, heuristically tuned on TPC-H
2487 -- load generator data, to give some room for small deviations in number of records.
2488 -- The intuition for allowing for this deviation is that we are looking for a strongly
2489 -- reducing point in the hierarchy. To see why one such operator ought to exist in an
2490 -- untuned hierarchy, consider that at each level, we use hashing to distribute rows
2491 -- among groups where the min/max/top-k computation is (partially) applied. If the
2492 -- hierarchy has too many levels, the first-level (pivot) groups will be such that many
2493 -- groups might be empty or contain only one row. Each subsequent level will have a number
2494 -- of groups that is reduced exponentially. So at some point, we will find the level where
2495 -- we actually start having a few rows per group. That's where we will see the row counts
2496 -- significantly drop off.
2497 candidates AS (
2498 SELECT
2499 o.dataflow_id,
2500 o.region_id,
2501 o.id,
2502 o.records,
2503 o.size
2504 FROM
2505 operators o
2506 JOIN pivot p
2507 ON o.dataflow_id = p.dataflow_id
2508 AND o.region_id = p.region_id
2509 AND o.id <> p.id
2510 WHERE o.records >= p.records * (1 - 0.15)
2511 ),
2512 -- The fifth CTE, cuts, computes for each relevant dataflow region, the number of
2513 -- candidate levels that should be cut. We only return here dataflow regions where at
2514 -- least one level must be cut. Note that once we hit a point where the hierarchy starts
2515 -- to have a filtering effect, i.e., after the last candidate, it is dangerous to suggest
2516 -- cutting the height of the hierarchy further. This is because we will have way less
2517 -- groups in the next level, so there should be even further reduction happening or there
2518 -- is some substantial skew in the data. But if the latter is the case, then we should not
2519 -- tune the GROUP SIZE hints down anyway to avoid hurting latency upon updates directed
2520 -- at these unusually large groups. In addition to selecting the levels to cut, we also
2521 -- compute a conservative estimate of the memory savings in bytes that will result from
2522 -- cutting these levels from the hierarchy. The estimate is based on the sizes of the
2523 -- input arrangements for each level to be cut. These arrangements should dominate the
2524 -- size of each level that can be cut, since the reduction gadget internal to the level
2525 -- does not remove much data at these levels.
2526 cuts AS (
2527 SELECT c.dataflow_id, c.region_id, COUNT(*) AS to_cut, SUM(c.size) AS savings
2528 FROM candidates c
2529 GROUP BY c.dataflow_id, c.region_id
2530 HAVING COUNT(*) > 0
2531 )
2532 -- Finally, we compute the hint suggestion for each dataflow region based on the number of
2533 -- levels and the number of candidates to be cut. The hint is computed taking into account
2534 -- the fan-in used in rendering for the hash partitioning and reduction of the groups,
2535 -- currently equal to 16.
2536 SELECT
2537 dod.dataflow_id,
2538 dod.dataflow_name,
2539 dod.id AS region_id,
2540 dod.name AS region_name,
2541 l.levels,
2542 c.to_cut,
2543 c.savings,
2544 pow(16, l.levels - c.to_cut) - 1 AS hint
2545 FROM cuts c
2546 JOIN levels l
2547 ON c.dataflow_id = l.dataflow_id AND c.region_id = l.region_id
2548 JOIN mz_introspection.mz_dataflow_operator_dataflows dod
2549 ON dod.dataflow_id = c.dataflow_id AND dod.id = c.region_id",
2550 access: vec![PUBLIC_SELECT],
2551 ontology: Some(Ontology {
2552 entity_name: "group_size_advice",
2553 description: "Advice on expected group sizes for reduce operators",
2554 links: &const { [] },
2555 column_semantic_types: &[],
2556 }),
2557});