use mz_catalog::memory::objects::{Cluster, ClusterReplica};

use super::sql::SqlRequest;
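/// A SQL query whose result is exported as a Prometheus metric.
///
/// `metric_name` and `help` describe the metric, `value_column_name` names the column
/// that carries the sample value, and `per_replica` marks queries that are meant to be
/// issued once per cluster replica (see `to_sql_request`, which pins the session to a
/// replica with `SET CLUSTER` / `SET CLUSTER_REPLICA`).
///
/// Illustrative usage sketch; the metric and query below are hypothetical and not part
/// of the lists in this module:
///
/// ```ignore
/// let example = PrometheusSqlQuery {
///     metric_name: "mz_example_metric",
///     help: "An example metric.",
///     query: "SELECT 1 AS value",
///     value_column_name: "value",
///     per_replica: false,
/// };
/// // With no target replica, cluster settings are reset before the query runs.
/// let request = example.to_sql_request(None);
/// ```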
#[derive(Debug)]
pub(crate) struct PrometheusSqlQuery<'a> {
    pub(crate) metric_name: &'a str,
    pub(crate) help: &'a str,
    pub(crate) query: &'a str,
    pub(crate) value_column_name: &'a str,
    pub(crate) per_replica: bool,
}

impl<'a> PrometheusSqlQuery<'a> {
    pub(crate) fn to_sql_request(
        &self,
        cluster: Option<(&Cluster, &ClusterReplica)>,
    ) -> SqlRequest {
        SqlRequest::Simple {
            query: if let Some((cluster, replica)) = cluster {
                format!(
                    "SET auto_route_catalog_queries = false; SET CLUSTER = '{}'; SET CLUSTER_REPLICA = '{}'; {}",
                    cluster.name, replica.name, self.query
                )
            } else {
                format!(
                    "SET auto_route_catalog_queries = true; RESET CLUSTER; RESET CLUSTER_REPLICA; {}",
                    self.query
                )
            },
        }
    }
}
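// Frontier metrics. A NULL frontier is reported as u64::MAX (18446744073709551615), and
// objects with transient IDs ('t%') are excluded.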
pub(crate) static FRONTIER_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
    PrometheusSqlQuery {
        metric_name: "mz_write_frontier",
        help: "The global write frontiers of compute and storage collections.",
        query: "SELECT
                    object_id AS collection_id,
                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier
                FROM mz_internal.mz_frontiers
                WHERE object_id NOT LIKE 't%';",
        value_column_name: "write_frontier",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_read_frontier",
        help: "The global read frontiers of compute and storage collections.",
        query: "SELECT
                    object_id AS collection_id,
                    coalesce(read_frontier::text::uint8, 18446744073709551615::uint8) AS read_frontier
                FROM mz_internal.mz_frontiers
                WHERE object_id NOT LIKE 't%';",
        value_column_name: "read_frontier",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_replica_write_frontiers",
        help: "The per-replica write frontiers of compute and storage collections.",
        query: "SELECT
                    object_id AS collection_id,
                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
                    cluster_id AS instance_id,
                    replica_id AS replica_id
                FROM mz_catalog.mz_cluster_replica_frontiers
                JOIN mz_cluster_replicas ON (id = replica_id)
                WHERE object_id NOT LIKE 't%';",
        value_column_name: "write_frontier",
        per_replica: false,
    },
];
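// Usage metrics: counts of and label mappings for user objects (clusters, replicas,
// indexes, sources, sinks, and so on). All of these run as global queries.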
pub(crate) static USAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
    PrometheusSqlQuery {
        metric_name: "mz_compute_cluster_status",
        help: "Reports the name, ID, size, and availability zone of each cluster replica. Value is always 1.",
        query: "SELECT
                    1 AS status,
                    cr.cluster_id AS compute_cluster_id,
                    cr.id AS compute_replica_id,
                    c.name AS compute_cluster_name,
                    cr.name AS compute_replica_name,
                    COALESCE(cr.size, '') AS size,
                    COALESCE(cr.availability_zone, '') AS availability_zone,
                    mz_version() AS mz_version
                FROM
                    mz_cluster_replicas AS cr
                    JOIN mz_clusters AS c
                        ON cr.cluster_id = c.id",
        value_column_name: "status",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_workload_clusters",
        help: "Reports the workload_class of each user cluster. Value is always 1.",
        query: "select
                    c.id as cluster_id,
                    c.name as cluster_name,
                    coalesce(wc.workload_class, 'false') as workload_class,
                    1 as value
                from mz_clusters c
                join mz_internal.mz_cluster_workload_classes wc
                    on c.id = wc.id",
        value_column_name: "value",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_clusters_count",
        help: "Number of active clusters in the instance.",
        query: "select
                    count(id) as clusters
                from mz_clusters
                where id like 'u%'",
        value_column_name: "clusters",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_cluster_reps_count",
        help: "Number of active cluster replicas in the instance, by replica size.",
        query: "select size
                    , count(id) as replicas
                from mz_cluster_replicas
                where cluster_id like 'u%'
                group by size",
        value_column_name: "replicas",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_indexes_count",
        help: "Number of active indexes in the instance, by the type of relation on which the index is built.",
        query: "select
                    o.type as relation_type
                    , count(i.id) as indexes
                from mz_catalog.mz_indexes i
                join mz_catalog.mz_objects o
                    on i.on_id = o.id
                where i.id like 'u%'
                group by relation_type",
        value_column_name: "indexes",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_sources_count",
        help: "Number of active sources in the instance, by type, envelope type, and size of source.",
        query: "SELECT
                    type,
                    COALESCE(envelope_type, '<none>') AS envelope_type,
                    mz_cluster_replicas.size AS size,
                    count(mz_sources.id) AS sources
                FROM
                    mz_sources, mz_cluster_replicas
                WHERE mz_sources.id LIKE 'u%'
                    AND mz_sources.type != 'subsource'
                    AND mz_sources.cluster_id = mz_cluster_replicas.cluster_id
                GROUP BY
                    type, envelope_type, mz_cluster_replicas.size",
        value_column_name: "sources",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_views_count",
        help: "Number of active views in the instance.",
        query: "select count(id) as views from mz_views where id like 'u%'",
        value_column_name: "views",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_mzd_views_count",
        help: "Number of active materialized views in the instance.",
        query: "select count(id) as mzd_views from mz_materialized_views where id like 'u%'",
        value_column_name: "mzd_views",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_secrets_count",
        help: "Number of active secrets in the instance.",
        query: "select count(id) as secrets from mz_secrets where id like 'u%'",
        value_column_name: "secrets",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_sinks_count",
        help: "Number of active sinks in the instance, by type, envelope type, and size.",
        query: "SELECT
                    type,
                    COALESCE(envelope_type, '<none>') AS envelope_type,
                    mz_cluster_replicas.size AS size,
                    count(mz_sinks.id) AS sinks
                FROM
                    mz_sinks, mz_cluster_replicas
                WHERE mz_sinks.id LIKE 'u%'
                    AND mz_sinks.cluster_id = mz_cluster_replicas.cluster_id
                GROUP BY type, envelope_type, mz_cluster_replicas.size",
        value_column_name: "sinks",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_connections_count",
        help: "Number of active connections in the instance, by type.",
        query: "select
                    type
                    , count(id) as connections
                from mz_connections
                where id like 'u%'
                group by type",
        value_column_name: "connections",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_tables_count",
        help: "Number of active tables in the instance.",
        query: "select count(id) as tables from mz_tables where id like 'u%'",
        value_column_name: "tables",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_catalog_items",
        help: "Mapping internal id for catalog item.",
        query: "SELECT
                    concat(d.name, '.', s.name, '.', o.name) AS label,
                    0 AS value
                FROM mz_objects o
                JOIN mz_schemas s ON (o.schema_id = s.id)
                JOIN mz_databases d ON (s.database_id = d.id)
                WHERE o.id LIKE 'u%'",
        value_column_name: "value",
        per_replica: false,
    },
    PrometheusSqlQuery {
        metric_name: "mz_object_id",
        help: "Mapping external name for catalog item.",
        query: "SELECT
                    o.id AS label1,
                    concat(d.name, '.', s.name, '.', o.name) AS label2,
                    0 AS value
                FROM mz_objects o
                JOIN mz_schemas s ON (o.schema_id = s.id)
                JOIN mz_databases d ON (s.database_id = d.id)
                WHERE o.id LIKE 'u%'",
        value_column_name: "value",
        per_replica: false,
    },
];
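// Compute metrics, built on the per-worker introspection relations in `mz_internal`.
// These are marked `per_replica: true`, so each query is issued against every cluster
// replica individually.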
pub(crate) static COMPUTE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_count",
        help: "The number of arrangements in a dataflow.",
        query: "WITH
                    arrangements AS (
                        SELECT DISTINCT operator_id AS id
                        FROM mz_internal.mz_arrangement_records_raw
                    ),
                    collections AS (
                        SELECT
                            id,
                            regexp_replace(export_id, '^t.+', 'transient') as export_id
                        FROM
                            mz_internal.mz_dataflow_addresses,
                            mz_internal.mz_compute_exports
                        WHERE address[1] = dataflow_id
                    )
                SELECT
                    COALESCE(export_id, 'none') AS collection_id,
                    count(*) as count
                FROM arrangements
                LEFT JOIN collections USING (id)
                GROUP BY export_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_record_count",
        help: "The number of records in all arrangements in a dataflow.",
        query: "WITH
                    collections AS (
                        SELECT
                            id,
                            regexp_replace(export_id, '^t.+', 'transient') as export_id
                        FROM
                            mz_internal.mz_dataflow_addresses,
                            mz_internal.mz_compute_exports
                        WHERE address[1] = dataflow_id
                    )
                SELECT
                    worker_id,
                    COALESCE(export_id, 'none') AS collection_id,
                    count(*) as count
                FROM mz_internal.mz_arrangement_records_raw
                LEFT JOIN collections ON (operator_id = id)
                GROUP BY worker_id, export_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_batch_count",
        help: "The number of batches in all arrangements in a dataflow.",
        query: "WITH
                    collections AS (
                        SELECT
                            id,
                            regexp_replace(export_id, '^t.+', 'transient') as export_id
                        FROM
                            mz_internal.mz_dataflow_addresses,
                            mz_internal.mz_compute_exports
                        WHERE address[1] = dataflow_id
                    )
                SELECT
                    worker_id,
                    COALESCE(export_id, 'none') AS collection_id,
                    count(*) as count
                FROM mz_internal.mz_arrangement_batches_raw
                LEFT JOIN collections ON (operator_id = id)
                GROUP BY worker_id, export_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_size_bytes",
        help: "The size of all arrangements in a dataflow.",
        query: "WITH
                    collections AS (
                        SELECT
                            id,
                            regexp_replace(export_id, '^t.+', 'transient') as export_id
                        FROM
                            mz_internal.mz_dataflow_addresses,
                            mz_internal.mz_compute_exports
                        WHERE address[1] = dataflow_id
                    )
                SELECT
                    worker_id,
                    COALESCE(export_id, 'none') AS collection_id,
                    count(*) as count
                FROM mz_internal.mz_arrangement_heap_size_raw
                LEFT JOIN collections ON (operator_id = id)
                GROUP BY worker_id, export_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_capacity_bytes",
        help: "The capacity of all arrangements in all dataflows.",
        query: "SELECT
                    worker_id,
                    count(*) as count
                FROM mz_internal.mz_arrangement_heap_capacity_raw
                GROUP BY worker_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_arrangement_allocation_count",
        help: "The number of allocations in all arrangements in all dataflows.",
        query: "SELECT
                    worker_id,
                    count(*) as count
                FROM mz_internal.mz_arrangement_heap_allocations_raw
                GROUP BY worker_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_compute_replica_park_duration_seconds_total",
        help: "The total time workers were parked since restart.",
        query: "SELECT
                    worker_id,
                    sum(slept_for_ns * count)::float8 / 1000000000 AS duration_s
                FROM mz_internal.mz_scheduling_parks_histogram_per_worker
                GROUP BY worker_id",
        value_column_name: "duration_s",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_compute_replica_peek_count",
        help: "The number of pending peeks.",
        query: "SELECT worker_id, count(*) as count
                FROM mz_internal.mz_active_peeks_per_worker
                GROUP BY worker_id",
        value_column_name: "count",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_dataflow_elapsed_seconds_total",
        help: "The total time spent computing a dataflow.",
        query: "SELECT
                    worker_id,
                    regexp_replace(export_id, '^t.+', 'transient') AS collection_id,
                    sum(elapsed_ns)::float8 / 1000000000 AS elapsed_s
                FROM
                    mz_internal.mz_scheduling_elapsed_per_worker AS s,
                    mz_internal.mz_dataflow_operators AS o,
                    mz_internal.mz_dataflow_addresses AS a,
                    mz_internal.mz_compute_exports AS e
                WHERE
                    o.id = s.id AND
                    o.id = a.id AND
                    list_length(a.address) = 1 AND
                    e.dataflow_id = a.address[1]
                GROUP BY worker_id, collection_id",
        value_column_name: "elapsed_s",
        per_replica: true,
    },
    PrometheusSqlQuery {
        metric_name: "mz_dataflow_error_count",
        help: "The number of errors in a dataflow.",
        query: "SELECT
                    regexp_replace(export_id, '^t.+', 'transient') AS collection_id,
                    count::uint8 as count
                FROM mz_internal.mz_compute_error_counts",
        value_column_name: "count",
        per_replica: true,
    },
];
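// Storage metrics: one labeled gauge row per user source or sink, describing its
// connections, formats, and the replicas of the cluster it is installed on.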
pub(crate) static STORAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
    PrometheusSqlQuery {
        metric_name: "mz_storage_objects",
        help: "Nicely labeled information about existing sources and sinks.",
        value_column_name: "value",
        per_replica: false,
        query: "
            WITH
            -- All user sources, excluding progress sources and subsources.
            top_level_sources AS (
                SELECT id, connection_id, type, envelope_type, cluster_id
                FROM mz_sources
                WHERE id LIKE 'u%' AND type NOT IN ('progress', 'subsource')
            ),
            -- Sources enriched with the type of the core connection.
            source_and_conns AS (
                SELECT top_level_sources.*, mc.type AS connection_type
                FROM top_level_sources
                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sources.connection_id
            ),

            -- All user sinks.
            top_level_sinks AS (
                SELECT id, connection_id, type, envelope_type, cluster_id
                FROM mz_sinks
                WHERE id LIKE 'u%'
            ),
            -- Sinks enriched with the type of the core connection.
            sink_and_conns AS (
                SELECT top_level_sinks.*, mc.type AS connection_type
                FROM top_level_sinks
                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sinks.connection_id
            ),

            -- All objects we care about.
            object_and_conns AS (
                SELECT * FROM source_and_conns
                UNION ALL
                SELECT * FROM sink_and_conns
            ),

            -- The networking the object connection uses, if any.
            networking AS (
                SELECT object_and_conns.id, mc.id AS networking_id, mc.type AS networking_type
                FROM object_and_conns
                JOIN mz_internal.mz_object_dependencies mod ON object_and_conns.connection_id = mod.object_id
                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
                -- Not required, but made explicit.
                WHERE object_and_conns.connection_id IS NOT NULL
            ),
            -- The connection the format of the object uses, if any. This uses `mz_object_dependencies`
            -- and a filter to find non-core objects the object depends on.
            format_conns AS (
                SELECT object_and_conns.id, mc.id AS format_connection_id, mc.type AS format_connection
                FROM mz_internal.mz_object_dependencies mod
                JOIN object_and_conns ON mod.object_id = object_and_conns.id
                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
                WHERE mc.id NOT IN (SELECT connection_id FROM top_level_sources UNION ALL SELECT connection_id FROM top_level_sinks)
                -- Not required, but made explicit.
                AND object_and_conns.connection_id IS NOT NULL
            ),
            -- The networking used by `format_conns`, if any.
            format_conn_deps AS (
                SELECT format_conns.id, mc.id AS format_connection_networking_id, mc.type AS format_connection_networking
                FROM format_conns
                JOIN mz_internal.mz_object_dependencies mod ON mod.object_id = format_conns.format_connection_id
                JOIN mz_connections mc
                    ON mc.id = mod.referenced_object_id
            ),

            -- When aggregating values that are known to be the same, we just use `MAX` for simplicity.

            -- source_and_conns LEFT JOINed with the networking and format connection information.
            -- Missing networking/format connections are coalesced to `none` and aggregated with a comma,
            -- because sources can have multiple types of networking (i.e. different Kafka brokers with
            -- different configurations) and multiple connections for formats (for key and value formats).
            --
            -- The actual format type is not yet included, as it depends on https://github.com/MaterializeInc/materialize/pull/23880
            sources AS (
                SELECT
                    -- Whether it's a source or sink.
                    'source' AS type,
                    1 AS value,
                    source_and_conns.id AS id,
                    -- What type of source/sink it is.
                    MAX(type) AS object_type,
                    COALESCE(MAX(connection_type), 'none') AS connection_type,
                    COALESCE(MAX(envelope_type), 'none') AS envelope_type,
                    STRING_AGG(
                        DISTINCT COALESCE(networking_type, 'none'),
                        ','
                        ORDER BY COALESCE(networking_type, 'none') ASC
                    ) AS networking_type,
                    STRING_AGG(
                        DISTINCT COALESCE(format_connection, 'none'),
                        ','
                        ORDER BY COALESCE(format_connection, 'none') ASC
                    ) AS format_connection,
                    STRING_AGG(
                        DISTINCT COALESCE(format_connection_networking, 'none'),
                        ','
                        ORDER BY COALESCE(format_connection_networking, 'none') ASC
                    ) AS format_connection_networking,
                    MAX(cluster_id) AS cluster_id
                FROM source_and_conns
                LEFT OUTER JOIN networking ON networking.id = source_and_conns.id
                LEFT OUTER JOIN format_conns ON format_conns.id = source_and_conns.id
                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = source_and_conns.id
                GROUP BY source_and_conns.id
            ),

            sinks AS (
                SELECT
                    'sink' AS type,
                    1 AS value,
                    sink_and_conns.id AS id,
                    MAX(type) AS object_type,
                    COALESCE(MAX(connection_type), 'none') AS connection_type,
                    COALESCE(MAX(envelope_type), 'none') AS envelope_type,
                    STRING_AGG(
                        DISTINCT COALESCE(networking_type, 'none'),
                        ','
                        ORDER BY COALESCE(networking_type, 'none') ASC
                    ) AS networking_type,
                    -- Sinks can only have 1 format connection, but we aggregate
                    -- for consistency.
                    STRING_AGG(
                        DISTINCT COALESCE(format_connection, 'none'),
                        ','
                        ORDER BY COALESCE(format_connection, 'none') ASC
                    ) AS format_connection,
                    STRING_AGG(
                        DISTINCT COALESCE(format_connection_networking, 'none'),
                        ','
                        ORDER BY COALESCE(format_connection_networking, 'none') ASC
                    ) AS format_connection_networking,
                    MAX(cluster_id) AS cluster_id
                FROM sink_and_conns
                LEFT OUTER JOIN networking ON networking.id = sink_and_conns.id
                LEFT OUTER JOIN format_conns ON format_conns.id = sink_and_conns.id
                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = sink_and_conns.id
                GROUP BY sink_and_conns.id
            ),

            -- Everything, without replicas.
            together AS (
                SELECT * FROM sources
                UNION ALL
                SELECT * FROM sinks
            ),

            with_cluster_replicas AS (
                SELECT
                    -- `together.*` doesn't work because we need to aggregate the columns :(
                    MAX(together.id) AS id,
                    MAX(together.type) AS type,
                    MAX(together.object_type) AS object_type,
                    -- We just report 1 to the gauge. The `replica_id` label distinguishes the
                    -- 0-many replicas associated with this object.
                    MAX(together.value) AS value,
                    MAX(together.connection_type) AS connection_type,
                    MAX(together.envelope_type) AS envelope_type,
                    MAX(together.networking_type) AS networking_type,
                    MAX(together.format_connection) AS format_connection,
                    MAX(together.format_connection_networking) AS format_connection_networking,
                    MAX(together.cluster_id) AS cluster_id,
                    mcr.id AS replica_id,
                    -- Coalesce to 0 when there is no replica. This ensures `with_pod` below will generate
                    -- 1 row for the object. Also, -1 because `generate_series` is inclusive.
                    COALESCE((mcrs.processes - 1)::int, 0) AS processes
                FROM together
                LEFT OUTER JOIN mz_cluster_replicas mcr ON together.cluster_id = mcr.cluster_id
                LEFT OUTER JOIN mz_catalog.mz_cluster_replica_sizes mcrs ON mcr.size = mcrs.size
                GROUP BY together.id, mcr.id, mcrs.processes
            ),

            with_pod AS (
                SELECT
                    id,
                    type,
                    object_type,
                    value,
                    connection_type,
                    envelope_type,
                    networking_type,
                    format_connection,
                    format_connection_networking,
                    cluster_id,
                    COALESCE(replica_id, 'none') AS replica_id,
                    -- When `replica_id` is NULL, this coalesces to `none`.
                    COALESCE('cluster-' || cluster_id || '-replica-' || replica_id || '-' || generated, 'none') AS synthesized_pod
                FROM with_cluster_replicas, generate_series(0, processes) generated
            )

            SELECT * FROM with_pod;
            ",
    },
];
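
// A minimal sketch of how these query lists might be expanded into `SqlRequest`s; this
// helper is illustrative and not part of the original interface. Global queries are
// issued once with cluster settings reset, while `per_replica` queries are issued once
// per (cluster, replica) pair so the session is pinned to that replica.
#[allow(dead_code)]
pub(crate) fn expand_queries(
    queries: &[PrometheusSqlQuery<'_>],
    replicas: &[(&Cluster, &ClusterReplica)],
) -> Vec<SqlRequest> {
    let mut requests = Vec::new();
    for query in queries {
        if query.per_replica {
            // One request per replica, routed via SET CLUSTER / SET CLUSTER_REPLICA.
            for target in replicas {
                requests.push(query.to_sql_request(Some(*target)));
            }
        } else {
            // One global request with RESET CLUSTER / RESET CLUSTER_REPLICA.
            requests.push(query.to_sql_request(None));
        }
    }
    requests
}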