Skip to main content

mz_environmentd/http/
prometheus.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apach
9
10use mz_catalog::memory::objects::{Cluster, ClusterReplica};
11use mz_ore::sql;
12use mz_ore::sql::Sql;
13
14use super::sql::SqlRequest;
15
16#[derive(Debug)]
17pub(crate) struct PrometheusSqlQuery<'a> {
18    pub(crate) metric_name: &'a str,
19    pub(crate) help: &'a str,
20    pub(crate) query: &'static str,
21    pub(crate) value_column_name: &'a str,
22    pub(crate) per_replica: bool,
23}
24
25impl<'a> PrometheusSqlQuery<'a> {
26    pub(crate) fn to_sql_request(
27        &self,
28        cluster: Option<(&Cluster, &ClusterReplica)>,
29    ) -> SqlRequest {
30        let query = Sql::new(self.query);
31        let query = if let Some((cluster, replica)) = cluster {
32            sql!(
33                "SET auto_route_catalog_queries = false; SET CLUSTER = {}; SET CLUSTER_REPLICA = {}; {}",
34                Sql::literal(&cluster.name),
35                Sql::literal(&replica.name),
36                query
37            )
38        } else {
39            sql!(
40                "SET auto_route_catalog_queries = true; RESET CLUSTER; RESET CLUSTER_REPLICA; {}",
41                query
42            )
43        };
44        SqlRequest::Simple { query }
45    }
46}
47
48pub(crate) static FRONTIER_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
49    PrometheusSqlQuery {
50        metric_name: "mz_write_frontier",
51        help: "The global write frontiers of compute and storage collections.",
52        query: "SELECT
53                    object_id AS collection_id,
54                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier
55                FROM mz_internal.mz_frontiers
56                WHERE object_id NOT LIKE 't%';",
57        value_column_name: "write_frontier",
58        per_replica: false,
59    },
60    PrometheusSqlQuery {
61        metric_name: "mz_read_frontier",
62        help: "The global read frontiers of compute and storage collections.",
63        query: "SELECT
64                    object_id AS collection_id,
65                    coalesce(read_frontier::text::uint8, 18446744073709551615::uint8) AS read_frontier
66                FROM mz_internal.mz_frontiers
67                WHERE object_id NOT LIKE 't%';",
68        value_column_name: "read_frontier",
69        per_replica: false,
70    },
71    PrometheusSqlQuery {
72        metric_name: "mz_replica_write_frontiers",
73        help: "The per-replica write frontiers of compute and storage collections.",
74        query: "SELECT
75                    object_id AS collection_id,
76                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
77                    cluster_id AS instance_id,
78                    replica_id AS replica_id
79                FROM mz_catalog.mz_cluster_replica_frontiers
80                JOIN mz_cluster_replicas ON (id = replica_id)
81                WHERE object_id NOT LIKE 't%';",
82        value_column_name: "write_frontier",
83        per_replica: false,
84    },
85    PrometheusSqlQuery {
86        metric_name: "mz_replica_write_frontiers",
87        help: "The per-replica write frontiers of compute and storage collections.",
88        query: "SELECT
89                    object_id AS collection_id,
90                    coalesce(write_frontier::text::uint8, 18446744073709551615::uint8) AS write_frontier,
91                    cluster_id AS instance_id,
92                    replica_id AS replica_id
93                FROM mz_catalog.mz_cluster_replica_frontiers
94                JOIN mz_cluster_replicas ON (id = replica_id)
95                WHERE object_id NOT LIKE 't%';",
96        value_column_name: "write_frontier",
97        per_replica: false,
98    },
99];
100
101pub(crate) static USAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
102    PrometheusSqlQuery {
103        metric_name: "mz_compute_cluster_status",
104        help: "Reports the name, ID, size, and availability zone of each cluster replica. Value is always 1.",
105        query: "SELECT
106                1 AS status,
107                cr.cluster_id AS compute_cluster_id,
108                cr.id AS compute_replica_id,
109                c.name AS compute_cluster_name,
110                cr.name AS compute_replica_name,
111                COALESCE(cr.size, '') AS size,
112                COALESCE(cr.availability_zone, '') AS availability_zone,
113                mz_version() AS mz_version
114          FROM
115            mz_cluster_replicas AS cr
116            JOIN mz_clusters AS c
117              ON cr.cluster_id = c.id",
118        value_column_name: "status",
119        per_replica: false,
120    },
121    PrometheusSqlQuery {
122        metric_name: "mz_workload_clusters",
123        help: "Reports the workload_class of each user cluster. Value is always 1.",
124        query: "select
125                    c.id as cluster_id,
126                    c.name as cluster_name,
127                    coalesce(wc.workload_class,'false') as workload_class,
128                    1 as value
129                from mz_clusters c
130                join mz_internal.mz_cluster_workload_classes wc
131                    on c.id = wc.id",
132        value_column_name: "value",
133        per_replica: false,
134    },
135    PrometheusSqlQuery {
136        metric_name: "mz_clusters_count",
137        help: "Number of active clusters in the instance.",
138        query: "select
139                count(id) as clusters
140            from mz_clusters
141            where id like 'u%'",
142        value_column_name: "clusters",
143        per_replica: false,
144    },
145    PrometheusSqlQuery {
146        metric_name: "mz_cluster_reps_count",
147        help: "Number of active cluster replicas in the instance, by replica size.",
148        query: "select size
149                , count(id) as replicas
150            from mz_cluster_replicas
151            where cluster_id like 'u%'
152            group by size",
153        value_column_name: "replicas",
154        per_replica: false,
155    },
156    PrometheusSqlQuery {
157        metric_name: "mz_indexes_count",
158        help: "Number of active indexes in the instance, by the type of relation on which the index is built.",
159        query: "select
160                o.type as relation_type
161                , count(i.id) as indexes
162            from mz_catalog.mz_indexes i
163            join mz_catalog.mz_objects o
164                on i.on_id = o.id
165            where i.id like 'u%'
166            group by relation_type",
167        value_column_name: "indexes",
168        per_replica: false,
169    },
170    PrometheusSqlQuery {
171        metric_name: "mz_sources_count",
172        help: "Number of active sources in the instance, by type, envelope type, and size of source.",
173        query: "SELECT
174                type,
175                COALESCE(envelope_type, '<none>') AS envelope_type,
176                mz_cluster_replicas.size AS size,
177                count(mz_sources.id) AS sources
178            FROM
179                mz_sources, mz_cluster_replicas
180            WHERE mz_sources.id LIKE 'u%'
181            AND mz_sources.type != 'subsource'
182            AND mz_sources.cluster_id = mz_cluster_replicas.cluster_id
183            GROUP BY
184            type, envelope_type, mz_cluster_replicas.size",
185        value_column_name: "sources",
186        per_replica: false,
187    },
188    PrometheusSqlQuery {
189        metric_name: "mz_views_count",
190        help: "Number of active views in the instance.",
191        query: "select count(id) as views from mz_views where id like 'u%'",
192        value_column_name: "views",
193        per_replica: false,
194    },
195    PrometheusSqlQuery {
196        metric_name: "mz_mzd_views_count",
197        help: "Number of active materialized views in the instance.",
198        query: "select count(id) as mzd_views from mz_materialized_views where id like 'u%'",
199        value_column_name: "mzd_views",
200        per_replica: false,
201    },
202    PrometheusSqlQuery {
203        metric_name: "mz_secrets_count",
204        help: "Number of active secrets in the instance.",
205        query: "select count(id) as secrets from mz_secrets where id like 'u%'",
206        value_column_name: "secrets",
207        per_replica: false,
208    },
209    PrometheusSqlQuery {
210        metric_name: "mz_sinks_count",
211        help: "Number of active sinks in the instance, by type, envelope type, and size.",
212        query: "SELECT
213                type,
214                COALESCE(envelope_type, '<none>') AS envelope_type,
215                mz_cluster_replicas.size AS size,
216                count(mz_sinks.id) AS sinks
217            FROM
218                mz_sinks, mz_cluster_replicas
219            WHERE mz_sinks.id LIKE 'u%'
220            AND mz_sinks.cluster_id = mz_cluster_replicas.cluster_id
221            GROUP BY type, envelope_type, mz_cluster_replicas.size",
222        value_column_name: "sinks",
223        per_replica: false,
224    },
225    PrometheusSqlQuery {
226        metric_name: "mz_connections_count",
227        help: "Number of active connections in the instance, by type.",
228        query: "select
229                type
230                , count(id) as connections
231            from mz_connections
232            where id like 'u%'
233            group by type",
234        value_column_name: "connections",
235        per_replica: false,
236    },
237    PrometheusSqlQuery {
238        metric_name: "mz_tables_count",
239        help: "Number of active tables in the instance.",
240        query: "select count(id) as tables from mz_tables where id like 'u%'",
241        value_column_name: "tables",
242        per_replica: false,
243    },
244    PrometheusSqlQuery {
245        metric_name: "mz_catalog_items",
246        help: "Mapping internal id for catalog item.",
247        query: "SELECT
248                concat(d.name, '.', s.name, '.', o.name) AS label,
249                0 AS value
250            FROM mz_objects o
251            JOIN mz_schemas s ON (o.schema_id = s.id)
252            JOIN mz_databases d ON (s.database_id = d.id)
253            WHERE o.id LIKE 'u%'",
254        value_column_name: "value",
255        per_replica: false,
256    },
257    PrometheusSqlQuery {
258        metric_name: "mz_object_id",
259        help: "Mapping external name for catalog item.",
260        query: "SELECT
261                o.id AS label1,
262                concat(d.name, '.', s.name, '.', o.name) AS label2,
263                0 AS value
264            FROM mz_objects o
265            JOIN mz_schemas s ON (o.schema_id = s.id)
266            JOIN mz_databases d ON (s.database_id = d.id)
267            WHERE o.id LIKE 'u%'",
268        value_column_name: "value",
269        per_replica: false,
270    },
271];
272
273pub(crate) static COMPUTE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
274    PrometheusSqlQuery {
275        metric_name: "mz_arrangement_count",
276        help: "The number of arrangements in a dataflow.",
277        query: "WITH
278            arrangements AS (
279                SELECT DISTINCT operator_id AS id
280                FROM mz_internal.mz_arrangement_records_raw
281            ),
282            collections AS (
283                SELECT
284                    id,
285                    CASE
286                        WHEN starts_with(export_id, 't') THEN 'transient'
287                        ELSE export_id
288                    END AS export_id
289                FROM
290                    mz_internal.mz_dataflow_addresses,
291                    mz_internal.mz_compute_exports
292                WHERE address[1] = dataflow_id
293            )
294        SELECT
295            COALESCE(export_id, 'none') AS collection_id,
296            count(*) as count
297        FROM arrangements
298        LEFT JOIN collections USING (id)
299        GROUP BY export_id",
300        value_column_name: "count",
301        per_replica: true,
302    },
303    PrometheusSqlQuery {
304        metric_name: "mz_arrangement_record_count",
305        help: "The number of records in all arrangements in a dataflow.",
306        query: "WITH
307            collections AS (
308                SELECT
309                    id,
310                    CASE
311                        WHEN starts_with(export_id, 't') THEN 'transient'
312                        ELSE export_id
313                    END AS export_id
314                FROM
315                    mz_internal.mz_dataflow_addresses,
316                    mz_internal.mz_compute_exports
317                WHERE address[1] = dataflow_id
318            )
319        SELECT
320            worker_id,
321            COALESCE(export_id, 'none') AS collection_id,
322            count(*) as count
323        FROM mz_internal.mz_arrangement_records_raw
324        LEFT JOIN collections ON (operator_id = id)
325        GROUP BY worker_id, export_id",
326        value_column_name: "count",
327        per_replica: true,
328    },
329    PrometheusSqlQuery {
330        metric_name: "mz_arrangement_batch_count",
331        help: "The number of batches in all arrangements in a dataflow.",
332        query: "WITH
333            collections AS (
334                SELECT
335                    id,
336                    CASE
337                        WHEN starts_with(export_id, 't') THEN 'transient'
338                        ELSE export_id
339                    END AS export_id
340                FROM
341                    mz_internal.mz_dataflow_addresses,
342                    mz_internal.mz_compute_exports
343                WHERE address[1] = dataflow_id
344            )
345        SELECT
346            worker_id,
347            COALESCE(export_id, 'none') AS collection_id,
348            count(*) as count
349        FROM mz_internal.mz_arrangement_batches_raw
350        LEFT JOIN collections ON (operator_id = id)
351        GROUP BY worker_id, export_id",
352        value_column_name: "count",
353        per_replica: true,
354    },
355    PrometheusSqlQuery {
356        metric_name: "mz_arrangement_size_bytes",
357        help: "The size of all arrangements in a dataflow.",
358        query: "WITH
359            collections AS (
360                SELECT
361                    id,
362                    CASE
363                        WHEN starts_with(export_id, 't') THEN 'transient'
364                        ELSE export_id
365                    END AS export_id
366                FROM
367                    mz_internal.mz_dataflow_addresses,
368                    mz_internal.mz_compute_exports
369                WHERE address[1] = dataflow_id
370            )
371        SELECT
372            worker_id,
373            COALESCE(export_id, 'none') AS collection_id,
374            count(*) as count
375        FROM mz_internal.mz_arrangement_heap_size_raw
376        LEFT JOIN collections ON (operator_id = id)
377        GROUP BY worker_id, export_id",
378        value_column_name: "count",
379        per_replica: true,
380    },
381    PrometheusSqlQuery {
382        metric_name: "mz_arrangement_capacity_bytes",
383        help: "The capacity of all arrangements in all dataflows.",
384        query: "SELECT
385            worker_id,
386            count(*) as count
387        FROM mz_internal.mz_arrangement_heap_capacity_raw
388        GROUP BY worker_id",
389        value_column_name: "count",
390        per_replica: true,
391    },
392    PrometheusSqlQuery {
393        metric_name: "mz_arrangement_allocation_count",
394        help: "The number of allocations in all arrangements in all dataflows.",
395        query: "SELECT
396            worker_id,
397            count(*) as count
398        FROM mz_internal.mz_arrangement_heap_allocations_raw
399        GROUP BY worker_id",
400        value_column_name: "count",
401        per_replica: true,
402    },
403    PrometheusSqlQuery {
404        metric_name: "mz_compute_replica_park_duration_seconds_total",
405        help: "The total time workers were parked since restart.",
406        query: "SELECT
407            worker_id,
408            sum(slept_for_ns * count)::float8 / 1000000000 AS duration_s
409        FROM mz_internal.mz_scheduling_parks_histogram_per_worker
410        GROUP BY worker_id",
411        value_column_name: "duration_s",
412        per_replica: true,
413    },
414    PrometheusSqlQuery {
415        metric_name: "mz_compute_replica_peek_count",
416        help: "The number of pending peeks.",
417        query: "SELECT worker_id, count(*) as count
418        FROM mz_internal.mz_active_peeks_per_worker
419        GROUP BY worker_id",
420        value_column_name: "count",
421        per_replica: true,
422    },
423    PrometheusSqlQuery {
424        metric_name: "mz_dataflow_elapsed_seconds_total",
425        help: "The total time spent computing a dataflow.",
426        query: "SELECT
427            worker_id,
428            CASE
429                WHEN starts_with(export_id, 't') THEN 'transient'
430                ELSE export_id
431            END AS collection_id,
432            sum(elapsed_ns)::float8 / 1000000000 AS elapsed_s
433        FROM
434            mz_internal.mz_scheduling_elapsed_per_worker AS s,
435            mz_internal.mz_dataflow_operators AS o,
436            mz_internal.mz_dataflow_addresses AS a,
437            mz_internal.mz_compute_exports AS e
438        WHERE
439            o.id = s.id AND
440            o.id = a.id AND
441            list_length(a.address) = 1 AND
442            e.dataflow_id = a.address[1]
443        GROUP BY worker_id, collection_id",
444        value_column_name: "elapsed_s",
445        per_replica: true,
446    },
447    PrometheusSqlQuery {
448        metric_name: "mz_dataflow_error_count",
449        help: "The number of errors in a dataflow",
450        query: "SELECT
451            CASE
452                WHEN starts_with(export_id, 't') THEN 'transient'
453                ELSE export_id
454            END AS collection_id,
455            count::uint8 as count
456        FROM mz_internal.mz_compute_error_counts",
457        value_column_name: "count",
458        per_replica: true,
459    },
460];
461
462pub(crate) static STORAGE_METRIC_QUERIES: &[PrometheusSqlQuery] = &[
463    PrometheusSqlQuery {
464        metric_name: "mz_storage_objects",
465        help: "Nicely labeled information about existing sources and sinks.",
466        value_column_name: "value",
467        per_replica: false,
468        query: "
469        WITH
470            -- All user, non- progress or subsource sources.
471            top_level_sources AS (
472                SELECT id, connection_id, type, envelope_type, cluster_id
473                FROM mz_sources
474                WHERE id LIKE 'u%' AND type NOT IN ('progress', 'subsource')
475            ),
476            -- Sources enriched with the type of the core connection.
477            source_and_conns AS (
478                SELECT top_level_sources.*, mc.type AS connection_type
479                FROM top_level_sources
480                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sources.connection_id
481            ),
482
483            -- All user sinks.
484            top_level_sinks AS (
485                SELECT id, connection_id, type, envelope_type, cluster_id
486                FROM mz_sinks
487                WHERE id LIKE 'u%'
488            ),
489            -- Sinks enriched with the type of the core connection.
490            sink_and_conns AS (
491                SELECT top_level_sinks.*, mc.type AS connection_type
492                FROM top_level_sinks
493                LEFT OUTER JOIN mz_connections mc ON mc.id = top_level_sinks.connection_id
494            ),
495
496            -- All objects we care about
497            object_and_conns AS (
498                SELECT * FROM source_and_conns
499                UNION ALL
500                SELECT * FROM sink_and_conns
501            ),
502
503            -- The networking the object connection uses, if any.
504            networking AS (
505                SELECT object_and_conns.id, mc.id AS networking_id, mc.type AS networking_type
506                FROM object_and_conns
507                JOIN mz_internal.mz_object_dependencies mod ON object_and_conns.connection_id = mod.object_id
508                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
509                -- Not required but made explicit
510                WHERE object_and_conns.connection_id IS NOT NULL
511            ),
512            -- The connection the format of the object uses, if any. This uses `mz_object_dependencies`
513            -- and a filter to find non-core objects the object depends on.
514            format_conns AS (
515                SELECT object_and_conns.id, mc.id AS format_connection_id, mc.type AS format_connection
516                FROM mz_internal.mz_object_dependencies mod
517                JOIN object_and_conns ON mod.object_id = object_and_conns.id
518                JOIN mz_connections mc ON mc.id = mod.referenced_object_id
519                WHERE mc.id NOT IN (SELECT connection_id FROM top_level_sources UNION ALL SELECT connection_id FROM top_level_sinks)
520                -- Not required but made explicit
521                AND object_and_conns.connection_id IS NOT NULL
522            ),
523            -- The networking used by `format_conns`, if any.
524            format_conn_deps AS (
525                SELECT format_conns.id, mc.id AS format_connection_networking_id, mc.type AS format_connection_networking
526                FROM format_conns
527                JOIN mz_internal.mz_object_dependencies mod ON mod.object_id = format_conns.format_connection_id
528                JOIN mz_connections mc
529                ON mc.id = mod.referenced_object_id
530            ),
531
532            -- When aggregating values that are known to be the same, we just use `MAX` for simplicity.
533
534            -- source_and_conns LEFT JOINed with the networking and format connection information.
535            -- Missing networking/format connections are coalesced to `none`, and aggregated with a comma.
536            -- This is because sources can have multiple type of networking (i.e. different kafka brokers with
537            -- different configuration), and multiple connections for formats (for key and value formats).
538            --
539            -- The actual format type is not yet included, as it depends on https://github.com/MaterializeInc/materialize/pull/23880
540            sources AS (
541                SELECT
542                -- Whether its a source or sink
543                'source' AS type,
544                1 AS value,
545                source_and_conns.id AS id,
546                -- What type of source/sink it is
547                MAX(type) AS object_type,
548                COALESCE(MAX(connection_type), 'none') AS connection_type,
549                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
550                STRING_AGG(
551                    DISTINCT COALESCE(networking_type, 'none'),
552                    ','
553                    ORDER BY COALESCE(networking_type, 'none') ASC
554                ) AS networking_type,
555                STRING_AGG(
556                    DISTINCT COALESCE(format_connection, 'none'),
557                    ','
558                    ORDER BY COALESCE(format_connection, 'none') ASC
559                ) AS format_connection,
560                STRING_AGG(
561                    DISTINCT COALESCE(format_connection_networking, 'none'),
562                    ','
563                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
564                ) AS format_connection_networking,
565                MAX(cluster_id) AS cluster_id
566                FROM source_and_conns
567                LEFT OUTER JOIN networking ON networking.id = source_and_conns.id
568                LEFT OUTER JOIN format_conns ON format_conns.id = source_and_conns.id
569                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = source_and_conns.id
570                GROUP BY source_and_conns.id
571            ),
572
573            sinks AS (
574                SELECT
575                'sink' AS type,
576                1 AS value,
577                sink_and_conns.id AS id,
578                MAX(type) AS object_type,
579                COALESCE(MAX(connection_type), 'none') AS connection_type,
580                COALESCE(MAX(envelope_type), 'none') AS envelope_type,
581                STRING_AGG(
582                    DISTINCT COALESCE(networking_type, 'none'),
583                    ','
584                    ORDER BY COALESCE(networking_type, 'none') ASC
585                ) AS networking_type,
586                -- Sinks can only have 1 format connection but we aggregate
587                -- for consistency.
588                STRING_AGG(
589                    DISTINCT COALESCE(format_connection, 'none'),
590                    ','
591                    ORDER BY COALESCE(format_connection, 'none') ASC
592                ) AS format_connection,
593                STRING_AGG(
594                    DISTINCT COALESCE(format_connection_networking, 'none'),
595                    ','
596                    ORDER BY COALESCE(format_connection_networking, 'none') ASC
597                ) AS format_connection_networking,
598                MAX(cluster_id) AS cluster_id
599                FROM sink_and_conns
600                LEFT OUTER JOIN networking ON networking.id = sink_and_conns.id
601                LEFT OUTER JOIN format_conns ON format_conns.id = sink_and_conns.id
602                LEFT OUTER JOIN format_conn_deps ON format_conn_deps.id = sink_and_conns.id
603                GROUP BY sink_and_conns.id
604            ),
605
606            -- everything without replicas
607            together AS (
608                SELECT * FROM sources
609                UNION ALL
610                SELECT * from sinks
611            ),
612
613            with_cluster_replicas AS (
614                SELECT
615                -- `together.*` doesn't work because we need to aggregate the columns :(
616                MAX(together.id) AS id,
617                MAX(together.type) AS type,
618                MAX(together.object_type) AS object_type,
619                -- We just report 1 to the gauge. The `replica_id` labels aggregates the 0-many replicas associated
620                -- with this object.
621                MAX(together.value) AS value,
622                MAX(together.connection_type) AS connection_type,
623                MAX(together.envelope_type) AS envelope_type,
624                MAX(together.networking_type) AS networking_type,
625                MAX(together.format_connection) AS format_connection,
626                MAX(together.format_connection_networking) AS format_connection_networking,
627                MAX(together.cluster_id) AS cluster_id,
628                mcr.id AS replica_id,
629                -- Coalesce to 0 when there is no replica. This ensures `with_pod` below will generate
630                -- 1 row for the object. Also, -1 because `generate_series` is inclusive.
631                COALESCE((mcrs.processes - 1)::int, 0) AS processes
632                FROM together
633                LEFT OUTER JOIN mz_cluster_replicas mcr ON together.cluster_id = mcr.cluster_id
634                LEFT OUTER JOIN mz_catalog.mz_cluster_replica_sizes mcrs ON mcr.size = mcrs.size
635                GROUP BY together.id, mcr.id, mcrs.processes
636            ),
637
638            with_pod AS (
639                SELECT
640                id,
641                type,
642                object_type,
643                value,
644                connection_type,
645                envelope_type,
646                networking_type,
647                format_connection,
648                format_connection_networking,
649                cluster_id,
650                COALESCE(replica_id, 'none') AS replica_id,
651                -- When `replica_id` is NULL`, this coalesces to `none`.
652                COALESCE('cluster-' || cluster_id || '-replica-' || replica_id || '-' || generated, 'none') AS synthesized_pod
653                FROM with_cluster_replicas, generate_series(0, processes) generated
654            )
655
656        SELECT * FROM with_pod;
657        ",
658    },
659];