Skip to main content

mz_deploy/client/
introspection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Read-only catalog introspection queries.
11//!
12//! Methods on [`IntrospectionClient`] query the `mz_catalog` and
13//! `information_schema` to inspect the live environment without modifying it.
14//! Provides batch existence checks for schemas, clusters, and objects, as well
15//! as dependency lookups used during deployment planning and sink repointing.
16
17use crate::client::connection::{Client, IntrospectionClient};
18use crate::client::errors::ConnectionError;
19use crate::client::models::{Cluster, ClusterConfig, ClusterOptions, ClusterReplica, ObjectGrant};
20use crate::client::quote_identifier;
21use crate::client::sql_placeholders;
22use crate::client::staging_suffix_like_pattern;
23use crate::project::SchemaQualifier;
24use crate::project::ir::object_id::ObjectId;
25use itertools::Itertools;
26use std::collections::{BTreeMap, BTreeSet};
27use tokio_postgres::types::ToSql;
28
/// A sink that depends on an object in a schema being dropped.
///
/// Used during apply to identify sinks that need to be repointed to new
/// upstream objects before the old schemas are dropped with CASCADE.
///
/// Values of this type are built by `find_sinks_depending_on_schemas`, one per
/// (sink, referenced object) row returned by its catalog query.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DependentSink {
    /// Name of the database that contains the sink.
    pub sink_database: String,
    /// Name of the schema that contains the sink.
    pub sink_schema: String,
    /// Unqualified name of the sink itself.
    pub sink_name: String,
    /// Name of the database that contains the referenced object.
    pub dependency_database: String,
    /// Name of the schema that contains the referenced object.
    pub dependency_schema: String,
    /// Unqualified name of the object the sink depends on.
    pub dependency_name: String,
    /// Catalog type of the referenced object as reported by `mz_objects.type`
    /// (the producing query keeps `materialized-view`, `table` and `source`).
    pub dependency_type: String,
}
43
44/// Check if a schema exists in the specified database.
45pub(super) async fn schema_exists(
46    client: &Client,
47    database: &str,
48    schema: &str,
49) -> Result<bool, ConnectionError> {
50    let query = r#"
51        SELECT EXISTS(
52            SELECT 1
53            FROM mz_catalog.mz_schemas s
54            JOIN mz_catalog.mz_databases d ON s.database_id = d.id
55            WHERE s.name = $1 AND d.name = $2
56        ) AS exists
57    "#;
58
59    let row = client.query_one(query, &[&schema, &database]).await?;
60
61    Ok(row.get("exists"))
62}
63
64/// Check if a cluster exists.
65pub(super) async fn cluster_exists(client: &Client, name: &str) -> Result<bool, ConnectionError> {
66    let query = r#"
67        SELECT EXISTS(
68            SELECT 1 FROM mz_catalog.mz_clusters WHERE name = $1
69        ) AS exists
70    "#;
71
72    let row = client.query_one(query, &[&name]).await?;
73
74    Ok(row.get("exists"))
75}
76
77/// Get a cluster by name.
78pub(super) async fn get_cluster(
79    client: &Client,
80    name: &str,
81) -> Result<Option<Cluster>, ConnectionError> {
82    let query = r#"
83        SELECT
84            id,
85            name,
86            size,
87            replication_factor::bigint AS replication_factor
88        FROM mz_catalog.mz_clusters
89        WHERE name = $1
90    "#;
91
92    let rows = client.query(query, &[&name]).await?;
93
94    if rows.is_empty() {
95        return Ok(None);
96    }
97
98    let row = &rows[0];
99    Ok(Some(Cluster {
100        id: row.get("id"),
101        name: row.get("name"),
102        size: row.get("size"),
103        replication_factor: row.get("replication_factor"),
104    }))
105}
106
107/// List all clusters.
108pub(super) async fn list_clusters(client: &Client) -> Result<Vec<Cluster>, ConnectionError> {
109    let query = r#"
110        SELECT
111            id,
112            name,
113            size,
114            replication_factor::bigint AS replication_factor
115        FROM mz_catalog.mz_clusters
116        ORDER BY name
117    "#;
118
119    let rows = client.query(query, &[]).await?;
120
121    Ok(rows
122        .iter()
123        .map(|row| Cluster {
124            id: row.get("id"),
125            name: row.get("name"),
126            size: row.get("size"),
127            replication_factor: row.get("replication_factor"),
128        })
129        .collect())
130}
131
132/// Get cluster configuration including replicas and grants.
133///
134/// This fetches all information needed to clone a cluster's configuration:
135/// - For managed clusters: size and replication factor
136/// - For unmanaged clusters: replica configurations
137/// - For both: privilege grants
138pub(super) async fn get_cluster_config(
139    client: &Client,
140    name: &str,
141) -> Result<Option<ClusterConfig>, ConnectionError> {
142    // Query 1: Get cluster info and replicas with LEFT JOIN
143    let cluster_query = r#"
144        SELECT
145            c.id,
146            c.name,
147            c.managed,
148            c.size,
149            c.replication_factor::bigint AS replication_factor,
150            r.name AS replica_name,
151            r.size AS replica_size,
152            r.availability_zone
153        FROM mz_catalog.mz_clusters c
154        LEFT JOIN mz_catalog.mz_cluster_replicas r ON c.id = r.cluster_id
155        WHERE c.name = $1
156        ORDER BY r.name
157    "#;
158
159    let cluster_rows = client.query(cluster_query, &[&name]).await?;
160
161    if cluster_rows.is_empty() {
162        return Ok(None);
163    }
164
165    // Extract cluster-level info from first row
166    let first_row = &cluster_rows[0];
167    let managed: bool = first_row.get("managed");
168    let size: Option<String> = first_row.get("size");
169    let replication_factor: Option<i64> = first_row.get("replication_factor");
170
171    // Query 2: Get grants (excluding owner's implicit privileges)
172    let grants_query = r#"
173        WITH cluster_privilege AS (
174            SELECT mz_internal.mz_aclexplode(privileges).*, owner_id
175            FROM mz_clusters
176            WHERE name = $1
177        )
178        SELECT
179            grantee.name AS grantee,
180            c.privilege_type
181        FROM cluster_privilege AS c
182        JOIN mz_roles AS grantee ON c.grantee = grantee.id
183        WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
184          AND c.grantee != c.owner_id
185    "#;
186
187    let grant_rows = client.query(grants_query, &[&name]).await?;
188
189    let grants: Vec<ObjectGrant> = grant_rows
190        .iter()
191        .map(|row| ObjectGrant {
192            grantee: row.get("grantee"),
193            privilege_type: row.get("privilege_type"),
194        })
195        .collect();
196
197    if managed {
198        // Managed cluster
199        let size = size.ok_or_else(|| {
200            ConnectionError::Message(format!(
201                "Managed cluster '{}' has no size (unexpected)",
202                name
203            ))
204        })?;
205
206        let replication_factor = replication_factor.unwrap_or(1).try_into().map_err(|_| {
207            ConnectionError::Message(format!("Invalid replication_factor for cluster '{}'", name))
208        })?;
209
210        Ok(Some(ClusterConfig::Managed {
211            options: ClusterOptions {
212                size,
213                replication_factor,
214            },
215            grants,
216        }))
217    } else {
218        // Unmanaged cluster - collect replicas
219        let mut replicas = Vec::new();
220        for row in &cluster_rows {
221            let replica_name: Option<String> = row.get("replica_name");
222            if let Some(replica_name) = replica_name {
223                replicas.push(ClusterReplica {
224                    name: replica_name,
225                    size: row.get("replica_size"),
226                    availability_zone: row.get("availability_zone"),
227                });
228            }
229        }
230
231        Ok(Some(ClusterConfig::Unmanaged { replicas, grants }))
232    }
233}
234
235/// Check if a network policy exists.
236pub(super) async fn network_policy_exists(
237    client: &Client,
238    name: &str,
239) -> Result<bool, ConnectionError> {
240    let query = r#"
241        SELECT EXISTS(
242            SELECT 1 FROM mz_catalog.mz_network_policies WHERE name = $1
243        ) AS exists
244    "#;
245
246    let row = client.query_one(query, &[&name]).await?;
247
248    Ok(row.get("exists"))
249}
250
251/// Check if a role exists.
252pub(super) async fn role_exists(client: &Client, name: &str) -> Result<bool, ConnectionError> {
253    let query = r#"
254        SELECT EXISTS(
255            SELECT 1 FROM mz_catalog.mz_roles WHERE name = $1
256        ) AS exists
257    "#;
258
259    let row = client.query_one(query, &[&name]).await?;
260
261    Ok(row.get("exists"))
262}
263
264/// Get the members granted to a role.
265pub(super) async fn get_role_members(
266    client: &Client,
267    role_name: &str,
268) -> Result<Vec<String>, ConnectionError> {
269    let query = r#"
270        SELECT m.name AS member
271        FROM mz_catalog.mz_role_members rm
272        JOIN mz_catalog.mz_roles r ON r.id = rm.role_id
273        JOIN mz_catalog.mz_roles m ON m.id = rm.member
274        WHERE r.name = $1
275        ORDER BY m.name
276    "#;
277
278    let rows = client.query(query, &[&role_name]).await?;
279
280    Ok(rows.iter().map(|row| row.get("member")).collect())
281}
282
283/// Get session default parameter names for a role.
284pub(super) async fn get_role_parameters(
285    client: &Client,
286    role_name: &str,
287) -> Result<Vec<String>, ConnectionError> {
288    let query = r#"
289        SELECT rp.parameter_name
290        FROM mz_catalog.mz_role_parameters rp
291        JOIN mz_catalog.mz_roles r ON r.id = rp.role_id
292        WHERE r.name = $1
293        ORDER BY rp.parameter_name
294    "#;
295
296    let rows = client.query(query, &[&role_name]).await?;
297
298    Ok(rows.iter().map(|row| row.get("parameter_name")).collect())
299}
300
301/// Get the current Materialize user/role.
302pub(super) async fn get_current_user(client: &Client) -> Result<String, ConnectionError> {
303    let row = client.query_one("SELECT current_user()", &[]).await?;
304
305    Ok(row.get(0))
306}
307
308/// Check which schemas from a set of (database, schema) pairs exist.
309///
310/// Returns a BTreeSet of (database, schema) tuples that exist.
311pub(super) async fn check_schemas_exist(
312    client: &Client,
313    schemas: &[(String, String)],
314) -> Result<BTreeSet<(String, String)>, ConnectionError> {
315    if schemas.is_empty() {
316        return Ok(BTreeSet::new());
317    }
318
319    // Build FQN strings and a lookup map from FQN -> original tuple (reusing the same strings)
320    let fqns: Vec<String> = schemas
321        .iter()
322        .map(|(db, schema)| format!("{}.{}", db, schema))
323        .collect();
324
325    let fqn_map: BTreeMap<&str, &(String, String)> = fqns
326        .iter()
327        .zip_eq(schemas.iter())
328        .map(|(fqn, pair)| (fqn.as_str(), pair))
329        .collect();
330
331    let placeholders_str = sql_placeholders(fqns.len());
332
333    let query = format!(
334        r#"
335        SELECT d.name || '.' || s.name as fqn
336        FROM mz_catalog.mz_schemas s
337        JOIN mz_catalog.mz_databases d ON s.database_id = d.id
338        WHERE d.name || '.' || s.name IN ({})
339        ORDER BY fqn
340    "#,
341        placeholders_str
342    );
343
344    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
345    for fqn in &fqns {
346        params.push(fqn);
347    }
348
349    let rows = client.query(&query, &params).await?;
350
351    let mut existing = BTreeSet::new();
352    for row in rows {
353        let fqn: String = row.get("fqn");
354        if let Some(pair) = fqn_map.get(fqn.as_str()) {
355            existing.insert((*pair).clone());
356        }
357    }
358
359    Ok(existing)
360}
361
362/// Check which clusters from a set of names exist.
363///
364/// Returns a BTreeSet of cluster names that exist.
365pub(super) async fn check_clusters_exist(
366    client: &Client,
367    clusters: &[String],
368) -> Result<BTreeSet<String>, ConnectionError> {
369    if clusters.is_empty() {
370        return Ok(BTreeSet::new());
371    }
372
373    let placeholders_str = sql_placeholders(clusters.len());
374
375    let query = format!(
376        r#"
377        SELECT name FROM mz_catalog.mz_clusters
378        WHERE name IN ({})
379        ORDER BY name
380    "#,
381        placeholders_str
382    );
383
384    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
385    for name in clusters {
386        params.push(name);
387    }
388
389    let rows = client.query(&query, &params).await?;
390
391    Ok(rows.iter().map(|row| row.get("name")).collect())
392}
393
394/// Check which objects from a set exist in the production database.
395pub(super) async fn check_objects_exist(
396    client: &Client,
397    objects: &BTreeSet<ObjectId>,
398) -> Result<BTreeSet<ObjectId>, ConnectionError> {
399    if objects.is_empty() {
400        return Ok(BTreeSet::new());
401    }
402
403    let fqn_map: BTreeMap<String, &ObjectId> = objects.iter().map(|o| (o.to_string(), o)).collect();
404    let fqns: Vec<&String> = fqn_map.keys().collect();
405
406    let placeholders_str = sql_placeholders(fqns.len());
407
408    let query = format!(
409        r#"
410        SELECT d.name || '.' || s.name || '.' || mo.name as fqn
411        FROM mz_objects mo
412        JOIN mz_schemas s ON mo.schema_id = s.id
413        JOIN mz_databases d ON s.database_id = d.id
414        WHERE d.name || '.' || s.name || '.' || mo.name IN ({})
415        AND mo.type IN ('table', 'view', 'materialized-view', 'source', 'sink')
416        ORDER BY fqn
417    "#,
418        placeholders_str
419    );
420
421    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
422    for fqn in &fqns {
423        params.push(fqn);
424    }
425
426    let rows = client.query(&query, &params).await?;
427
428    Ok(rows
429        .iter()
430        .filter_map(|row| {
431            let fqn: String = row.get("fqn");
432            fqn_map.get(&fqn).map(|id| (*id).clone())
433        })
434        .collect())
435}
436
437/// Check which objects from the given set exist in a specific catalog table.
438///
439/// Returns a BTreeSet of ObjectIds for objects that already exist.
440async fn check_catalog_objects_exist(
441    client: &Client,
442    objects: &BTreeSet<ObjectId>,
443    catalog_table: &str,
444) -> Result<BTreeSet<ObjectId>, ConnectionError> {
445    if objects.is_empty() {
446        return Ok(BTreeSet::new());
447    }
448
449    // Build a lookup map from FQN string -> ObjectId for O(1) matching
450    let fqn_map: BTreeMap<String, &ObjectId> = objects.iter().map(|o| (o.to_string(), o)).collect();
451    let fqns: Vec<&String> = fqn_map.keys().collect();
452
453    let placeholders_str = sql_placeholders(fqns.len());
454
455    let query = format!(
456        r#"
457        SELECT d.name || '.' || s.name || '.' || t.name as fqn
458        FROM {} t
459        JOIN mz_schemas s ON t.schema_id = s.id
460        JOIN mz_databases d ON s.database_id = d.id
461        WHERE d.name || '.' || s.name || '.' || t.name IN ({})
462        ORDER BY fqn
463    "#,
464        catalog_table, placeholders_str
465    );
466
467    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
468    for fqn in &fqns {
469        params.push(*fqn);
470    }
471
472    let rows = client.query(&query, &params).await?;
473
474    let mut existing = BTreeSet::new();
475    for row in rows {
476        let fqn: String = row.get("fqn");
477        if let Some(obj_id) = fqn_map.get(&fqn) {
478            existing.insert((*obj_id).clone());
479        }
480    }
481
482    Ok(existing)
483}
484
485/// Check which tables from the given set exist in the database.
486///
487/// Returns a BTreeSet of ObjectIds for tables that already exist.
488pub(super) async fn check_tables_exist(
489    client: &Client,
490    tables: &BTreeSet<ObjectId>,
491) -> Result<BTreeSet<ObjectId>, ConnectionError> {
492    check_catalog_objects_exist(client, tables, "mz_tables").await
493}
494
495/// Check which sources from the given set exist in the database.
496///
497/// Returns a BTreeSet of ObjectIds for sources that already exist.
498pub(super) async fn check_sources_exist(
499    client: &Client,
500    sources: &BTreeSet<ObjectId>,
501) -> Result<BTreeSet<ObjectId>, ConnectionError> {
502    check_catalog_objects_exist(client, sources, "mz_sources").await
503}
504
505/// Check which secrets from the given set exist in the database.
506///
507/// Returns a BTreeSet of ObjectIds for secrets that already exist.
508pub(super) async fn check_secrets_exist(
509    client: &Client,
510    secrets: &BTreeSet<ObjectId>,
511) -> Result<BTreeSet<ObjectId>, ConnectionError> {
512    check_catalog_objects_exist(client, secrets, "mz_secrets").await
513}
514
515/// Check which connections from the given set exist in the database.
516///
517/// Returns a BTreeSet of ObjectIds for connections that already exist.
518pub(super) async fn check_connections_exist(
519    client: &Client,
520    connections: &BTreeSet<ObjectId>,
521) -> Result<BTreeSet<ObjectId>, ConnectionError> {
522    check_catalog_objects_exist(client, connections, "mz_connections").await
523}
524
525/// Check which sinks from the given set exist in the database.
526///
527/// Returns a BTreeSet of ObjectIds for sinks that already exist.
528/// Used during apply to skip creating sinks that already exist (like tables).
529pub(super) async fn check_sinks_exist(
530    client: &Client,
531    sinks: &BTreeSet<ObjectId>,
532) -> Result<BTreeSet<ObjectId>, ConnectionError> {
533    check_catalog_objects_exist(client, sinks, "mz_sinks").await
534}
535
536/// Find sinks that depend on objects in the specified schemas.
537///
538/// This is used during apply to identify sinks that need to be repointed
539/// before old schemas are dropped with CASCADE. Only returns sinks whose
540/// upstream object (FROM clause) is in one of the specified schemas.
541pub(super) async fn find_sinks_depending_on_schemas(
542    client: &Client,
543    schemas: &[SchemaQualifier],
544) -> Result<Vec<DependentSink>, ConnectionError> {
545    if schemas.is_empty() {
546        return Ok(Vec::new());
547    }
548
549    // Build WHERE clause for (database, schema) pairs
550    let mut conditions = Vec::new();
551    let mut param_idx = 1;
552
553    for _ in schemas {
554        conditions.push(format!(
555            "(dep_db.name = ${} AND dep_schema.name = ${})",
556            param_idx,
557            param_idx + 1
558        ));
559        param_idx += 2;
560    }
561
562    let where_clause = conditions.join(" OR ");
563
564    let query = format!(
565        r#"
566        SELECT
567            sink_db.name as sink_database,
568            sink_schema.name as sink_schema,
569            sinks.name as sink_name,
570            dep_db.name as dependency_database,
571            dep_schema.name as dependency_schema,
572            dep_obj.name as dependency_name,
573            dep_obj.type as dependency_type
574        FROM mz_sinks sinks
575        JOIN mz_schemas sink_schema ON sinks.schema_id = sink_schema.id
576        JOIN mz_databases sink_db ON sink_schema.database_id = sink_db.id
577        JOIN mz_internal.mz_object_dependencies deps ON sinks.id = deps.object_id
578        JOIN mz_objects dep_obj ON deps.referenced_object_id = dep_obj.id
579        JOIN mz_schemas dep_schema ON dep_obj.schema_id = dep_schema.id
580        JOIN mz_databases dep_db ON dep_schema.database_id = dep_db.id
581        WHERE ({})
582          AND dep_obj.type IN ('materialized-view', 'table', 'source')
583        ORDER BY sink_db.name, sink_schema.name, sinks.name
584        "#,
585        where_clause
586    );
587
588    // Build params vector with references to the schema tuples
589    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
590    for sq in schemas {
591        params.push(&sq.database);
592        params.push(&sq.schema);
593    }
594
595    let rows = client.query(&query, &params).await?;
596
597    Ok(rows
598        .iter()
599        .map(|row| DependentSink {
600            sink_database: row.get("sink_database"),
601            sink_schema: row.get("sink_schema"),
602            sink_name: row.get("sink_name"),
603            dependency_database: row.get("dependency_database"),
604            dependency_schema: row.get("dependency_schema"),
605            dependency_name: row.get("dependency_name"),
606            dependency_type: row.get("dependency_type"),
607        })
608        .collect())
609}
610
611/// Check if a connection exists in the specified database and schema.
612pub(super) async fn check_connection_exists(
613    client: &Client,
614    database: &str,
615    schema: &str,
616    name: &str,
617) -> Result<bool, ConnectionError> {
618    let query = r#"
619        SELECT EXISTS(
620            SELECT 1
621            FROM mz_catalog.mz_connections c
622            JOIN mz_catalog.mz_schemas s ON c.schema_id = s.id
623            JOIN mz_catalog.mz_databases d ON s.database_id = d.id
624            WHERE d.name = $1 AND s.name = $2 AND c.name = $3
625        ) AS exists
626    "#;
627    let row = client
628        .query_one(query, &[&database, &schema, &name])
629        .await?;
630    Ok(row.get("exists"))
631}
632
633/// Check if an object (MV, table, source) exists in the specified schema.
634///
635/// Used to verify that a replacement object exists before repointing a sink.
636pub(super) async fn object_exists(
637    client: &Client,
638    database: &str,
639    schema: &str,
640    object: &str,
641) -> Result<bool, ConnectionError> {
642    let query = r#"
643        SELECT EXISTS(
644            SELECT 1 FROM mz_objects o
645            JOIN mz_schemas s ON o.schema_id = s.id
646            JOIN mz_databases d ON s.database_id = d.id
647            WHERE d.name = $1 AND s.name = $2 AND o.name = $3
648              AND o.type IN ('materialized-view', 'table', 'source')
649        ) AS exists
650    "#;
651
652    let row = client
653        .query_one(query, &[&database, &schema, &object])
654        .await?;
655
656    Ok(row.get("exists"))
657}
658
659/// Get staging schema names for a specific deployment.
660pub(super) async fn get_staging_schemas(
661    client: &Client,
662    deploy_id: &str,
663) -> Result<Vec<SchemaQualifier>, ConnectionError> {
664    let pattern = staging_suffix_like_pattern(deploy_id);
665
666    let query = r#"
667        SELECT d.name as database, s.name as schema
668        FROM mz_schemas s
669        JOIN mz_databases d ON s.database_id = d.id
670        WHERE s.name LIKE $1 ESCAPE '\'
671    "#;
672
673    let rows = client.query(query, &[&pattern]).await?;
674
675    Ok(rows
676        .iter()
677        .map(|row| {
678            let database: String = row.get("database");
679            let schema: String = row.get("schema");
680            SchemaQualifier::new(database, schema)
681        })
682        .collect())
683}
684
685/// Get staging cluster names for a specific deployment.
686pub(super) async fn get_staging_clusters(
687    client: &Client,
688    deploy_id: &str,
689) -> Result<Vec<String>, ConnectionError> {
690    let pattern = staging_suffix_like_pattern(deploy_id);
691
692    let query = r#"
693        SELECT name
694        FROM mz_clusters
695        WHERE name LIKE $1 ESCAPE '\'
696    "#;
697
698    let rows = client.query(query, &[&pattern]).await?;
699
700    Ok(rows.iter().map(|row| row.get("name")).collect())
701}
702
/// Translate a catalog object type string into the keyword used after `DROP`.
///
/// Returns `None` for any type that is not in the table below; matching is
/// exact and case-sensitive.
fn mz_type_to_drop_keyword(obj_type: &str) -> Option<&'static str> {
    // (catalog type, DROP keyword) pairs.
    const DROP_KEYWORDS: [(&str, &str); 5] = [
        ("table", "TABLE"),
        ("view", "VIEW"),
        ("materialized-view", "MATERIALIZED VIEW"),
        ("source", "SOURCE"),
        ("sink", "SINK"),
    ];

    DROP_KEYWORDS
        .iter()
        .find(|(mz_type, _)| *mz_type == obj_type)
        .map(|(_, keyword)| *keyword)
}
714
715/// Drop all objects in a schema.
716///
717/// Returns the fully-qualified names of dropped objects.
718pub(super) async fn drop_schema_objects(
719    client: &Client,
720    database: &str,
721    schema: &str,
722) -> Result<Vec<String>, ConnectionError> {
723    let query = r#"
724        SELECT mo.name, mo.type
725        FROM mz_objects mo
726        JOIN mz_schemas s ON mo.schema_id = s.id
727        JOIN mz_databases d ON s.database_id = d.id
728        WHERE d.name = $1 AND s.name = $2
729        AND mo.type IN ('table', 'view', 'materialized-view', 'source', 'sink')
730        ORDER BY mo.id DESC
731    "#;
732
733    let rows = client.query(query, &[&database, &schema]).await?;
734
735    let mut dropped = Vec::new();
736    for row in rows {
737        let name: String = row.get("name");
738        let obj_type: String = row.get("type");
739
740        let fqn = format!(
741            "{}.{}.{}",
742            quote_identifier(database),
743            quote_identifier(schema),
744            quote_identifier(&name)
745        );
746        let Some(drop_type) = mz_type_to_drop_keyword(obj_type.as_str()) else {
747            continue;
748        };
749
750        let drop_sql = format!("DROP {} IF EXISTS {} CASCADE", drop_type, fqn);
751        client.execute(&drop_sql, &[]).await?;
752
753        dropped.push(fqn);
754    }
755
756    Ok(dropped)
757}
758
759/// Drop specific objects by their ObjectIds.
760///
761/// Returns the fully-qualified names of dropped objects.
762pub(super) async fn drop_objects(
763    client: &Client,
764    objects: &BTreeSet<ObjectId>,
765) -> Result<Vec<String>, ConnectionError> {
766    let mut dropped = Vec::new();
767
768    if objects.is_empty() {
769        return Ok(dropped);
770    }
771
772    let placeholders_str = sql_placeholders(objects.len());
773
774    let query = format!(
775        r#"
776        SELECT mo.name, s.name as schema_name, d.name as database_name, mo.type
777        FROM mz_objects mo
778        JOIN mz_schemas s ON mo.schema_id = s.id
779        JOIN mz_databases d ON s.database_id = d.id
780        WHERE d.name || '.' || s.name || '.' || mo.name IN ({})
781        AND mo.type IN ('table', 'view', 'materialized-view', 'source', 'sink')
782        ORDER BY mo.id DESC
783    "#,
784        placeholders_str
785    );
786
787    let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
788    let fqns: Vec<_> = objects.iter().map(|object| object.to_string()).collect();
789    for fqn in &fqns {
790        params.push(fqn);
791    }
792
793    let rows = client.query(&query, &params).await?;
794
795    for row in rows {
796        let name: String = row.get("name");
797        let schema: String = row.get("schema_name");
798        let database: String = row.get("database_name");
799        let obj_type: String = row.get("type");
800
801        let fqn = format!(
802            "{}.{}.{}",
803            quote_identifier(&database),
804            quote_identifier(&schema),
805            quote_identifier(&name)
806        );
807        let Some(drop_type) = mz_type_to_drop_keyword(obj_type.as_str()) else {
808            continue;
809        };
810
811        let drop_sql = format!("DROP {} IF EXISTS {} CASCADE", drop_type, fqn);
812        client.execute(&drop_sql, &[]).await?;
813
814        dropped.push(fqn);
815    }
816
817    Ok(dropped)
818}
819
820/// Drop staging schemas by name.
821pub(super) async fn drop_staging_schemas(
822    client: &Client,
823    schemas: &[SchemaQualifier],
824) -> Result<(), ConnectionError> {
825    for sq in schemas {
826        let drop_sql = format!(
827            "DROP SCHEMA IF EXISTS {}.{} CASCADE",
828            quote_identifier(&sq.database),
829            quote_identifier(&sq.schema)
830        );
831        client.execute(&drop_sql, &[]).await?;
832    }
833
834    Ok(())
835}
836
837/// Drop staging clusters by name.
838pub(super) async fn drop_staging_clusters(
839    client: &Client,
840    clusters: &[String],
841) -> Result<(), ConnectionError> {
842    for cluster in clusters {
843        let drop_sql = format!(
844            "DROP CLUSTER IF EXISTS {} CASCADE",
845            quote_identifier(cluster)
846        );
847        client.execute(&drop_sql, &[]).await?;
848    }
849
850    Ok(())
851}
852
853/// Get privilege grants on a named infrastructure object (cluster, network policy).
854///
855/// `catalog_table` is the system catalog table (e.g., `"mz_clusters"`,
856/// `"mz_network_policies"`). Returns `(grantee, privilege_type)` pairs from
857/// `mz_aclexplode`, filtering out system roles.
858async fn get_named_object_grants(
859    client: &Client,
860    catalog_table: &str,
861    name: &str,
862) -> Result<Vec<ObjectGrant>, ConnectionError> {
863    let query = format!(
864        r#"
865        -- Explode the ACL bitmap into individual (grantee, privilege_type) rows.
866        -- Each object stores privileges as a compact bitmap; mz_aclexplode unpacks it.
867        WITH privilege AS (
868            SELECT mz_internal.mz_aclexplode(privileges).*, owner_id
869            FROM {}
870            WHERE name = $1
871        )
872        SELECT
873            grantee.name AS grantee,
874            p.privilege_type
875        FROM privilege AS p
876        -- Resolve grantee role IDs to human-readable names.
877        JOIN mz_roles AS grantee ON p.grantee = grantee.id
878        -- Exclude system roles that are not user-manageable.
879        WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
880          -- Owners implicitly have all privileges; don't surface those as explicit grants.
881          AND p.grantee != p.owner_id
882        "#,
883        catalog_table
884    );
885
886    let rows = client.query(&query, &[&name]).await?;
887
888    Ok(rows
889        .iter()
890        .map(|row| ObjectGrant {
891            grantee: row.get("grantee"),
892            privilege_type: row.get("privilege_type"),
893        })
894        .collect())
895}
896
897/// Get privilege grants on a cluster by name.
898pub(super) async fn get_cluster_grants(
899    client: &Client,
900    name: &str,
901) -> Result<Vec<ObjectGrant>, ConnectionError> {
902    get_named_object_grants(client, "mz_clusters", name).await
903}
904
905/// Get privilege grants on a network policy by name.
906pub(super) async fn get_network_policy_grants(
907    client: &Client,
908    name: &str,
909) -> Result<Vec<ObjectGrant>, ConnectionError> {
910    get_named_object_grants(client, "mz_network_policies", name).await
911}
912
913/// Get privilege grants on a database object (table, source, secret, connection).
914///
915/// `catalog_table` is the system catalog table name (e.g., `"mz_tables"`, `"mz_secrets"`).
916pub(super) async fn get_database_object_grants(
917    client: &Client,
918    catalog_table: &str,
919    database: &str,
920    schema: &str,
921    name: &str,
922) -> Result<Vec<ObjectGrant>, ConnectionError> {
923    let query = format!(
924        r#"
925        -- Locate the object by its fully-qualified name (database.schema.object)
926        -- using a 3-table join chain: catalog_table -> mz_schemas -> mz_databases.
927        -- Then explode the ACL bitmap into individual privilege rows.
928        WITH privilege AS (
929            SELECT mz_internal.mz_aclexplode(t.privileges).*, t.owner_id
930            FROM {} t
931            JOIN mz_schemas s ON t.schema_id = s.id
932            JOIN mz_databases d ON s.database_id = d.id
933            WHERE d.name = $1 AND s.name = $2 AND t.name = $3
934        )
935        SELECT
936            grantee.name AS grantee,
937            p.privilege_type
938        FROM privilege AS p
939        JOIN mz_roles AS grantee ON p.grantee = grantee.id
940        WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
941          AND p.grantee != p.owner_id
942        "#,
943        catalog_table
944    );
945
946    let rows = client.query(&query, &[&database, &schema, &name]).await?;
947
948    Ok(rows
949        .iter()
950        .map(|row| ObjectGrant {
951            grantee: row.get("grantee"),
952            privilege_type: row.get("privilege_type"),
953        })
954        .collect())
955}
956
957/// Get default privilege grants for a named infrastructure object (cluster, network policy).
958///
959/// Queries `mz_default_privileges` to find grants that would be auto-applied
960/// to the given object based on its owner and any PUBLIC default privileges.
961/// These grants should be protected from revocation during reconciliation.
962async fn get_default_privilege_grants_for_named_object(
963    client: &Client,
964    catalog_table: &str,
965    name: &str,
966    object_type: &str,
967) -> Result<Vec<ObjectGrant>, ConnectionError> {
968    let query = format!(
969        r#"
970        -- Query default privileges from ALTER DEFAULT PRIVILEGES rules.
971        -- These are auto-applied grants that should be protected from revocation.
972        SELECT
973            grantee_role.name AS grantee,
974            dp_priv.privilege_type
975        FROM mz_default_privileges dp
976        -- Expand the privilege bitmap into individual privilege type strings.
977        CROSS JOIN LATERAL unnest(
978            mz_internal.mz_format_privileges(dp.privileges)
979        ) AS dp_priv(privilege_type)
980        JOIN {} obj ON obj.name = $1
981        JOIN mz_roles AS grantee_role ON dp.grantee = grantee_role.id
982        WHERE dp.object_type = $2
983          -- Match rules targeting the object's owner, or PUBLIC ('p') rules
984          -- that apply to all owners.
985          AND (dp.role_id = obj.owner_id OR dp.role_id = 'p')
986          -- Named objects (clusters, network policies) are not schema-scoped,
987          -- so only global default privileges (both NULL) apply.
988          AND dp.database_id IS NULL
989          AND dp.schema_id IS NULL
990          AND grantee_role.name NOT IN ('none', 'mz_system', 'mz_support')
991        "#,
992        catalog_table
993    );
994
995    let rows = client.query(&query, &[&name, &object_type]).await?;
996
997    Ok(rows
998        .iter()
999        .map(|row| ObjectGrant {
1000            grantee: row.get("grantee"),
1001            privilege_type: row.get("privilege_type"),
1002        })
1003        .collect())
1004}
1005
1006/// Get default privilege grants for a cluster by name.
1007pub(super) async fn get_default_privilege_grants_for_cluster(
1008    client: &Client,
1009    name: &str,
1010) -> Result<Vec<ObjectGrant>, ConnectionError> {
1011    get_default_privilege_grants_for_named_object(client, "mz_clusters", name, "cluster").await
1012}
1013
1014/// Get default privilege grants for a network policy by name.
1015pub(super) async fn get_default_privilege_grants_for_network_policy(
1016    client: &Client,
1017    name: &str,
1018) -> Result<Vec<ObjectGrant>, ConnectionError> {
1019    get_default_privilege_grants_for_named_object(client, "mz_network_policies", name, "type").await
1020}
1021
1022/// Get default privilege grants for a database object (table, source, secret, connection).
1023///
1024/// Queries `mz_default_privileges` to find grants that would be auto-applied
1025/// to the given object based on its owner, database, schema, and any PUBLIC
1026/// default privileges. These grants should be protected from revocation.
1027pub(super) async fn get_default_privilege_grants_for_database_object(
1028    client: &Client,
1029    catalog_table: &str,
1030    database: &str,
1031    schema: &str,
1032    name: &str,
1033    object_type: &str,
1034) -> Result<Vec<ObjectGrant>, ConnectionError> {
1035    let query = format!(
1036        r#"
1037        -- Query default privileges from ALTER DEFAULT PRIVILEGES rules
1038        -- for a schema-qualified database object.
1039        SELECT
1040            grantee_role.name AS grantee,
1041            dp_priv.privilege_type
1042        FROM mz_default_privileges dp
1043        -- Expand the privilege bitmap into individual privilege type strings.
1044        CROSS JOIN LATERAL unnest(
1045            mz_internal.mz_format_privileges(dp.privileges)
1046        ) AS dp_priv(privilege_type)
1047        -- Locate the object by FQN to determine its owner, database, and schema.
1048        JOIN {} obj ON obj.name = $3
1049        JOIN mz_schemas s ON obj.schema_id = s.id
1050        JOIN mz_databases d ON s.database_id = d.id
1051        JOIN mz_roles AS grantee_role ON dp.grantee = grantee_role.id
1052        WHERE d.name = $1 AND s.name = $2
1053          AND dp.object_type = $4
1054          -- Match rules targeting the object's owner, or PUBLIC ('p') rules.
1055          AND (dp.role_id = obj.owner_id OR dp.role_id = 'p')
1056          -- Match both global rules (database_id IS NULL) and rules scoped to
1057          -- this specific database. Global rules apply to all databases.
1058          AND (dp.database_id IS NULL OR dp.database_id = d.id)
1059          -- Same for schema: global or scoped to this specific schema.
1060          AND (dp.schema_id IS NULL OR dp.schema_id = s.id)
1061          AND grantee_role.name NOT IN ('none', 'mz_system', 'mz_support')
1062        "#,
1063        catalog_table
1064    );
1065
1066    let rows = client
1067        .query(&query, &[&database, &schema, &name, &object_type])
1068        .await?;
1069
1070    Ok(rows
1071        .iter()
1072        .map(|row| ObjectGrant {
1073            grantee: row.get("grantee"),
1074            privilege_type: row.get("privilege_type"),
1075        })
1076        .collect())
1077}
1078
1079/// Get the `CREATE CONNECTION` SQL for an existing connection.
1080///
1081/// Uses `SHOW CREATE CONNECTION` which returns the canonical, non-redacted SQL
1082/// including fully-qualified secret references. Returns `None` if the
1083/// connection does not exist.
1084pub(super) async fn get_connection_create_sql(
1085    client: &Client,
1086    database: &str,
1087    schema: &str,
1088    name: &str,
1089) -> Result<Option<String>, ConnectionError> {
1090    let fqn = format!(
1091        "{}.{}.{}",
1092        quote_identifier(database),
1093        quote_identifier(schema),
1094        quote_identifier(name)
1095    );
1096    let query = format!("SHOW CREATE CONNECTION {}", fqn);
1097    let rows = client.query(&query, &[]).await?;
1098    Ok(rows.first().map(|row| row.get("create_sql")))
1099}
1100
1101impl IntrospectionClient<'_> {
1102    /// Get the current Materialize user/role.
1103    pub async fn get_current_user(&self) -> Result<String, ConnectionError> {
1104        get_current_user(self.client).await
1105    }
1106
1107    /// Check which objects from a set exist in the production database.
1108    pub async fn check_objects_exist(
1109        &self,
1110        objects: &BTreeSet<ObjectId>,
1111    ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1112        check_objects_exist(self.client, objects).await
1113    }
1114
1115    /// Check which objects from a set exist in a specific catalog table.
1116    pub async fn check_catalog_objects_exist(
1117        &self,
1118        objects: &BTreeSet<ObjectId>,
1119        catalog_table: &str,
1120    ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1121        check_catalog_objects_exist(self.client, objects, catalog_table).await
1122    }
1123
1124    /// Check which tables from the given set exist in the database.
1125    pub async fn check_tables_exist(
1126        &self,
1127        tables: &BTreeSet<ObjectId>,
1128    ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1129        check_tables_exist(self.client, tables).await
1130    }
1131
1132    /// Check which sources from the given set exist in the database.
1133    pub async fn check_sources_exist(
1134        &self,
1135        sources: &BTreeSet<ObjectId>,
1136    ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1137        check_sources_exist(self.client, sources).await
1138    }
1139
1140    /// Check which secrets from the given set exist in the database.
1141    pub async fn check_secrets_exist(
1142        &self,
1143        secrets: &BTreeSet<ObjectId>,
1144    ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1145        check_secrets_exist(self.client, secrets).await
1146    }
1147
1148    /// Check which connections from the given set exist in the database.
1149    pub async fn check_connections_exist(
1150        &self,
1151        connections: &BTreeSet<ObjectId>,
1152    ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1153        check_connections_exist(self.client, connections).await
1154    }
1155
1156    /// Check which sinks from the given set exist in the database.
1157    pub async fn check_sinks_exist(
1158        &self,
1159        sinks: &BTreeSet<ObjectId>,
1160    ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1161        check_sinks_exist(self.client, sinks).await
1162    }
1163
1164    /// Check which schemas from a set of (database, schema) pairs exist.
1165    pub async fn check_schemas_exist(
1166        &self,
1167        schemas: &[(String, String)],
1168    ) -> Result<BTreeSet<(String, String)>, ConnectionError> {
1169        check_schemas_exist(self.client, schemas).await
1170    }
1171
1172    /// Check which clusters from a set of names exist.
1173    pub async fn check_clusters_exist(
1174        &self,
1175        clusters: &[String],
1176    ) -> Result<BTreeSet<String>, ConnectionError> {
1177        check_clusters_exist(self.client, clusters).await
1178    }
1179
1180    /// Find sinks that depend on objects in the specified schemas.
1181    pub async fn find_sinks_depending_on_schemas(
1182        &self,
1183        schemas: &[SchemaQualifier],
1184    ) -> Result<Vec<DependentSink>, ConnectionError> {
1185        find_sinks_depending_on_schemas(self.client, schemas).await
1186    }
1187
1188    /// Check if a connection exists in the specified database and schema.
1189    pub async fn check_connection_exists(
1190        &self,
1191        database: &str,
1192        schema: &str,
1193        name: &str,
1194    ) -> Result<bool, ConnectionError> {
1195        check_connection_exists(self.client, database, schema, name).await
1196    }
1197
1198    /// Check if an object (MV, table, source) exists in the specified schema.
1199    pub async fn object_exists(
1200        &self,
1201        database: &str,
1202        schema: &str,
1203        object: &str,
1204    ) -> Result<bool, ConnectionError> {
1205        object_exists(self.client, database, schema, object).await
1206    }
1207
1208    /// Get staging schema names for a specific deployment.
1209    pub async fn get_staging_schemas(
1210        &self,
1211        deploy_id: &str,
1212    ) -> Result<Vec<SchemaQualifier>, ConnectionError> {
1213        get_staging_schemas(self.client, deploy_id).await
1214    }
1215
1216    /// Get staging cluster names for a specific deployment.
1217    pub async fn get_staging_clusters(
1218        &self,
1219        deploy_id: &str,
1220    ) -> Result<Vec<String>, ConnectionError> {
1221        get_staging_clusters(self.client, deploy_id).await
1222    }
1223
1224    /// Drop all objects in a schema.
1225    pub async fn drop_schema_objects(
1226        &self,
1227        database: &str,
1228        schema: &str,
1229    ) -> Result<Vec<String>, ConnectionError> {
1230        drop_schema_objects(self.client, database, schema).await
1231    }
1232
1233    /// Drop specific objects by their ObjectIds.
1234    pub async fn drop_objects(
1235        &self,
1236        objects: &BTreeSet<ObjectId>,
1237    ) -> Result<Vec<String>, ConnectionError> {
1238        drop_objects(self.client, objects).await
1239    }
1240
1241    /// Drop staging schemas by name.
1242    pub async fn drop_staging_schemas(
1243        &self,
1244        schemas: &[SchemaQualifier],
1245    ) -> Result<(), ConnectionError> {
1246        drop_staging_schemas(self.client, schemas).await
1247    }
1248
1249    /// Drop staging clusters by name.
1250    pub async fn drop_staging_clusters(&self, clusters: &[String]) -> Result<(), ConnectionError> {
1251        drop_staging_clusters(self.client, clusters).await
1252    }
1253
1254    /// Check if a schema exists in the specified database.
1255    pub async fn schema_exists(
1256        &self,
1257        database: &str,
1258        schema: &str,
1259    ) -> Result<bool, ConnectionError> {
1260        schema_exists(self.client, database, schema).await
1261    }
1262
1263    /// Check if a network policy exists.
1264    pub async fn network_policy_exists(&self, name: &str) -> Result<bool, ConnectionError> {
1265        network_policy_exists(self.client, name).await
1266    }
1267
1268    /// Check if a role exists.
1269    pub async fn role_exists(&self, name: &str) -> Result<bool, ConnectionError> {
1270        role_exists(self.client, name).await
1271    }
1272
1273    /// Get the members granted to a role.
1274    pub async fn get_role_members(&self, name: &str) -> Result<Vec<String>, ConnectionError> {
1275        get_role_members(self.client, name).await
1276    }
1277
1278    /// Get session default parameter names for a role.
1279    pub async fn get_role_parameters(&self, name: &str) -> Result<Vec<String>, ConnectionError> {
1280        get_role_parameters(self.client, name).await
1281    }
1282
1283    /// Check if a cluster exists.
1284    pub async fn cluster_exists(&self, name: &str) -> Result<bool, ConnectionError> {
1285        cluster_exists(self.client, name).await
1286    }
1287
1288    /// Get a cluster by name.
1289    pub async fn get_cluster(&self, name: &str) -> Result<Option<Cluster>, ConnectionError> {
1290        get_cluster(self.client, name).await
1291    }
1292
1293    /// List all clusters.
1294    pub async fn list_clusters(&self) -> Result<Vec<Cluster>, ConnectionError> {
1295        list_clusters(self.client).await
1296    }
1297
1298    /// Get cluster configuration including replicas and grants.
1299    pub async fn get_cluster_config(
1300        &self,
1301        name: &str,
1302    ) -> Result<Option<ClusterConfig>, ConnectionError> {
1303        get_cluster_config(self.client, name).await
1304    }
1305
1306    /// Get privilege grants on a cluster by name.
1307    pub async fn get_cluster_grants(
1308        &self,
1309        name: &str,
1310    ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1311        get_cluster_grants(self.client, name).await
1312    }
1313
1314    /// Get privilege grants on a network policy by name.
1315    pub async fn get_network_policy_grants(
1316        &self,
1317        name: &str,
1318    ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1319        get_network_policy_grants(self.client, name).await
1320    }
1321
1322    /// Get privilege grants on a database object.
1323    pub async fn get_database_object_grants(
1324        &self,
1325        catalog_table: &str,
1326        database: &str,
1327        schema: &str,
1328        name: &str,
1329    ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1330        get_database_object_grants(self.client, catalog_table, database, schema, name).await
1331    }
1332
1333    /// Get the `CREATE CONNECTION` SQL for an existing connection.
1334    pub async fn get_connection_create_sql(
1335        &self,
1336        database: &str,
1337        schema: &str,
1338        name: &str,
1339    ) -> Result<Option<String>, ConnectionError> {
1340        get_connection_create_sql(self.client, database, schema, name).await
1341    }
1342
1343    /// Get default privilege grants for a cluster by name.
1344    pub async fn get_default_privilege_grants_for_cluster(
1345        &self,
1346        name: &str,
1347    ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1348        get_default_privilege_grants_for_cluster(self.client, name).await
1349    }
1350
1351    /// Get default privilege grants for a network policy by name.
1352    pub async fn get_default_privilege_grants_for_network_policy(
1353        &self,
1354        name: &str,
1355    ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1356        get_default_privilege_grants_for_network_policy(self.client, name).await
1357    }
1358
1359    /// Get default privilege grants for a database object.
1360    pub async fn get_default_privilege_grants_for_database_object(
1361        &self,
1362        catalog_table: &str,
1363        database: &str,
1364        schema: &str,
1365        name: &str,
1366        object_type: &str,
1367    ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1368        get_default_privilege_grants_for_database_object(
1369            self.client,
1370            catalog_table,
1371            database,
1372            schema,
1373            name,
1374            object_type,
1375        )
1376        .await
1377    }
1378}