1use mz_catalog::memory::objects::{Cluster, ClusterReplica};
11
12use super::sql::SqlRequest;
13
/// A SQL query whose result set is exported as a Prometheus metric.
///
/// One result column (named by `value_column_name`) supplies the sample
/// value; the remaining columns presumably become metric labels — confirm
/// against the scraping code that consumes these definitions.
#[derive(Debug)]
pub(crate) struct PrometheusSqlQuery<'a> {
    // Name under which the metric is exported.
    pub(crate) metric_name: &'a str,
    // Help text attached to the exported metric.
    pub(crate) help: &'a str,
    // The SQL text to execute; see `to_sql_request` for the session-setting
    // statements that get prepended before it runs.
    pub(crate) query: &'a str,
    // Name of the result column holding the sample value.
    pub(crate) value_column_name: &'a str,
    // Whether the query should be issued once per cluster replica (pinned via
    // `to_sql_request(Some(..))`) rather than once globally — TODO confirm
    // against the caller; the flag is not consumed in this file.
    pub(crate) per_replica: bool,
}
22
23impl<'a> PrometheusSqlQuery<'a> {
24 pub(crate) fn to_sql_request(
25 &self,
26 cluster: Option<(&Cluster, &ClusterReplica)>,
27 ) -> SqlRequest {
28 SqlRequest::Simple {
29 query: if let Some((cluster, replica)) = cluster {
30 format!(
31 "SET auto_route_catalog_queries = false; SET CLUSTER = '{}'; SET CLUSTER_REPLICA = '{}'; {}",
32 cluster.name, replica.name, self.query
33 )
34 } else {
35 format!(
36 "SET auto_route_catalog_queries = true; RESET CLUSTER; RESET CLUSTER_REPLICA; {}",
37 self.query
38 )
39 },
40 }
41 }
42}
43
44pub(crate) static FRONTIER_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
45 PrometheusSqlQuery {
46 metric_name: "mz_write_frontier",
47 help: "The global write frontiers of compute and storage collections.",
48 query: "SELECT
49 object_id AS collection_id,
50 coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier
51 FROM mz_internal.mz_frontiers
52 WHERE object_id NOT LIKE 't%';",
53 value_column_name: "write_frontier",
54 per_replica: false,
55 },
56 PrometheusSqlQuery {
57 metric_name: "mz_read_frontier",
58 help: "The global read frontiers of compute and storage collections.",
59 query: "SELECT
60 object_id AS collection_id,
61 coalesce(read_frontier::text::uint8, 18446744073709551615::uint8) AS read_frontier
62 FROM mz_internal.mz_frontiers
63 WHERE object_id NOT LIKE 't%';",
64 value_column_name: "read_frontier",
65 per_replica: false,
66 },
67 PrometheusSqlQuery {
68 metric_name: "mz_replica_write_frontiers",
69 help: "The per-replica write frontiers of compute and storage collections.",
70 query: "SELECT
71 object_id AS collection_id,
72 coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
73 cluster_id AS instance_id,
74 replica_id AS replica_id
75 FROM mz_catalog.mz_cluster_replica_frontiers
76 JOIN mz_cluster_replicas ON (id = replica_id)
77 WHERE object_id NOT LIKE 't%';",
78 value_column_name: "write_frontier",
79 per_replica: false,
80 },
81 PrometheusSqlQuery {
82 metric_name: "mz_replica_write_frontiers",
83 help: "The per-replica write frontiers of compute and storage collections.",
84 query: "SELECT
85 object_id AS collection_id,
86 coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
87 cluster_id AS instance_id,
88 replica_id AS replica_id
89 FROM mz_catalog.mz_cluster_replica_frontiers
90 JOIN mz_cluster_replicas ON (id = replica_id)
91 WHERE object_id NOT LIKE 't%';",
92 value_column_name: "write_frontier",
93 per_replica: false,
94 },
95];
96
/// Queries exporting usage/inventory metrics: counts of user-created catalog
/// objects (clusters, replicas, indexes, sources, views, sinks, ...) plus a
/// few "info"-style metrics whose value is constant and whose labels carry
/// the interesting data. The `LIKE 'u%'` filters restrict to user objects.
pub(crate) static USAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
    // Info-style metric: value is always 1; the labels carry cluster/replica
    // names, size, availability zone, and the server version.
    PrometheusSqlQuery {
        metric_name: "mz_compute_cluster_status",
        help: "Reports the name, ID, size, and availability zone of each cluster replica. Value is always 1.",
        query: "SELECT
            1 AS status,
            cr.cluster_id AS compute_cluster_id,
            cr.id AS compute_replica_id,
            c.name AS compute_cluster_name,
            cr.name AS compute_replica_name,
            COALESCE(cr.size, '') AS size,
            COALESCE(cr.availability_zone, '') AS availability_zone,
            mz_version() AS mz_version
        FROM
            mz_cluster_replicas AS cr
                JOIN mz_clusters AS c
                ON cr.cluster_id = c.id",
        value_column_name: "status",
        per_replica: false,
    },
    // Info-style metric: value is always 1; labels carry the workload class.
    // Clusters without a workload class report 'false'.
    PrometheusSqlQuery {
        metric_name: "mz_workload_clusters",
        help: "Reports the workload_class of each user cluster. Value is always 1.",
        query: "select
            c.id as cluster_id,
            c.name as cluster_name,
            coalesce(wc.workload_class,'false') as workload_class,
            1 as value
        from mz_clusters c
        join mz_internal.mz_cluster_workload_classes wc
            on c.id = wc.id",
        value_column_name: "value",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_clusters_count",
        help: "Number of active clusters in the instance.",
        query: "select
            count(id) as clusters
        from mz_clusters
        where id like 'u%'",
        value_column_name: "clusters",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_cluster_reps_count",
        help: "Number of active cluster replicas in the instance, by replica size.",
        query: "select size
            , count(id) as replicas
        from mz_cluster_replicas
        where cluster_id like 'u%'
        group by size",
        value_column_name: "replicas",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_indexes_count",
        help: "Number of active indexes in the instance, by the type of relation on which the index is built.",
        query: "select
            o.type as relation_type
            , count(i.id) as indexes
        from mz_catalog.mz_indexes i
        join mz_catalog.mz_objects o
            on i.on_id = o.id
        where i.id like 'u%'
        group by relation_type",
        value_column_name: "indexes",
        per_replica: false,
    },
    // NOTE(review): this cross-joins `mz_sources` with `mz_cluster_replicas`
    // filtered only on matching cluster_id; a cluster with several replicas
    // of the same size would appear to count each source once per replica —
    // confirm this multiplicity is intended.
    PrometheusSqlQuery {
        metric_name: "mz_sources_count",
        help: "Number of active sources in the instance, by type, envelope type, and size of source.",
        query: "SELECT
            type,
            COALESCE(envelope_type, '<none>') AS envelope_type,
            mz_cluster_replicas.size AS size,
            count(mz_sources.id) AS sources
        FROM
            mz_sources, mz_cluster_replicas
        WHERE mz_sources.id LIKE 'u%'
            AND mz_sources.type != 'subsource'
            AND mz_sources.cluster_id = mz_cluster_replicas.cluster_id
        GROUP BY
            type, envelope_type, mz_cluster_replicas.size",
        value_column_name: "sources",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_views_count",
        help: "Number of active views in the instance.",
        query: "select count(id) as views from mz_views where id like 'u%'",
        value_column_name: "views",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_mzd_views_count",
        help: "Number of active materialized views in the instance.",
        query: "select count(id) as mzd_views from mz_materialized_views where id like 'u%'",
        value_column_name: "mzd_views",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_secrets_count",
        help: "Number of active secrets in the instance.",
        query: "select count(id) as secrets from mz_secrets where id like 'u%'",
        value_column_name: "secrets",
        per_replica: false,
    },
    // NOTE(review): same cross-join multiplicity concern as mz_sources_count.
    PrometheusSqlQuery {
        metric_name: "mz_sinks_count",
        help: "Number of active sinks in the instance, by type, envelope type, and size.",
        query: "SELECT
            type,
            COALESCE(envelope_type, '<none>') AS envelope_type,
            mz_cluster_replicas.size AS size,
            count(mz_sinks.id) AS sinks
        FROM
            mz_sinks, mz_cluster_replicas
        WHERE mz_sinks.id LIKE 'u%'
            AND mz_sinks.cluster_id = mz_cluster_replicas.cluster_id
        GROUP BY type, envelope_type, mz_cluster_replicas.size",
        value_column_name: "sinks",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_connections_count",
        help: "Number of active connections in the instance, by type.",
        query: "select
            type
            , count(id) as connections
        from mz_connections
        where id like 'u%'
        group by type",
        value_column_name: "connections",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_tables_count",
        help: "Number of active tables in the instance.",
        query: "select count(id) as tables from mz_tables where id like 'u%'",
        value_column_name: "tables",
        per_replica: false,
    },
    // Mapping metric: value is always 0; the label is the fully qualified
    // name (database.schema.object) of each user catalog item.
    PrometheusSqlQuery {
        metric_name: "mz_catalog_items",
        help: "Mapping internal id for catalog item.",
        query: "SELECT
            concat(d.name, '.', s.name, '.', o.name) AS label,
            0 AS value
        FROM mz_objects o
        JOIN mz_schemas s ON (o.schema_id = s.id)
        JOIN mz_databases d ON (s.database_id = d.id)
        WHERE o.id LIKE 'u%'",
        value_column_name: "value",
        per_replica: false,
    },
    // Mapping metric: value is always 0; labels pair each object ID with its
    // fully qualified name.
    PrometheusSqlQuery {
        metric_name: "mz_object_id",
        help: "Mapping external name for catalog item.",
        query: "SELECT
            o.id AS label1,
            concat(d.name, '.', s.name, '.', o.name) AS label2,
            0 AS value
        FROM mz_objects o
        JOIN mz_schemas s ON (o.schema_id = s.id)
        JOIN mz_databases d ON (s.database_id = d.id)
        WHERE o.id LIKE 'u%'",
        value_column_name: "value",
        per_replica: false,
    },
];
268
/// Queries exporting compute introspection metrics (arrangement sizes, worker
/// scheduling, peeks, dataflow runtimes). All are `per_replica: true`, i.e.
/// they are presumably issued once per replica via
/// `to_sql_request(Some(..))` — confirm against the caller.
///
/// A recurring pattern below: a `collections` CTE joins
/// `mz_dataflow_addresses` with `mz_compute_exports` on the dataflow ID to
/// map operators to the collection they compute, relabeling transient
/// exports (IDs starting with 't') as the single bucket 'transient'.
pub(crate) static COMPUTE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
    // NOTE(review): unlike the other arrangement metrics, this one neither
    // selects nor groups by worker_id — confirm the asymmetry is intended.
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_count",
        help: "The number of arrangements in a dataflow.",
        query: "WITH
            arrangements AS (
                SELECT DISTINCT operator_id AS id
                FROM mz_internal.mz_arrangement_records_raw
            ),
            collections AS (
                SELECT
                    id,
                    CASE
                        WHEN starts_with(export_id, 't') THEN 'transient'
                        ELSE export_id
                    END AS export_id
                FROM
                    mz_internal.mz_dataflow_addresses,
                    mz_internal.mz_compute_exports
                WHERE address[1] = dataflow_id
            )
            SELECT
                COALESCE(export_id, 'none') AS collection_id,
                count(*) as count
            FROM arrangements
            LEFT JOIN collections USING (id)
            GROUP BY export_id",
        value_column_name: "count",
        per_replica: true,
    },
    // NOTE(review): `count(*)` over the raw introspection relations below
    // presumably folds in row multiplicities (so it acts as a sum of the
    // underlying quantities) — confirm against the introspection-source docs.
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_record_count",
        help: "The number of records in all arrangements in a dataflow.",
        query: "WITH
            collections AS (
                SELECT
                    id,
                    CASE
                        WHEN starts_with(export_id, 't') THEN 'transient'
                        ELSE export_id
                    END AS export_id
                FROM
                    mz_internal.mz_dataflow_addresses,
                    mz_internal.mz_compute_exports
                WHERE address[1] = dataflow_id
            )
            SELECT
                worker_id,
                COALESCE(export_id, 'none') AS collection_id,
                count(*) as count
            FROM mz_internal.mz_arrangement_records_raw
            LEFT JOIN collections ON (operator_id = id)
            GROUP BY worker_id, export_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_batch_count",
        help: "The number of batches in all arrangements in a dataflow.",
        query: "WITH
            collections AS (
                SELECT
                    id,
                    CASE
                        WHEN starts_with(export_id, 't') THEN 'transient'
                        ELSE export_id
                    END AS export_id
                FROM
                    mz_internal.mz_dataflow_addresses,
                    mz_internal.mz_compute_exports
                WHERE address[1] = dataflow_id
            )
            SELECT
                worker_id,
                COALESCE(export_id, 'none') AS collection_id,
                count(*) as count
            FROM mz_internal.mz_arrangement_batches_raw
            LEFT JOIN collections ON (operator_id = id)
            GROUP BY worker_id, export_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_size_bytes",
        help: "The size of all arrangements in a dataflow.",
        query: "WITH
            collections AS (
                SELECT
                    id,
                    CASE
                        WHEN starts_with(export_id, 't') THEN 'transient'
                        ELSE export_id
                    END AS export_id
                FROM
                    mz_internal.mz_dataflow_addresses,
                    mz_internal.mz_compute_exports
                WHERE address[1] = dataflow_id
            )
            SELECT
                worker_id,
                COALESCE(export_id, 'none') AS collection_id,
                count(*) as count
            FROM mz_internal.mz_arrangement_heap_size_raw
            LEFT JOIN collections ON (operator_id = id)
            GROUP BY worker_id, export_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_capacity_bytes",
        help: "The capacity of all arrangements in all dataflows.",
        query: "SELECT
            worker_id,
            count(*) as count
        FROM mz_internal.mz_arrangement_heap_capacity_raw
        GROUP BY worker_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_allocation_count",
        help: "The number of allocations in all arrangements in all dataflows.",
        query: "SELECT
            worker_id,
            count(*) as count
        FROM mz_internal.mz_arrangement_heap_allocations_raw
        GROUP BY worker_id",
        value_column_name: "count",
        per_replica: true,
    },
    // Converts the nanosecond histogram buckets to seconds
    // (slept_for_ns * count, summed, divided by 1e9).
    PrometheusSqlQuery {
        metric_name: "mz_compute_replica_park_duration_seconds_total",
        help: "The total time workers were parked since restart.",
        query: "SELECT
            worker_id,
            sum(slept_for_ns * count)::float8 / 1000000000 AS duration_s
        FROM mz_internal.mz_scheduling_parks_histogram_per_worker
        GROUP BY worker_id",
        value_column_name: "duration_s",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_compute_replica_peek_count",
        help: "The number of pending peeks.",
        query: "SELECT worker_id, count(*) as count
        FROM mz_internal.mz_active_peeks_per_worker
        GROUP BY worker_id",
        value_column_name: "count",
        per_replica: true,
    },
    // Sums per-operator elapsed time over the operators belonging to each
    // exported dataflow (address of length 1 = the dataflow root), converted
    // from nanoseconds to seconds.
    PrometheusSqlQuery {
        metric_name: "mz_dataflow_elapsed_seconds_total",
        help: "The total time spent computing a dataflow.",
        query: "SELECT
            worker_id,
            CASE
                WHEN starts_with(export_id, 't') THEN 'transient'
                ELSE export_id
            END AS collection_id,
            sum(elapsed_ns)::float8 / 1000000000 AS elapsed_s
        FROM
            mz_internal.mz_scheduling_elapsed_per_worker AS s,
            mz_internal.mz_dataflow_operators AS o,
            mz_internal.mz_dataflow_addresses AS a,
            mz_internal.mz_compute_exports AS e
        WHERE
            o.id = s.id AND
            o.id = a.id AND
            list_length(a.address) = 1 AND
            e.dataflow_id = a.address[1]
        GROUP BY worker_id, collection_id",
        value_column_name: "elapsed_s",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_dataflow_error_count",
        help: "The number of errors in a dataflow",
        query: "SELECT
            CASE
                WHEN starts_with(export_id, 't') THEN 'transient'
                ELSE export_id
            END AS collection_id,
            count::uint8 as count
        FROM mz_internal.mz_compute_error_counts",
        value_column_name: "count",
        per_replica: true,
    },
];
457
/// Queries exporting labeled information about storage objects: every user
/// source and sink, enriched with its connection type, networking, format
/// connections, and the replicas (and synthesized pod names) hosting it.
/// The exported value is always 1; the data lives in the labels. The SQL
/// itself is documented inline with `--` comments.
pub(crate) static STORAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
    PrometheusSqlQuery {
        metric_name: "mz_storage_objects",
        help: "Nicely labeled information about existing sources and sinks.",
        value_column_name: "value",
        per_replica: false,
        query: "
        WITH
        -- All user, non- progress or subsource sources.
        top_level_sources AS (
            SELECT id, connection_id, type, envelope_type, cluster_id
            FROM mz_sources
            WHERE id LIKE 'u%' AND type NOT IN ('progress', 'subsource')
        ),
        -- Sources enriched with the type of the core connection.
        source_and_conns AS (
            SELECT top_level_sources.*, mc.type AS connection_type
            FROM top_level_sources
            LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sources.connection_id
        ),

        -- All user sinks.
        top_level_sinks AS (
            SELECT id, connection_id, type, envelope_type, cluster_id
            FROM mz_sinks
            WHERE id LIKE 'u%'
        ),
        -- Sinks enriched with the type of the core connection.
        sink_and_conns AS (
            SELECT top_level_sinks.*, mc.type AS connection_type
            FROM top_level_sinks
            LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sinks.connection_id
        ),

        -- All objects we care about
        object_and_conns AS (
            SELECT * FROM source_and_conns
            UNION ALL
            SELECT * FROM sink_and_conns
        ),

        -- The networking the object connection uses, if any.
        networking AS (
            SELECT object_and_conns.id, mc.id AS networking_id, mc.type AS networking_type
            FROM object_and_conns
            JOIN mz_internal.mz_object_dependencies mod ON object_and_conns.connection_id = mod.object_id
            JOIN mz_connections mc ON mc.id = mod.referenced_object_id
            -- Not required but made explicit
            WHERE object_and_conns.connection_id IS NOT NULL
        ),
        -- The connection the format of the object uses, if any. This uses `mz_object_dependencies`
        -- and a filter to find non-core objects the object depends on.
        format_conns AS (
            SELECT object_and_conns.id, mc.id AS format_connection_id, mc.type AS format_connection
            FROM mz_internal.mz_object_dependencies mod
            JOIN object_and_conns ON mod.object_id = object_and_conns.id
            JOIN mz_connections mc ON mc.id = mod.referenced_object_id
            WHERE mc.id NOT IN (SELECT connection_id FROM top_level_sources UNION ALL SELECT connection_id FROM top_level_sinks)
            -- Not required but made explicit
            AND object_and_conns.connection_id IS NOT NULL
        ),
        -- The networking used by `format_conns`, if any.
        format_conn_deps AS (
            SELECT format_conns.id, mc.id AS format_connection_networking_id, mc.type AS format_connection_networking
            FROM format_conns
            JOIN mz_internal.mz_object_dependencies mod ON mod.object_id = format_conns.format_connection_id
            JOIN mz_connections mc
            ON mc.id = mod.referenced_object_id
        ),

        -- When aggregating values that are known to be the same, we just use `MAX` for simplicity.

        -- source_and_conns LEFT JOINed with the networking and format connection information.
        -- Missing networking/format connections are coalesced to `none`, and aggregated with a comma.
        -- This is because sources can have multiple type of networking (i.e. different kafka brokers with
        -- different configuration), and multiple connections for formats (for key and value formats).
        --
        -- The actual format type is not yet included, as it depends on https://github.com/MaterializeInc/materialize/pull/23880
        sources AS (
            SELECT
                -- Whether its a source or sink
                'source' AS type,
                1 AS value,
                source_and_conns.id AS id,
                -- What type of source/sink it is
                MAX(type) AS object_type,
                COALESCE(MAX(connection_type), 'none') AS connection_type,
                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
                STRING_AGG(
                    DISTINCT COALESCE(networking_type, 'none'),
                    ','
                    ORDER BY COALESCE(networking_type, 'none') ASC
                ) AS networking_type,
                STRING_AGG(
                    DISTINCT COALESCE(format_connection, 'none'),
                    ','
                    ORDER BY COALESCE(format_connection, 'none') ASC
                ) AS format_connection,
                STRING_AGG(
                    DISTINCT COALESCE(format_connection_networking, 'none'),
                    ','
                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
                ) AS format_connection_networking,
                MAX(cluster_id) AS cluster_id
            FROM source_and_conns
            LEFT OUTER JOIN networking ON networking.id = source_and_conns.id
            LEFT OUTER JOIN format_conns ON format_conns.id = source_and_conns.id
            LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = source_and_conns.id
            GROUP BY source_and_conns.id
        ),

        sinks AS (
            SELECT
                'sink' AS type,
                1 AS value,
                sink_and_conns.id AS id,
                MAX(type) AS object_type,
                COALESCE(MAX(connection_type), 'none') AS connection_type,
                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
                STRING_AGG(
                    DISTINCT COALESCE(networking_type, 'none'),
                    ','
                    ORDER BY COALESCE(networking_type, 'none') ASC
                ) AS networking_type,
                -- Sinks can only have 1 format connection but we aggregate
                -- for consistency.
                STRING_AGG(
                    DISTINCT COALESCE(format_connection, 'none'),
                    ','
                    ORDER BY COALESCE(format_connection, 'none') ASC
                ) AS format_connection,
                STRING_AGG(
                    DISTINCT COALESCE(format_connection_networking, 'none'),
                    ','
                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
                ) AS format_connection_networking,
                MAX(cluster_id) AS cluster_id
            FROM sink_and_conns
            LEFT OUTER JOIN networking ON networking.id = sink_and_conns.id
            LEFT OUTER JOIN format_conns ON format_conns.id = sink_and_conns.id
            LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = sink_and_conns.id
            GROUP BY sink_and_conns.id
        ),

        -- everything without replicas
        together AS (
            SELECT * FROM sources
            UNION ALL
            SELECT * from sinks
        ),

        with_cluster_replicas AS (
            SELECT
                -- `together.*` doesn't work because we need to aggregate the columns :(
                MAX(together.id) AS id,
                MAX(together.type) AS type,
                MAX(together.object_type) AS object_type,
                -- We just report 1 to the gauge. The `replica_id` labels aggregates the 0-many replicas associated
                -- with this object.
                MAX(together.value) AS value,
                MAX(together.connection_type) AS connection_type,
                MAX(together.envelope_type) AS envelope_type,
                MAX(together.networking_type) AS networking_type,
                MAX(together.format_connection) AS format_connection,
                MAX(together.format_connection_networking) AS format_connection_networking,
                MAX(together.cluster_id) AS cluster_id,
                mcr.id AS replica_id,
                -- Coalesce to 0 when there is no replica. This ensures `with_pod` below will generate
                -- 1 row for the object. Also, -1 because `generate_series` is inclusive.
                COALESCE((mcrs.processes - 1)::int, 0) AS processes
            FROM together
            LEFT OUTER JOIN mz_cluster_replicas mcr ON together.cluster_id = mcr.cluster_id
            LEFT OUTER JOIN mz_catalog.mz_cluster_replica_sizes mcrs ON mcr.size = mcrs.size
            GROUP BY together.id, mcr.id, mcrs.processes
        ),

        with_pod AS (
            SELECT
                id,
                type,
                object_type,
                value,
                connection_type,
                envelope_type,
                networking_type,
                format_connection,
                format_connection_networking,
                cluster_id,
                COALESCE(replica_id, 'none') AS replica_id,
                -- When `replica_id` is NULL`, this coalesces to `none`.
                COALESCE('cluster-' || cluster_id || '-replica-' || replica_id || '-' || generated, 'none') AS synthesized_pod
            FROM with_cluster_replicas, generate_series(0, processes) generated
        )

        SELECT * FROM with_pod;
        ",
    },
];