Skip to main content

mz_environmentd/http/
prometheus.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apach
9
10use mz_catalog::memory::objects::{Cluster, ClusterReplica};
11
12use super::sql::SqlRequest;
13
14#[derive(Debug)]
15pub(crate) struct PrometheusSqlQuery<'a> {
16    pub(crate) metric_name: &'a str,
17    pub(crate) help: &'a str,
18    pub(crate) query: &'a str,
19    pub(crate) value_column_name: &'a str,
20    pub(crate) per_replica: bool,
21}
22
23impl<'a> PrometheusSqlQuery<'a> {
24    pub(crate) fn to_sql_request(
25        &self,
26        cluster: Option<(&Cluster, &ClusterReplica)>,
27    ) -> SqlRequest {
28        SqlRequest::Simple {
29            query: if let Some((cluster, replica)) = cluster {
30                format!(
31                    "SET auto_route_catalog_queries = false; SET CLUSTER = '{}'; SET CLUSTER_REPLICA = '{}'; {}",
32                    cluster.name, replica.name, self.query
33                )
34            } else {
35                format!(
36                    "SET auto_route_catalog_queries = true; RESET CLUSTER; RESET CLUSTER_REPLICA; {}",
37                    self.query
38                )
39            },
40        }
41    }
42}
43
44pub(crate) static FRONTIER_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
45    PrometheusSqlQuery {
46        metric_name: "mz_write_frontier",
47        help: "The global write frontiers of compute and storage collections.",
48        query: "SELECT
49                    object_id AS collection_id,
50                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier
51                FROM mz_internal.mz_frontiers
52                WHERE object_id NOT LIKE 't%';",
53        value_column_name: "write_frontier",
54        per_replica: false,
55    },
56    PrometheusSqlQuery {
57        metric_name: "mz_read_frontier",
58        help: "The global read frontiers of compute and storage collections.",
59        query: "SELECT
60                    object_id AS collection_id,
61                    coalesce(read_frontier::text::uint8, 18446744073709551615::uint8) AS read_frontier
62                FROM mz_internal.mz_frontiers
63                WHERE object_id NOT LIKE 't%';",
64        value_column_name: "read_frontier",
65        per_replica: false,
66    },
67    PrometheusSqlQuery {
68        metric_name: "mz_replica_write_frontiers",
69        help: "The per-replica write frontiers of compute and storage collections.",
70        query: "SELECT
71                    object_id AS collection_id,
72                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
73                    cluster_id AS instance_id,
74                    replica_id AS replica_id
75                FROM mz_catalog.mz_cluster_replica_frontiers
76                JOIN mz_cluster_replicas ON (id = replica_id)
77                WHERE object_id NOT LIKE 't%';",
78        value_column_name: "write_frontier",
79        per_replica: false,
80    },
81    PrometheusSqlQuery {
82        metric_name: "mz_replica_write_frontiers",
83        help: "The per-replica write frontiers of compute and storage collections.",
84        query: "SELECT
85                    object_id AS collection_id,
86                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
87                    cluster_id AS instance_id,
88                    replica_id AS replica_id
89                FROM mz_catalog.mz_cluster_replica_frontiers
90                JOIN mz_cluster_replicas ON (id = replica_id)
91                WHERE object_id NOT LIKE 't%';",
92        value_column_name: "write_frontier",
93        per_replica: false,
94    },
95];
96
97pub(crate) static USAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
98    PrometheusSqlQuery {
99        metric_name: "mz_compute_cluster_status",
100        help: "Reports the name, ID, size, and availability zone of each cluster replica. Value is always 1.",
101        query: "SELECT
102                1 AS status,
103                cr.cluster_id AS compute_cluster_id,
104                cr.id AS compute_replica_id,
105                c.name AS compute_cluster_name,
106                cr.name AS compute_replica_name,
107                COALESCE(cr.size, '') AS size,
108                COALESCE(cr.availability_zone, '') AS availability_zone,
109                mz_version() AS mz_version
110          FROM
111            mz_cluster_replicas AS cr
112            JOIN mz_clusters AS c
113              ON cr.cluster_id = c.id",
114        value_column_name: "status",
115        per_replica: false,
116    },
117    PrometheusSqlQuery {
118        metric_name: "mz_workload_clusters",
119        help: "Reports the workload_class of each user cluster. Value is always 1.",
120        query: "select
121                    c.id as cluster_id,
122                    c.name as cluster_name,
123                    coalesce(wc.workload_class,'false') as workload_class,
124                    1 as value
125                from mz_clusters c
126                join mz_internal.mz_cluster_workload_classes wc
127                    on c.id = wc.id",
128        value_column_name: "value",
129        per_replica: false,
130    },
131    PrometheusSqlQuery {
132        metric_name: "mz_clusters_count",
133        help: "Number of active clusters in the instance.",
134        query: "select
135                count(id) as clusters
136            from mz_clusters
137            where id like 'u%'",
138        value_column_name: "clusters",
139        per_replica: false,
140    },
141    PrometheusSqlQuery {
142        metric_name: "mz_cluster_reps_count",
143        help: "Number of active cluster replicas in the instance, by replica size.",
144        query: "select size
145                , count(id) as replicas
146            from mz_cluster_replicas
147            where cluster_id like 'u%'
148            group by size",
149        value_column_name: "replicas",
150        per_replica: false,
151    },
152    PrometheusSqlQuery {
153        metric_name: "mz_indexes_count",
154        help: "Number of active indexes in the instance, by the type of relation on which the index is built.",
155        query: "select
156                o.type as relation_type
157                , count(i.id) as indexes
158            from mz_catalog.mz_indexes i
159            join mz_catalog.mz_objects o
160                on i.on_id = o.id
161            where i.id like 'u%'
162            group by relation_type",
163        value_column_name: "indexes",
164        per_replica: false,
165    },
166    PrometheusSqlQuery {
167        metric_name: "mz_sources_count",
168        help: "Number of active sources in the instance, by type, envelope type, and size of source.",
169        query: "SELECT
170                type,
171                COALESCE(envelope_type, '<none>') AS envelope_type,
172                mz_cluster_replicas.size AS size,
173                count(mz_sources.id) AS sources
174            FROM
175                mz_sources, mz_cluster_replicas
176            WHERE mz_sources.id LIKE 'u%'
177            AND mz_sources.type != 'subsource'
178            AND mz_sources.cluster_id = mz_cluster_replicas.cluster_id
179            GROUP BY
180            type, envelope_type, mz_cluster_replicas.size",
181        value_column_name: "sources",
182        per_replica: false,
183    },
184    PrometheusSqlQuery {
185        metric_name: "mz_views_count",
186        help: "Number of active views in the instance.",
187        query: "select count(id) as views from mz_views where id like 'u%'",
188        value_column_name: "views",
189        per_replica: false,
190    },
191    PrometheusSqlQuery {
192        metric_name: "mz_mzd_views_count",
193        help: "Number of active materialized views in the instance.",
194        query: "select count(id) as mzd_views from mz_materialized_views where id like 'u%'",
195        value_column_name: "mzd_views",
196        per_replica: false,
197    },
198    PrometheusSqlQuery {
199        metric_name: "mz_secrets_count",
200        help: "Number of active secrets in the instance.",
201        query: "select count(id) as secrets from mz_secrets where id like 'u%'",
202        value_column_name: "secrets",
203        per_replica: false,
204    },
205    PrometheusSqlQuery {
206        metric_name: "mz_sinks_count",
207        help: "Number of active sinks in the instance, by type, envelope type, and size.",
208        query: "SELECT
209                type,
210                COALESCE(envelope_type, '<none>') AS envelope_type,
211                mz_cluster_replicas.size AS size,
212                count(mz_sinks.id) AS sinks
213            FROM
214                mz_sinks, mz_cluster_replicas
215            WHERE mz_sinks.id LIKE 'u%'
216            AND mz_sinks.cluster_id = mz_cluster_replicas.cluster_id
217            GROUP BY type, envelope_type, mz_cluster_replicas.size",
218        value_column_name: "sinks",
219        per_replica: false,
220    },
221    PrometheusSqlQuery {
222        metric_name: "mz_connections_count",
223        help: "Number of active connections in the instance, by type.",
224        query: "select
225                type
226                , count(id) as connections
227            from mz_connections
228            where id like 'u%'
229            group by type",
230        value_column_name: "connections",
231        per_replica: false,
232    },
233    PrometheusSqlQuery {
234        metric_name: "mz_tables_count",
235        help: "Number of active tables in the instance.",
236        query: "select count(id) as tables from mz_tables where id like 'u%'",
237        value_column_name: "tables",
238        per_replica: false,
239    },
240    PrometheusSqlQuery {
241        metric_name: "mz_catalog_items",
242        help: "Mapping internal id for catalog item.",
243        query: "SELECT
244                concat(d.name, '.', s.name, '.', o.name) AS label,
245                0 AS value
246            FROM mz_objects o
247            JOIN mz_schemas s ON (o.schema_id = s.id)
248            JOIN mz_databases d ON (s.database_id = d.id)
249            WHERE o.id LIKE 'u%'",
250        value_column_name: "value",
251        per_replica: false,
252    },
253    PrometheusSqlQuery {
254        metric_name: "mz_object_id",
255        help: "Mapping external name for catalog item.",
256        query: "SELECT
257                o.id AS label1,
258                concat(d.name, '.', s.name, '.', o.name) AS label2,
259                0 AS value
260            FROM mz_objects o
261            JOIN mz_schemas s ON (o.schema_id = s.id)
262            JOIN mz_databases d ON (s.database_id = d.id)
263            WHERE o.id LIKE 'u%'",
264        value_column_name: "value",
265        per_replica: false,
266    },
267];
268
269pub(crate) static COMPUTE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
270    PrometheusSqlQuery {
271        metric_name: "mz_arrangement_count",
272        help: "The number of arrangements in a dataflow.",
273        query: "WITH
274            arrangements AS (
275                SELECT DISTINCT operator_id AS id
276                FROM mz_internal.mz_arrangement_records_raw
277            ),
278            collections AS (
279                SELECT
280                    id,
281                    CASE
282                        WHEN starts_with(export_id, 't') THEN 'transient'
283                        ELSE export_id
284                    END AS export_id
285                FROM
286                    mz_internal.mz_dataflow_addresses,
287                    mz_internal.mz_compute_exports
288                WHERE address[1] = dataflow_id
289            )
290        SELECT
291            COALESCE(export_id, 'none') AS collection_id,
292            count(*) as count
293        FROM arrangements
294        LEFT JOIN collections USING (id)
295        GROUP BY export_id",
296        value_column_name: "count",
297        per_replica: true,
298    },
299    PrometheusSqlQuery {
300        metric_name: "mz_arrangement_record_count",
301        help: "The number of records in all arrangements in a dataflow.",
302        query: "WITH
303            collections AS (
304                SELECT
305                    id,
306                    CASE
307                        WHEN starts_with(export_id, 't') THEN 'transient'
308                        ELSE export_id
309                    END AS export_id
310                FROM
311                    mz_internal.mz_dataflow_addresses,
312                    mz_internal.mz_compute_exports
313                WHERE address[1] = dataflow_id
314            )
315        SELECT
316            worker_id,
317            COALESCE(export_id, 'none') AS collection_id,
318            count(*) as count
319        FROM mz_internal.mz_arrangement_records_raw
320        LEFT JOIN collections ON (operator_id = id)
321        GROUP BY worker_id, export_id",
322        value_column_name: "count",
323        per_replica: true,
324    },
325    PrometheusSqlQuery {
326        metric_name: "mz_arrangement_batch_count",
327        help: "The number of batches in all arrangements in a dataflow.",
328        query: "WITH
329            collections AS (
330                SELECT
331                    id,
332                    CASE
333                        WHEN starts_with(export_id, 't') THEN 'transient'
334                        ELSE export_id
335                    END AS export_id
336                FROM
337                    mz_internal.mz_dataflow_addresses,
338                    mz_internal.mz_compute_exports
339                WHERE address[1] = dataflow_id
340            )
341        SELECT
342            worker_id,
343            COALESCE(export_id, 'none') AS collection_id,
344            count(*) as count
345        FROM mz_internal.mz_arrangement_batches_raw
346        LEFT JOIN collections ON (operator_id = id)
347        GROUP BY worker_id, export_id",
348        value_column_name: "count",
349        per_replica: true,
350    },
351    PrometheusSqlQuery {
352        metric_name: "mz_arrangement_size_bytes",
353        help: "The size of all arrangements in a dataflow.",
354        query: "WITH
355            collections AS (
356                SELECT
357                    id,
358                    CASE
359                        WHEN starts_with(export_id, 't') THEN 'transient'
360                        ELSE export_id
361                    END AS export_id
362                FROM
363                    mz_internal.mz_dataflow_addresses,
364                    mz_internal.mz_compute_exports
365                WHERE address[1] = dataflow_id
366            )
367        SELECT
368            worker_id,
369            COALESCE(export_id, 'none') AS collection_id,
370            count(*) as count
371        FROM mz_internal.mz_arrangement_heap_size_raw
372        LEFT JOIN collections ON (operator_id = id)
373        GROUP BY worker_id, export_id",
374        value_column_name: "count",
375        per_replica: true,
376    },
377    PrometheusSqlQuery {
378        metric_name: "mz_arrangement_capacity_bytes",
379        help: "The capacity of all arrangements in all dataflows.",
380        query: "SELECT
381            worker_id,
382            count(*) as count
383        FROM mz_internal.mz_arrangement_heap_capacity_raw
384        GROUP BY worker_id",
385        value_column_name: "count",
386        per_replica: true,
387    },
388    PrometheusSqlQuery {
389        metric_name: "mz_arrangement_allocation_count",
390        help: "The number of allocations in all arrangements in all dataflows.",
391        query: "SELECT
392            worker_id,
393            count(*) as count
394        FROM mz_internal.mz_arrangement_heap_allocations_raw
395        GROUP BY worker_id",
396        value_column_name: "count",
397        per_replica: true,
398    },
399    PrometheusSqlQuery {
400        metric_name: "mz_compute_replica_park_duration_seconds_total",
401        help: "The total time workers were parked since restart.",
402        query: "SELECT
403            worker_id,
404            sum(slept_for_ns * count)::float8 / 1000000000 AS duration_s
405        FROM mz_internal.mz_scheduling_parks_histogram_per_worker
406        GROUP BY worker_id",
407        value_column_name: "duration_s",
408        per_replica: true,
409    },
410    PrometheusSqlQuery {
411        metric_name: "mz_compute_replica_peek_count",
412        help: "The number of pending peeks.",
413        query: "SELECT worker_id, count(*) as count
414        FROM mz_internal.mz_active_peeks_per_worker
415        GROUP BY worker_id",
416        value_column_name: "count",
417        per_replica: true,
418    },
419    PrometheusSqlQuery {
420        metric_name: "mz_dataflow_elapsed_seconds_total",
421        help: "The total time spent computing a dataflow.",
422        query: "SELECT
423            worker_id,
424            CASE
425                WHEN starts_with(export_id, 't') THEN 'transient'
426                ELSE export_id
427            END AS collection_id,
428            sum(elapsed_ns)::float8 / 1000000000 AS elapsed_s
429        FROM
430            mz_internal.mz_scheduling_elapsed_per_worker AS s,
431            mz_internal.mz_dataflow_operators AS o,
432            mz_internal.mz_dataflow_addresses AS a,
433            mz_internal.mz_compute_exports AS e
434        WHERE
435            o.id = s.id AND
436            o.id = a.id AND
437            list_length(a.address) = 1 AND
438            e.dataflow_id = a.address[1]
439        GROUP BY worker_id, collection_id",
440        value_column_name: "elapsed_s",
441        per_replica: true,
442    },
443    PrometheusSqlQuery {
444        metric_name: "mz_dataflow_error_count",
445        help: "The number of errors in a dataflow",
446        query: "SELECT
447            CASE
448                WHEN starts_with(export_id, 't') THEN 'transient'
449                ELSE export_id
450            END AS collection_id,
451            count::uint8 as count
452        FROM mz_internal.mz_compute_error_counts",
453        value_column_name: "count",
454        per_replica: true,
455    },
456];
457
458pub(crate) static STORAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
459    PrometheusSqlQuery {
460        metric_name: "mz_storage_objects",
461        help: "Nicely labeled information about existing sources and sinks.",
462        value_column_name: "value",
463        per_replica: false,
464        query: "
465        WITH
466            -- All user, non- progress or subsource sources.
467            top_level_sources AS (
468                SELECT id, connection_id, type, envelope_type, cluster_id
469                FROM mz_sources
470                WHERE id LIKE 'u%' AND type NOT IN ('progress', 'subsource')
471            ),
472            -- Sources enriched with the type of the core connection.
473            source_and_conns AS (
474                SELECT top_level_sources.*, mc.type AS connection_type
475                FROM top_level_sources
476                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sources.connection_id
477            ),
478
479            -- All user sinks.
480            top_level_sinks AS (
481                SELECT id, connection_id, type, envelope_type, cluster_id
482                FROM mz_sinks
483                WHERE id LIKE 'u%'
484            ),
485            -- Sinks enriched with the type of the core connection.
486            sink_and_conns AS (
487                SELECT top_level_sinks.*, mc.type AS connection_type
488                FROM top_level_sinks
489                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sinks.connection_id
490            ),
491
492            -- All objects we care about
493            object_and_conns AS (
494                SELECT * FROM source_and_conns
495                UNION ALL
496                SELECT * FROM sink_and_conns
497            ),
498
499            -- The networking the object connection uses, if any.
500            networking AS (
501                SELECT object_and_conns.id, mc.id AS networking_id, mc.type AS networking_type
502                FROM object_and_conns
503                JOIN mz_internal.mz_object_dependencies mod ON object_and_conns.connection_id = mod.object_id
504                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
505                -- Not required but made explicit
506                WHERE object_and_conns.connection_id IS NOT NULL
507            ),
508            -- The connection the format of the object uses, if any. This uses `mz_object_dependencies`
509            -- and a filter to find non-core objects the object depends on.
510            format_conns AS (
511                SELECT object_and_conns.id, mc.id AS format_connection_id, mc.type AS format_connection
512                FROM mz_internal.mz_object_dependencies mod
513                JOIN object_and_conns ON mod.object_id = object_and_conns.id
514                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
515                WHERE mc.id NOT IN (SELECT connection_id FROM top_level_sources UNION ALL SELECT connection_id FROM top_level_sinks)
516                -- Not required but made explicit
517                AND object_and_conns.connection_id IS NOT NULL
518            ),
519            -- The networking used by `format_conns`, if any.
520            format_conn_deps AS (
521                SELECT format_conns.id, mc.id AS format_connection_networking_id, mc.type AS format_connection_networking
522                FROM format_conns
523                JOIN mz_internal.mz_object_dependencies mod ON mod.object_id = format_conns.format_connection_id
524                JOIN mz_connections mc
525                ON mc.id = mod.referenced_object_id
526            ),
527
528            -- When aggregating values that are known to be the same, we just use `MAX` for simplicity.
529
530            -- source_and_conns LEFT JOINed with the networking and format connection information.
531            -- Missing networking/format connections are coalesced to `none`, and aggregated with a comma.
532            -- This is because sources can have multiple type of networking (i.e. different kafka brokers with
533            -- different configuration), and multiple connections for formats (for key and value formats).
534            --
535            -- The actual format type is not yet included, as it depends on https://github.com/MaterializeInc/materialize/pull/23880
536            sources AS (
537                SELECT
538                -- Whether its a source or sink
539                'source' AS type,
540                1 AS value,
541                source_and_conns.id AS id,
542                -- What type of source/sink it is
543                MAX(type) AS object_type,
544                COALESCE(MAX(connection_type), 'none') AS connection_type,
545                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
546                STRING_AGG(
547                    DISTINCT COALESCE(networking_type, 'none'),
548                    ','
549                    ORDER BY COALESCE(networking_type, 'none') ASC
550                ) AS networking_type,
551                STRING_AGG(
552                    DISTINCT COALESCE(format_connection, 'none'),
553                    ','
554                    ORDER BY COALESCE(format_connection, 'none') ASC
555                ) AS format_connection,
556                STRING_AGG(
557                    DISTINCT COALESCE(format_connection_networking, 'none'),
558                    ','
559                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
560                ) AS format_connection_networking,
561                MAX(cluster_id) AS cluster_id
562                FROM source_and_conns
563                LEFT OUTER JOIN networking ON networking.id = source_and_conns.id
564                LEFT OUTER JOIN format_conns ON format_conns.id = source_and_conns.id
565                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = source_and_conns.id
566                GROUP BY source_and_conns.id
567            ),
568
569            sinks AS (
570                SELECT
571                'sink' AS type,
572                1 AS value,
573                sink_and_conns.id AS id,
574                MAX(type) AS object_type,
575                COALESCE(MAX(connection_type), 'none') AS connection_type,
576                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
577                STRING_AGG(
578                    DISTINCT COALESCE(networking_type, 'none'),
579                    ','
580                    ORDER BY COALESCE(networking_type, 'none') ASC
581                ) AS networking_type,
582                -- Sinks can only have 1 format connection but we aggregate
583                -- for consistency.
584                STRING_AGG(
585                    DISTINCT COALESCE(format_connection, 'none'),
586                    ','
587                    ORDER BY COALESCE(format_connection, 'none') ASC
588                ) AS format_connection,
589                STRING_AGG(
590                    DISTINCT COALESCE(format_connection_networking, 'none'),
591                    ','
592                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
593                ) AS format_connection_networking,
594                MAX(cluster_id) AS cluster_id
595                FROM sink_and_conns
596                LEFT OUTER JOIN networking ON networking.id = sink_and_conns.id
597                LEFT OUTER JOIN format_conns ON format_conns.id = sink_and_conns.id
598                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = sink_and_conns.id
599                GROUP BY sink_and_conns.id
600            ),
601
602            -- everything without replicas
603            together AS (
604                SELECT * FROM sources
605                UNION ALL
606                SELECT * from sinks
607            ),
608
609            with_cluster_replicas AS (
610                SELECT
611                -- `together.*` doesn't work because we need to aggregate the columns :(
612                MAX(together.id) AS id,
613                MAX(together.type) AS type,
614                MAX(together.object_type) AS object_type,
615                -- We just report 1 to the gauge. The `replica_id` labels aggregates the 0-many replicas associated
616                -- with this object.
617                MAX(together.value) AS value,
618                MAX(together.connection_type) AS connection_type,
619                MAX(together.envelope_type) AS envelope_type,
620                MAX(together.networking_type) AS networking_type,
621                MAX(together.format_connection) AS format_connection,
622                MAX(together.format_connection_networking) AS format_connection_networking,
623                MAX(together.cluster_id) AS cluster_id,
624                mcr.id AS replica_id,
625                -- Coalesce to 0 when there is no replica. This ensures `with_pod` below will generate
626                -- 1 row for the object. Also, -1 because `generate_series` is inclusive.
627                COALESCE((mcrs.processes - 1)::int, 0) AS processes
628                FROM together
629                LEFT OUTER JOIN mz_cluster_replicas mcr ON together.cluster_id = mcr.cluster_id
630                LEFT OUTER JOIN mz_catalog.mz_cluster_replica_sizes mcrs ON mcr.size = mcrs.size
631                GROUP BY together.id, mcr.id, mcrs.processes
632            ),
633
634            with_pod AS (
635                SELECT
636                id,
637                type,
638                object_type,
639                value,
640                connection_type,
641                envelope_type,
642                networking_type,
643                format_connection,
644                format_connection_networking,
645                cluster_id,
646                COALESCE(replica_id, 'none') AS replica_id,
647                -- When `replica_id` is NULL`, this coalesces to `none`.
648                COALESCE('cluster-' || cluster_id || '-replica-' || replica_id || '-' || generated, 'none') AS synthesized_pod
649                FROM with_cluster_replicas, generate_series(0, processes) generated
650            )
651
652        SELECT * FROM with_pod;
653        ",
654    },
655];