Skip to main content

mz_environmentd/http/
prometheus.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use mz_catalog::memory::objects::{Cluster, ClusterReplica};
11use mz_sql::ast::display::escaped_string_literal;
12
13use super::sql::SqlRequest;
14
/// Static description of one Prometheus metric whose samples come from a SQL query.
///
/// Instances are declared in the `*_METRIC_QUERIES` tables in this file and
/// turned into executable requests by `to_sql_request`.
#[derive(Debug)]
pub(crate) struct PrometheusSqlQuery<'a> {
    /// Name of the metric.
    pub(crate) metric_name: &'a str,
    /// Human-readable description of the metric.
    pub(crate) help: &'a str,
    /// SQL text of the query; `to_sql_request` appends it verbatim after the
    /// session settings it emits.
    pub(crate) query: &'a str,
    /// Name of the result column that holds the metric value.
    // NOTE(review): the remaining result columns are presumably exported as
    // labels -- confirm against the HTTP handler that consumes this struct.
    pub(crate) value_column_name: &'a str,
    /// Whether the query is meant to be evaluated per cluster replica.
    // NOTE(review): presumably such queries are issued with
    // `to_sql_request(Some((cluster, replica)))` once per replica -- confirm
    // against the caller.
    pub(crate) per_replica: bool,
}
23
24impl<'a> PrometheusSqlQuery<'a> {
25    pub(crate) fn to_sql_request(
26        &self,
27        cluster: Option<(&Cluster, &ClusterReplica)>,
28    ) -> SqlRequest {
29        SqlRequest::Simple {
30            query: if let Some((cluster, replica)) = cluster {
31                format!(
32                    "SET auto_route_catalog_queries = false; SET CLUSTER = {}; SET CLUSTER_REPLICA = {}; {}",
33                    escaped_string_literal(&cluster.name),
34                    escaped_string_literal(&replica.name),
35                    self.query
36                )
37            } else {
38                format!(
39                    "SET auto_route_catalog_queries = true; RESET CLUSTER; RESET CLUSTER_REPLICA; {}",
40                    self.query
41                )
42            },
43        }
44    }
45}
46
47pub(crate) static FRONTIER_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
48    PrometheusSqlQuery {
49        metric_name: "mz_write_frontier",
50        help: "The global write frontiers of compute and storage collections.",
51        query: "SELECT
52                    object_id AS collection_id,
53                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier
54                FROM mz_internal.mz_frontiers
55                WHERE object_id NOT LIKE 't%';",
56        value_column_name: "write_frontier",
57        per_replica: false,
58    },
59    PrometheusSqlQuery {
60        metric_name: "mz_read_frontier",
61        help: "The global read frontiers of compute and storage collections.",
62        query: "SELECT
63                    object_id AS collection_id,
64                    coalesce(read_frontier::text::uint8, 18446744073709551615::uint8) AS read_frontier
65                FROM mz_internal.mz_frontiers
66                WHERE object_id NOT LIKE 't%';",
67        value_column_name: "read_frontier",
68        per_replica: false,
69    },
70    PrometheusSqlQuery {
71        metric_name: "mz_replica_write_frontiers",
72        help: "The per-replica write frontiers of compute and storage collections.",
73        query: "SELECT
74                    object_id AS collection_id,
75                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
76                    cluster_id AS instance_id,
77                    replica_id AS replica_id
78                FROM mz_catalog.mz_cluster_replica_frontiers
79                JOIN mz_cluster_replicas ON (id = replica_id)
80                WHERE object_id NOT LIKE 't%';",
81        value_column_name: "write_frontier",
82        per_replica: false,
83    },
84    PrometheusSqlQuery {
85        metric_name: "mz_replica_write_frontiers",
86        help: "The per-replica write frontiers of compute and storage collections.",
87        query: "SELECT
88                    object_id AS collection_id,
89                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
90                    cluster_id AS instance_id,
91                    replica_id AS replica_id
92                FROM mz_catalog.mz_cluster_replica_frontiers
93                JOIN mz_cluster_replicas ON (id = replica_id)
94                WHERE object_id NOT LIKE 't%';",
95        value_column_name: "write_frontier",
96        per_replica: false,
97    },
98];
99
100pub(crate) static USAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
101    PrometheusSqlQuery {
102        metric_name: "mz_compute_cluster_status",
103        help: "Reports the name, ID, size, and availability zone of each cluster replica. Value is always 1.",
104        query: "SELECT
105                1 AS status,
106                cr.cluster_id AS compute_cluster_id,
107                cr.id AS compute_replica_id,
108                c.name AS compute_cluster_name,
109                cr.name AS compute_replica_name,
110                COALESCE(cr.size, '') AS size,
111                COALESCE(cr.availability_zone, '') AS availability_zone,
112                mz_version() AS mz_version
113          FROM
114            mz_cluster_replicas AS cr
115            JOIN mz_clusters AS c
116              ON cr.cluster_id = c.id",
117        value_column_name: "status",
118        per_replica: false,
119    },
120    PrometheusSqlQuery {
121        metric_name: "mz_workload_clusters",
122        help: "Reports the workload_class of each user cluster. Value is always 1.",
123        query: "select
124                    c.id as cluster_id,
125                    c.name as cluster_name,
126                    coalesce(wc.workload_class,'false') as workload_class,
127                    1 as value
128                from mz_clusters c
129                join mz_internal.mz_cluster_workload_classes wc
130                    on c.id = wc.id",
131        value_column_name: "value",
132        per_replica: false,
133    },
134    PrometheusSqlQuery {
135        metric_name: "mz_clusters_count",
136        help: "Number of active clusters in the instance.",
137        query: "select
138                count(id) as clusters
139            from mz_clusters
140            where id like 'u%'",
141        value_column_name: "clusters",
142        per_replica: false,
143    },
144    PrometheusSqlQuery {
145        metric_name: "mz_cluster_reps_count",
146        help: "Number of active cluster replicas in the instance, by replica size.",
147        query: "select size
148                , count(id) as replicas
149            from mz_cluster_replicas
150            where cluster_id like 'u%'
151            group by size",
152        value_column_name: "replicas",
153        per_replica: false,
154    },
155    PrometheusSqlQuery {
156        metric_name: "mz_indexes_count",
157        help: "Number of active indexes in the instance, by the type of relation on which the index is built.",
158        query: "select
159                o.type as relation_type
160                , count(i.id) as indexes
161            from mz_catalog.mz_indexes i
162            join mz_catalog.mz_objects o
163                on i.on_id = o.id
164            where i.id like 'u%'
165            group by relation_type",
166        value_column_name: "indexes",
167        per_replica: false,
168    },
169    PrometheusSqlQuery {
170        metric_name: "mz_sources_count",
171        help: "Number of active sources in the instance, by type, envelope type, and size of source.",
172        query: "SELECT
173                type,
174                COALESCE(envelope_type, '<none>') AS envelope_type,
175                mz_cluster_replicas.size AS size,
176                count(mz_sources.id) AS sources
177            FROM
178                mz_sources, mz_cluster_replicas
179            WHERE mz_sources.id LIKE 'u%'
180            AND mz_sources.type != 'subsource'
181            AND mz_sources.cluster_id = mz_cluster_replicas.cluster_id
182            GROUP BY
183            type, envelope_type, mz_cluster_replicas.size",
184        value_column_name: "sources",
185        per_replica: false,
186    },
187    PrometheusSqlQuery {
188        metric_name: "mz_views_count",
189        help: "Number of active views in the instance.",
190        query: "select count(id) as views from mz_views where id like 'u%'",
191        value_column_name: "views",
192        per_replica: false,
193    },
194    PrometheusSqlQuery {
195        metric_name: "mz_mzd_views_count",
196        help: "Number of active materialized views in the instance.",
197        query: "select count(id) as mzd_views from mz_materialized_views where id like 'u%'",
198        value_column_name: "mzd_views",
199        per_replica: false,
200    },
201    PrometheusSqlQuery {
202        metric_name: "mz_secrets_count",
203        help: "Number of active secrets in the instance.",
204        query: "select count(id) as secrets from mz_secrets where id like 'u%'",
205        value_column_name: "secrets",
206        per_replica: false,
207    },
208    PrometheusSqlQuery {
209        metric_name: "mz_sinks_count",
210        help: "Number of active sinks in the instance, by type, envelope type, and size.",
211        query: "SELECT
212                type,
213                COALESCE(envelope_type, '<none>') AS envelope_type,
214                mz_cluster_replicas.size AS size,
215                count(mz_sinks.id) AS sinks
216            FROM
217                mz_sinks, mz_cluster_replicas
218            WHERE mz_sinks.id LIKE 'u%'
219            AND mz_sinks.cluster_id = mz_cluster_replicas.cluster_id
220            GROUP BY type, envelope_type, mz_cluster_replicas.size",
221        value_column_name: "sinks",
222        per_replica: false,
223    },
224    PrometheusSqlQuery {
225        metric_name: "mz_connections_count",
226        help: "Number of active connections in the instance, by type.",
227        query: "select
228                type
229                , count(id) as connections
230            from mz_connections
231            where id like 'u%'
232            group by type",
233        value_column_name: "connections",
234        per_replica: false,
235    },
236    PrometheusSqlQuery {
237        metric_name: "mz_tables_count",
238        help: "Number of active tables in the instance.",
239        query: "select count(id) as tables from mz_tables where id like 'u%'",
240        value_column_name: "tables",
241        per_replica: false,
242    },
243    PrometheusSqlQuery {
244        metric_name: "mz_catalog_items",
245        help: "Mapping internal id for catalog item.",
246        query: "SELECT
247                concat(d.name, '.', s.name, '.', o.name) AS label,
248                0 AS value
249            FROM mz_objects o
250            JOIN mz_schemas s ON (o.schema_id = s.id)
251            JOIN mz_databases d ON (s.database_id = d.id)
252            WHERE o.id LIKE 'u%'",
253        value_column_name: "value",
254        per_replica: false,
255    },
256    PrometheusSqlQuery {
257        metric_name: "mz_object_id",
258        help: "Mapping external name for catalog item.",
259        query: "SELECT
260                o.id AS label1,
261                concat(d.name, '.', s.name, '.', o.name) AS label2,
262                0 AS value
263            FROM mz_objects o
264            JOIN mz_schemas s ON (o.schema_id = s.id)
265            JOIN mz_databases d ON (s.database_id = d.id)
266            WHERE o.id LIKE 'u%'",
267        value_column_name: "value",
268        per_replica: false,
269    },
270];
271
272pub(crate) static COMPUTE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
273    PrometheusSqlQuery {
274        metric_name: "mz_arrangement_count",
275        help: "The number of arrangements in a dataflow.",
276        query: "WITH
277            arrangements AS (
278                SELECT DISTINCT operator_id AS id
279                FROM mz_internal.mz_arrangement_records_raw
280            ),
281            collections AS (
282                SELECT
283                    id,
284                    CASE
285                        WHEN starts_with(export_id, 't') THEN 'transient'
286                        ELSE export_id
287                    END AS export_id
288                FROM
289                    mz_internal.mz_dataflow_addresses,
290                    mz_internal.mz_compute_exports
291                WHERE address[1] = dataflow_id
292            )
293        SELECT
294            COALESCE(export_id, 'none') AS collection_id,
295            count(*) as count
296        FROM arrangements
297        LEFT JOIN collections USING (id)
298        GROUP BY export_id",
299        value_column_name: "count",
300        per_replica: true,
301    },
302    PrometheusSqlQuery {
303        metric_name: "mz_arrangement_record_count",
304        help: "The number of records in all arrangements in a dataflow.",
305        query: "WITH
306            collections AS (
307                SELECT
308                    id,
309                    CASE
310                        WHEN starts_with(export_id, 't') THEN 'transient'
311                        ELSE export_id
312                    END AS export_id
313                FROM
314                    mz_internal.mz_dataflow_addresses,
315                    mz_internal.mz_compute_exports
316                WHERE address[1] = dataflow_id
317            )
318        SELECT
319            worker_id,
320            COALESCE(export_id, 'none') AS collection_id,
321            count(*) as count
322        FROM mz_internal.mz_arrangement_records_raw
323        LEFT JOIN collections ON (operator_id = id)
324        GROUP BY worker_id, export_id",
325        value_column_name: "count",
326        per_replica: true,
327    },
328    PrometheusSqlQuery {
329        metric_name: "mz_arrangement_batch_count",
330        help: "The number of batches in all arrangements in a dataflow.",
331        query: "WITH
332            collections AS (
333                SELECT
334                    id,
335                    CASE
336                        WHEN starts_with(export_id, 't') THEN 'transient'
337                        ELSE export_id
338                    END AS export_id
339                FROM
340                    mz_internal.mz_dataflow_addresses,
341                    mz_internal.mz_compute_exports
342                WHERE address[1] = dataflow_id
343            )
344        SELECT
345            worker_id,
346            COALESCE(export_id, 'none') AS collection_id,
347            count(*) as count
348        FROM mz_internal.mz_arrangement_batches_raw
349        LEFT JOIN collections ON (operator_id = id)
350        GROUP BY worker_id, export_id",
351        value_column_name: "count",
352        per_replica: true,
353    },
354    PrometheusSqlQuery {
355        metric_name: "mz_arrangement_size_bytes",
356        help: "The size of all arrangements in a dataflow.",
357        query: "WITH
358            collections AS (
359                SELECT
360                    id,
361                    CASE
362                        WHEN starts_with(export_id, 't') THEN 'transient'
363                        ELSE export_id
364                    END AS export_id
365                FROM
366                    mz_internal.mz_dataflow_addresses,
367                    mz_internal.mz_compute_exports
368                WHERE address[1] = dataflow_id
369            )
370        SELECT
371            worker_id,
372            COALESCE(export_id, 'none') AS collection_id,
373            count(*) as count
374        FROM mz_internal.mz_arrangement_heap_size_raw
375        LEFT JOIN collections ON (operator_id = id)
376        GROUP BY worker_id, export_id",
377        value_column_name: "count",
378        per_replica: true,
379    },
380    PrometheusSqlQuery {
381        metric_name: "mz_arrangement_capacity_bytes",
382        help: "The capacity of all arrangements in all dataflows.",
383        query: "SELECT
384            worker_id,
385            count(*) as count
386        FROM mz_internal.mz_arrangement_heap_capacity_raw
387        GROUP BY worker_id",
388        value_column_name: "count",
389        per_replica: true,
390    },
391    PrometheusSqlQuery {
392        metric_name: "mz_arrangement_allocation_count",
393        help: "The number of allocations in all arrangements in all dataflows.",
394        query: "SELECT
395            worker_id,
396            count(*) as count
397        FROM mz_internal.mz_arrangement_heap_allocations_raw
398        GROUP BY worker_id",
399        value_column_name: "count",
400        per_replica: true,
401    },
402    PrometheusSqlQuery {
403        metric_name: "mz_compute_replica_park_duration_seconds_total",
404        help: "The total time workers were parked since restart.",
405        query: "SELECT
406            worker_id,
407            sum(slept_for_ns * count)::float8 / 1000000000 AS duration_s
408        FROM mz_internal.mz_scheduling_parks_histogram_per_worker
409        GROUP BY worker_id",
410        value_column_name: "duration_s",
411        per_replica: true,
412    },
413    PrometheusSqlQuery {
414        metric_name: "mz_compute_replica_peek_count",
415        help: "The number of pending peeks.",
416        query: "SELECT worker_id, count(*) as count
417        FROM mz_internal.mz_active_peeks_per_worker
418        GROUP BY worker_id",
419        value_column_name: "count",
420        per_replica: true,
421    },
422    PrometheusSqlQuery {
423        metric_name: "mz_dataflow_elapsed_seconds_total",
424        help: "The total time spent computing a dataflow.",
425        query: "SELECT
426            worker_id,
427            CASE
428                WHEN starts_with(export_id, 't') THEN 'transient'
429                ELSE export_id
430            END AS collection_id,
431            sum(elapsed_ns)::float8 / 1000000000 AS elapsed_s
432        FROM
433            mz_internal.mz_scheduling_elapsed_per_worker AS s,
434            mz_internal.mz_dataflow_operators AS o,
435            mz_internal.mz_dataflow_addresses AS a,
436            mz_internal.mz_compute_exports AS e
437        WHERE
438            o.id = s.id AND
439            o.id = a.id AND
440            list_length(a.address) = 1 AND
441            e.dataflow_id = a.address[1]
442        GROUP BY worker_id, collection_id",
443        value_column_name: "elapsed_s",
444        per_replica: true,
445    },
446    PrometheusSqlQuery {
447        metric_name: "mz_dataflow_error_count",
448        help: "The number of errors in a dataflow",
449        query: "SELECT
450            CASE
451                WHEN starts_with(export_id, 't') THEN 'transient'
452                ELSE export_id
453            END AS collection_id,
454            count::uint8 as count
455        FROM mz_internal.mz_compute_error_counts",
456        value_column_name: "count",
457        per_replica: true,
458    },
459];
460
461pub(crate) static STORAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
462    PrometheusSqlQuery {
463        metric_name: "mz_storage_objects",
464        help: "Nicely labeled information about existing sources and sinks.",
465        value_column_name: "value",
466        per_replica: false,
467        query: "
468        WITH
469            -- All user, non- progress or subsource sources.
470            top_level_sources AS (
471                SELECT id, connection_id, type, envelope_type, cluster_id
472                FROM mz_sources
473                WHERE id LIKE 'u%' AND type NOT IN ('progress', 'subsource')
474            ),
475            -- Sources enriched with the type of the core connection.
476            source_and_conns AS (
477                SELECT top_level_sources.*, mc.type AS connection_type
478                FROM top_level_sources
479                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sources.connection_id
480            ),
481
482            -- All user sinks.
483            top_level_sinks AS (
484                SELECT id, connection_id, type, envelope_type, cluster_id
485                FROM mz_sinks
486                WHERE id LIKE 'u%'
487            ),
488            -- Sinks enriched with the type of the core connection.
489            sink_and_conns AS (
490                SELECT top_level_sinks.*, mc.type AS connection_type
491                FROM top_level_sinks
492                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sinks.connection_id
493            ),
494
495            -- All objects we care about
496            object_and_conns AS (
497                SELECT * FROM source_and_conns
498                UNION ALL
499                SELECT * FROM sink_and_conns
500            ),
501
502            -- The networking the object connection uses, if any.
503            networking AS (
504                SELECT object_and_conns.id, mc.id AS networking_id, mc.type AS networking_type
505                FROM object_and_conns
506                JOIN mz_internal.mz_object_dependencies mod ON object_and_conns.connection_id = mod.object_id
507                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
508                -- Not required but made explicit
509                WHERE object_and_conns.connection_id IS NOT NULL
510            ),
511            -- The connection the format of the object uses, if any. This uses `mz_object_dependencies`
512            -- and a filter to find non-core objects the object depends on.
513            format_conns AS (
514                SELECT object_and_conns.id, mc.id AS format_connection_id, mc.type AS format_connection
515                FROM mz_internal.mz_object_dependencies mod
516                JOIN object_and_conns ON mod.object_id = object_and_conns.id
517                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
518                WHERE mc.id NOT IN (SELECT connection_id FROM top_level_sources UNION ALL SELECT connection_id FROM top_level_sinks)
519                -- Not required but made explicit
520                AND object_and_conns.connection_id IS NOT NULL
521            ),
522            -- The networking used by `format_conns`, if any.
523            format_conn_deps AS (
524                SELECT format_conns.id, mc.id AS format_connection_networking_id, mc.type AS format_connection_networking
525                FROM format_conns
526                JOIN mz_internal.mz_object_dependencies mod ON mod.object_id = format_conns.format_connection_id
527                JOIN mz_connections mc
528                ON mc.id = mod.referenced_object_id
529            ),
530
531            -- When aggregating values that are known to be the same, we just use `MAX` for simplicity.
532
533            -- source_and_conns LEFT JOINed with the networking and format connection information.
534            -- Missing networking/format connections are coalesced to `none`, and aggregated with a comma.
535            -- This is because sources can have multiple type of networking (i.e. different kafka brokers with
536            -- different configuration), and multiple connections for formats (for key and value formats).
537            --
538            -- The actual format type is not yet included, as it depends on https://github.com/MaterializeInc/materialize/pull/23880
539            sources AS (
540                SELECT
541                -- Whether its a source or sink
542                'source' AS type,
543                1 AS value,
544                source_and_conns.id AS id,
545                -- What type of source/sink it is
546                MAX(type) AS object_type,
547                COALESCE(MAX(connection_type), 'none') AS connection_type,
548                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
549                STRING_AGG(
550                    DISTINCT COALESCE(networking_type, 'none'),
551                    ','
552                    ORDER BY COALESCE(networking_type, 'none') ASC
553                ) AS networking_type,
554                STRING_AGG(
555                    DISTINCT COALESCE(format_connection, 'none'),
556                    ','
557                    ORDER BY COALESCE(format_connection, 'none') ASC
558                ) AS format_connection,
559                STRING_AGG(
560                    DISTINCT COALESCE(format_connection_networking, 'none'),
561                    ','
562                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
563                ) AS format_connection_networking,
564                MAX(cluster_id) AS cluster_id
565                FROM source_and_conns
566                LEFT OUTER JOIN networking ON networking.id = source_and_conns.id
567                LEFT OUTER JOIN format_conns ON format_conns.id = source_and_conns.id
568                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = source_and_conns.id
569                GROUP BY source_and_conns.id
570            ),
571
572            sinks AS (
573                SELECT
574                'sink' AS type,
575                1 AS value,
576                sink_and_conns.id AS id,
577                MAX(type) AS object_type,
578                COALESCE(MAX(connection_type), 'none') AS connection_type,
579                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
580                STRING_AGG(
581                    DISTINCT COALESCE(networking_type, 'none'),
582                    ','
583                    ORDER BY COALESCE(networking_type, 'none') ASC
584                ) AS networking_type,
585                -- Sinks can only have 1 format connection but we aggregate
586                -- for consistency.
587                STRING_AGG(
588                    DISTINCT COALESCE(format_connection, 'none'),
589                    ','
590                    ORDER BY COALESCE(format_connection, 'none') ASC
591                ) AS format_connection,
592                STRING_AGG(
593                    DISTINCT COALESCE(format_connection_networking, 'none'),
594                    ','
595                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
596                ) AS format_connection_networking,
597                MAX(cluster_id) AS cluster_id
598                FROM sink_and_conns
599                LEFT OUTER JOIN networking ON networking.id = sink_and_conns.id
600                LEFT OUTER JOIN format_conns ON format_conns.id = sink_and_conns.id
601                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = sink_and_conns.id
602                GROUP BY sink_and_conns.id
603            ),
604
605            -- everything without replicas
606            together AS (
607                SELECT * FROM sources
608                UNION ALL
609                SELECT * from sinks
610            ),
611
612            with_cluster_replicas AS (
613                SELECT
614                -- `together.*` doesn't work because we need to aggregate the columns :(
615                MAX(together.id) AS id,
616                MAX(together.type) AS type,
617                MAX(together.object_type) AS object_type,
618                -- We just report 1 to the gauge. The `replica_id` labels aggregates the 0-many replicas associated
619                -- with this object.
620                MAX(together.value) AS value,
621                MAX(together.connection_type) AS connection_type,
622                MAX(together.envelope_type) AS envelope_type,
623                MAX(together.networking_type) AS networking_type,
624                MAX(together.format_connection) AS format_connection,
625                MAX(together.format_connection_networking) AS format_connection_networking,
626                MAX(together.cluster_id) AS cluster_id,
627                mcr.id AS replica_id,
628                -- Coalesce to 0 when there is no replica. This ensures `with_pod` below will generate
629                -- 1 row for the object. Also, -1 because `generate_series` is inclusive.
630                COALESCE((mcrs.processes - 1)::int, 0) AS processes
631                FROM together
632                LEFT OUTER JOIN mz_cluster_replicas mcr ON together.cluster_id = mcr.cluster_id
633                LEFT OUTER JOIN mz_catalog.mz_cluster_replica_sizes mcrs ON mcr.size = mcrs.size
634                GROUP BY together.id, mcr.id, mcrs.processes
635            ),
636
637            with_pod AS (
638                SELECT
639                id,
640                type,
641                object_type,
642                value,
643                connection_type,
644                envelope_type,
645                networking_type,
646                format_connection,
647                format_connection_networking,
648                cluster_id,
649                COALESCE(replica_id, 'none') AS replica_id,
650                -- When `replica_id` is NULL`, this coalesces to `none`.
651                COALESCE('cluster-' || cluster_id || '-replica-' || replica_id || '-' || generated, 'none') AS synthesized_pod
652                FROM with_cluster_replicas, generate_series(0, processes) generated
653            )
654
655        SELECT * FROM with_pod;
656        ",
657    },
658];