1use crate::client::connection::{Client, IntrospectionClient};
18use crate::client::errors::ConnectionError;
19use crate::client::models::{Cluster, ClusterConfig, ClusterOptions, ClusterReplica, ObjectGrant};
20use crate::client::quote_identifier;
21use crate::client::sql_placeholders;
22use crate::client::staging_suffix_like_pattern;
23use crate::project::SchemaQualifier;
24use crate::project::ir::object_id::ObjectId;
25use itertools::Itertools;
26use std::collections::{BTreeMap, BTreeSet};
27use tokio_postgres::types::ToSql;
28
/// One sink-to-dependency edge, as returned by `find_sinks_depending_on_schemas`.
///
/// Every field is populated verbatim from the catalog query in that function.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DependentSink {
    /// Name of the database that contains the sink.
    pub sink_database: String,
    /// Name of the schema that contains the sink.
    pub sink_schema: String,
    /// Name of the sink itself.
    pub sink_name: String,
    /// Name of the database that contains the object the sink depends on.
    pub dependency_database: String,
    /// Name of the schema that contains the object the sink depends on.
    pub dependency_schema: String,
    /// Name of the object the sink depends on.
    pub dependency_name: String,
    /// Catalog type string of the dependency (the `mz_objects.type` column).
    pub dependency_type: String,
}
43
44pub(super) async fn schema_exists(
46 client: &Client,
47 database: &str,
48 schema: &str,
49) -> Result<bool, ConnectionError> {
50 let query = r#"
51 SELECT EXISTS(
52 SELECT 1
53 FROM mz_catalog.mz_schemas s
54 JOIN mz_catalog.mz_databases d ON s.database_id = d.id
55 WHERE s.name = $1 AND d.name = $2
56 ) AS exists
57 "#;
58
59 let row = client.query_one(query, &[&schema, &database]).await?;
60
61 Ok(row.get("exists"))
62}
63
64pub(super) async fn cluster_exists(client: &Client, name: &str) -> Result<bool, ConnectionError> {
66 let query = r#"
67 SELECT EXISTS(
68 SELECT 1 FROM mz_catalog.mz_clusters WHERE name = $1
69 ) AS exists
70 "#;
71
72 let row = client.query_one(query, &[&name]).await?;
73
74 Ok(row.get("exists"))
75}
76
77pub(super) async fn get_cluster(
79 client: &Client,
80 name: &str,
81) -> Result<Option<Cluster>, ConnectionError> {
82 let query = r#"
83 SELECT
84 id,
85 name,
86 size,
87 replication_factor::bigint AS replication_factor
88 FROM mz_catalog.mz_clusters
89 WHERE name = $1
90 "#;
91
92 let rows = client.query(query, &[&name]).await?;
93
94 if rows.is_empty() {
95 return Ok(None);
96 }
97
98 let row = &rows[0];
99 Ok(Some(Cluster {
100 id: row.get("id"),
101 name: row.get("name"),
102 size: row.get("size"),
103 replication_factor: row.get("replication_factor"),
104 }))
105}
106
107pub(super) async fn list_clusters(client: &Client) -> Result<Vec<Cluster>, ConnectionError> {
109 let query = r#"
110 SELECT
111 id,
112 name,
113 size,
114 replication_factor::bigint AS replication_factor
115 FROM mz_catalog.mz_clusters
116 ORDER BY name
117 "#;
118
119 let rows = client.query(query, &[]).await?;
120
121 Ok(rows
122 .iter()
123 .map(|row| Cluster {
124 id: row.get("id"),
125 name: row.get("name"),
126 size: row.get("size"),
127 replication_factor: row.get("replication_factor"),
128 })
129 .collect())
130}
131
132pub(super) async fn get_cluster_config(
139 client: &Client,
140 name: &str,
141) -> Result<Option<ClusterConfig>, ConnectionError> {
142 let cluster_query = r#"
144 SELECT
145 c.id,
146 c.name,
147 c.managed,
148 c.size,
149 c.replication_factor::bigint AS replication_factor,
150 r.name AS replica_name,
151 r.size AS replica_size,
152 r.availability_zone
153 FROM mz_catalog.mz_clusters c
154 LEFT JOIN mz_catalog.mz_cluster_replicas r ON c.id = r.cluster_id
155 WHERE c.name = $1
156 ORDER BY r.name
157 "#;
158
159 let cluster_rows = client.query(cluster_query, &[&name]).await?;
160
161 if cluster_rows.is_empty() {
162 return Ok(None);
163 }
164
165 let first_row = &cluster_rows[0];
167 let managed: bool = first_row.get("managed");
168 let size: Option<String> = first_row.get("size");
169 let replication_factor: Option<i64> = first_row.get("replication_factor");
170
171 let grants_query = r#"
173 WITH cluster_privilege AS (
174 SELECT mz_internal.mz_aclexplode(privileges).*, owner_id
175 FROM mz_clusters
176 WHERE name = $1
177 )
178 SELECT
179 grantee.name AS grantee,
180 c.privilege_type
181 FROM cluster_privilege AS c
182 JOIN mz_roles AS grantee ON c.grantee = grantee.id
183 WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
184 AND c.grantee != c.owner_id
185 "#;
186
187 let grant_rows = client.query(grants_query, &[&name]).await?;
188
189 let grants: Vec<ObjectGrant> = grant_rows
190 .iter()
191 .map(|row| ObjectGrant {
192 grantee: row.get("grantee"),
193 privilege_type: row.get("privilege_type"),
194 })
195 .collect();
196
197 if managed {
198 let size = size.ok_or_else(|| {
200 ConnectionError::Message(format!(
201 "Managed cluster '{}' has no size (unexpected)",
202 name
203 ))
204 })?;
205
206 let replication_factor = replication_factor.unwrap_or(1).try_into().map_err(|_| {
207 ConnectionError::Message(format!("Invalid replication_factor for cluster '{}'", name))
208 })?;
209
210 Ok(Some(ClusterConfig::Managed {
211 options: ClusterOptions {
212 size,
213 replication_factor,
214 },
215 grants,
216 }))
217 } else {
218 let mut replicas = Vec::new();
220 for row in &cluster_rows {
221 let replica_name: Option<String> = row.get("replica_name");
222 if let Some(replica_name) = replica_name {
223 replicas.push(ClusterReplica {
224 name: replica_name,
225 size: row.get("replica_size"),
226 availability_zone: row.get("availability_zone"),
227 });
228 }
229 }
230
231 Ok(Some(ClusterConfig::Unmanaged { replicas, grants }))
232 }
233}
234
235pub(super) async fn network_policy_exists(
237 client: &Client,
238 name: &str,
239) -> Result<bool, ConnectionError> {
240 let query = r#"
241 SELECT EXISTS(
242 SELECT 1 FROM mz_catalog.mz_network_policies WHERE name = $1
243 ) AS exists
244 "#;
245
246 let row = client.query_one(query, &[&name]).await?;
247
248 Ok(row.get("exists"))
249}
250
251pub(super) async fn role_exists(client: &Client, name: &str) -> Result<bool, ConnectionError> {
253 let query = r#"
254 SELECT EXISTS(
255 SELECT 1 FROM mz_catalog.mz_roles WHERE name = $1
256 ) AS exists
257 "#;
258
259 let row = client.query_one(query, &[&name]).await?;
260
261 Ok(row.get("exists"))
262}
263
264pub(super) async fn get_role_members(
266 client: &Client,
267 role_name: &str,
268) -> Result<Vec<String>, ConnectionError> {
269 let query = r#"
270 SELECT m.name AS member
271 FROM mz_catalog.mz_role_members rm
272 JOIN mz_catalog.mz_roles r ON r.id = rm.role_id
273 JOIN mz_catalog.mz_roles m ON m.id = rm.member
274 WHERE r.name = $1
275 ORDER BY m.name
276 "#;
277
278 let rows = client.query(query, &[&role_name]).await?;
279
280 Ok(rows.iter().map(|row| row.get("member")).collect())
281}
282
283pub(super) async fn get_role_parameters(
285 client: &Client,
286 role_name: &str,
287) -> Result<Vec<String>, ConnectionError> {
288 let query = r#"
289 SELECT rp.parameter_name
290 FROM mz_catalog.mz_role_parameters rp
291 JOIN mz_catalog.mz_roles r ON r.id = rp.role_id
292 WHERE r.name = $1
293 ORDER BY rp.parameter_name
294 "#;
295
296 let rows = client.query(query, &[&role_name]).await?;
297
298 Ok(rows.iter().map(|row| row.get("parameter_name")).collect())
299}
300
301pub(super) async fn get_current_user(client: &Client) -> Result<String, ConnectionError> {
303 let row = client.query_one("SELECT current_user()", &[]).await?;
304
305 Ok(row.get(0))
306}
307
308pub(super) async fn check_schemas_exist(
312 client: &Client,
313 schemas: &[(String, String)],
314) -> Result<BTreeSet<(String, String)>, ConnectionError> {
315 if schemas.is_empty() {
316 return Ok(BTreeSet::new());
317 }
318
319 let fqns: Vec<String> = schemas
321 .iter()
322 .map(|(db, schema)| format!("{}.{}", db, schema))
323 .collect();
324
325 let fqn_map: BTreeMap<&str, &(String, String)> = fqns
326 .iter()
327 .zip_eq(schemas.iter())
328 .map(|(fqn, pair)| (fqn.as_str(), pair))
329 .collect();
330
331 let placeholders_str = sql_placeholders(fqns.len());
332
333 let query = format!(
334 r#"
335 SELECT d.name || '.' || s.name as fqn
336 FROM mz_catalog.mz_schemas s
337 JOIN mz_catalog.mz_databases d ON s.database_id = d.id
338 WHERE d.name || '.' || s.name IN ({})
339 ORDER BY fqn
340 "#,
341 placeholders_str
342 );
343
344 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
345 for fqn in &fqns {
346 params.push(fqn);
347 }
348
349 let rows = client.query(&query, ¶ms).await?;
350
351 let mut existing = BTreeSet::new();
352 for row in rows {
353 let fqn: String = row.get("fqn");
354 if let Some(pair) = fqn_map.get(fqn.as_str()) {
355 existing.insert((*pair).clone());
356 }
357 }
358
359 Ok(existing)
360}
361
362pub(super) async fn check_clusters_exist(
366 client: &Client,
367 clusters: &[String],
368) -> Result<BTreeSet<String>, ConnectionError> {
369 if clusters.is_empty() {
370 return Ok(BTreeSet::new());
371 }
372
373 let placeholders_str = sql_placeholders(clusters.len());
374
375 let query = format!(
376 r#"
377 SELECT name FROM mz_catalog.mz_clusters
378 WHERE name IN ({})
379 ORDER BY name
380 "#,
381 placeholders_str
382 );
383
384 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
385 for name in clusters {
386 params.push(name);
387 }
388
389 let rows = client.query(&query, ¶ms).await?;
390
391 Ok(rows.iter().map(|row| row.get("name")).collect())
392}
393
394pub(super) async fn check_objects_exist(
396 client: &Client,
397 objects: &BTreeSet<ObjectId>,
398) -> Result<BTreeSet<ObjectId>, ConnectionError> {
399 if objects.is_empty() {
400 return Ok(BTreeSet::new());
401 }
402
403 let fqn_map: BTreeMap<String, &ObjectId> = objects.iter().map(|o| (o.to_string(), o)).collect();
404 let fqns: Vec<&String> = fqn_map.keys().collect();
405
406 let placeholders_str = sql_placeholders(fqns.len());
407
408 let query = format!(
409 r#"
410 SELECT d.name || '.' || s.name || '.' || mo.name as fqn
411 FROM mz_objects mo
412 JOIN mz_schemas s ON mo.schema_id = s.id
413 JOIN mz_databases d ON s.database_id = d.id
414 WHERE d.name || '.' || s.name || '.' || mo.name IN ({})
415 AND mo.type IN ('table', 'view', 'materialized-view', 'source', 'sink')
416 ORDER BY fqn
417 "#,
418 placeholders_str
419 );
420
421 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
422 for fqn in &fqns {
423 params.push(fqn);
424 }
425
426 let rows = client.query(&query, ¶ms).await?;
427
428 Ok(rows
429 .iter()
430 .filter_map(|row| {
431 let fqn: String = row.get("fqn");
432 fqn_map.get(&fqn).map(|id| (*id).clone())
433 })
434 .collect())
435}
436
437async fn check_catalog_objects_exist(
441 client: &Client,
442 objects: &BTreeSet<ObjectId>,
443 catalog_table: &str,
444) -> Result<BTreeSet<ObjectId>, ConnectionError> {
445 if objects.is_empty() {
446 return Ok(BTreeSet::new());
447 }
448
449 let fqn_map: BTreeMap<String, &ObjectId> = objects.iter().map(|o| (o.to_string(), o)).collect();
451 let fqns: Vec<&String> = fqn_map.keys().collect();
452
453 let placeholders_str = sql_placeholders(fqns.len());
454
455 let query = format!(
456 r#"
457 SELECT d.name || '.' || s.name || '.' || t.name as fqn
458 FROM {} t
459 JOIN mz_schemas s ON t.schema_id = s.id
460 JOIN mz_databases d ON s.database_id = d.id
461 WHERE d.name || '.' || s.name || '.' || t.name IN ({})
462 ORDER BY fqn
463 "#,
464 catalog_table, placeholders_str
465 );
466
467 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
468 for fqn in &fqns {
469 params.push(*fqn);
470 }
471
472 let rows = client.query(&query, ¶ms).await?;
473
474 let mut existing = BTreeSet::new();
475 for row in rows {
476 let fqn: String = row.get("fqn");
477 if let Some(obj_id) = fqn_map.get(&fqn) {
478 existing.insert((*obj_id).clone());
479 }
480 }
481
482 Ok(existing)
483}
484
485pub(super) async fn check_tables_exist(
489 client: &Client,
490 tables: &BTreeSet<ObjectId>,
491) -> Result<BTreeSet<ObjectId>, ConnectionError> {
492 check_catalog_objects_exist(client, tables, "mz_tables").await
493}
494
495pub(super) async fn check_sources_exist(
499 client: &Client,
500 sources: &BTreeSet<ObjectId>,
501) -> Result<BTreeSet<ObjectId>, ConnectionError> {
502 check_catalog_objects_exist(client, sources, "mz_sources").await
503}
504
505pub(super) async fn check_secrets_exist(
509 client: &Client,
510 secrets: &BTreeSet<ObjectId>,
511) -> Result<BTreeSet<ObjectId>, ConnectionError> {
512 check_catalog_objects_exist(client, secrets, "mz_secrets").await
513}
514
515pub(super) async fn check_connections_exist(
519 client: &Client,
520 connections: &BTreeSet<ObjectId>,
521) -> Result<BTreeSet<ObjectId>, ConnectionError> {
522 check_catalog_objects_exist(client, connections, "mz_connections").await
523}
524
525pub(super) async fn check_sinks_exist(
530 client: &Client,
531 sinks: &BTreeSet<ObjectId>,
532) -> Result<BTreeSet<ObjectId>, ConnectionError> {
533 check_catalog_objects_exist(client, sinks, "mz_sinks").await
534}
535
536pub(super) async fn find_sinks_depending_on_schemas(
542 client: &Client,
543 schemas: &[SchemaQualifier],
544) -> Result<Vec<DependentSink>, ConnectionError> {
545 if schemas.is_empty() {
546 return Ok(Vec::new());
547 }
548
549 let mut conditions = Vec::new();
551 let mut param_idx = 1;
552
553 for _ in schemas {
554 conditions.push(format!(
555 "(dep_db.name = ${} AND dep_schema.name = ${})",
556 param_idx,
557 param_idx + 1
558 ));
559 param_idx += 2;
560 }
561
562 let where_clause = conditions.join(" OR ");
563
564 let query = format!(
565 r#"
566 SELECT
567 sink_db.name as sink_database,
568 sink_schema.name as sink_schema,
569 sinks.name as sink_name,
570 dep_db.name as dependency_database,
571 dep_schema.name as dependency_schema,
572 dep_obj.name as dependency_name,
573 dep_obj.type as dependency_type
574 FROM mz_sinks sinks
575 JOIN mz_schemas sink_schema ON sinks.schema_id = sink_schema.id
576 JOIN mz_databases sink_db ON sink_schema.database_id = sink_db.id
577 JOIN mz_internal.mz_object_dependencies deps ON sinks.id = deps.object_id
578 JOIN mz_objects dep_obj ON deps.referenced_object_id = dep_obj.id
579 JOIN mz_schemas dep_schema ON dep_obj.schema_id = dep_schema.id
580 JOIN mz_databases dep_db ON dep_schema.database_id = dep_db.id
581 WHERE ({})
582 AND dep_obj.type IN ('materialized-view', 'table', 'source')
583 ORDER BY sink_db.name, sink_schema.name, sinks.name
584 "#,
585 where_clause
586 );
587
588 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
590 for sq in schemas {
591 params.push(&sq.database);
592 params.push(&sq.schema);
593 }
594
595 let rows = client.query(&query, ¶ms).await?;
596
597 Ok(rows
598 .iter()
599 .map(|row| DependentSink {
600 sink_database: row.get("sink_database"),
601 sink_schema: row.get("sink_schema"),
602 sink_name: row.get("sink_name"),
603 dependency_database: row.get("dependency_database"),
604 dependency_schema: row.get("dependency_schema"),
605 dependency_name: row.get("dependency_name"),
606 dependency_type: row.get("dependency_type"),
607 })
608 .collect())
609}
610
611pub(super) async fn check_connection_exists(
613 client: &Client,
614 database: &str,
615 schema: &str,
616 name: &str,
617) -> Result<bool, ConnectionError> {
618 let query = r#"
619 SELECT EXISTS(
620 SELECT 1
621 FROM mz_catalog.mz_connections c
622 JOIN mz_catalog.mz_schemas s ON c.schema_id = s.id
623 JOIN mz_catalog.mz_databases d ON s.database_id = d.id
624 WHERE d.name = $1 AND s.name = $2 AND c.name = $3
625 ) AS exists
626 "#;
627 let row = client
628 .query_one(query, &[&database, &schema, &name])
629 .await?;
630 Ok(row.get("exists"))
631}
632
633pub(super) async fn object_exists(
637 client: &Client,
638 database: &str,
639 schema: &str,
640 object: &str,
641) -> Result<bool, ConnectionError> {
642 let query = r#"
643 SELECT EXISTS(
644 SELECT 1 FROM mz_objects o
645 JOIN mz_schemas s ON o.schema_id = s.id
646 JOIN mz_databases d ON s.database_id = d.id
647 WHERE d.name = $1 AND s.name = $2 AND o.name = $3
648 AND o.type IN ('materialized-view', 'table', 'source')
649 ) AS exists
650 "#;
651
652 let row = client
653 .query_one(query, &[&database, &schema, &object])
654 .await?;
655
656 Ok(row.get("exists"))
657}
658
659pub(super) async fn get_staging_schemas(
661 client: &Client,
662 deploy_id: &str,
663) -> Result<Vec<SchemaQualifier>, ConnectionError> {
664 let pattern = staging_suffix_like_pattern(deploy_id);
665
666 let query = r#"
667 SELECT d.name as database, s.name as schema
668 FROM mz_schemas s
669 JOIN mz_databases d ON s.database_id = d.id
670 WHERE s.name LIKE $1 ESCAPE '\'
671 "#;
672
673 let rows = client.query(query, &[&pattern]).await?;
674
675 Ok(rows
676 .iter()
677 .map(|row| {
678 let database: String = row.get("database");
679 let schema: String = row.get("schema");
680 SchemaQualifier::new(database, schema)
681 })
682 .collect())
683}
684
685pub(super) async fn get_staging_clusters(
687 client: &Client,
688 deploy_id: &str,
689) -> Result<Vec<String>, ConnectionError> {
690 let pattern = staging_suffix_like_pattern(deploy_id);
691
692 let query = r#"
693 SELECT name
694 FROM mz_clusters
695 WHERE name LIKE $1 ESCAPE '\'
696 "#;
697
698 let rows = client.query(query, &[&pattern]).await?;
699
700 Ok(rows.iter().map(|row| row.get("name")).collect())
701}
702
/// Maps a catalog object type string (as stored in `mz_objects.type`) to the
/// keyword(s) used after `DROP` for that kind of object.
///
/// Returns `None` for any type not in the table below; matching is exact and
/// case-sensitive.
fn mz_type_to_drop_keyword(obj_type: &str) -> Option<&'static str> {
    const DROP_KEYWORDS: [(&str, &str); 5] = [
        ("table", "TABLE"),
        ("view", "VIEW"),
        ("materialized-view", "MATERIALIZED VIEW"),
        ("source", "SOURCE"),
        ("sink", "SINK"),
    ];

    DROP_KEYWORDS
        .iter()
        .find(|(catalog_type, _)| *catalog_type == obj_type)
        .map(|(_, keyword)| *keyword)
}
714
715pub(super) async fn drop_schema_objects(
719 client: &Client,
720 database: &str,
721 schema: &str,
722) -> Result<Vec<String>, ConnectionError> {
723 let query = r#"
724 SELECT mo.name, mo.type
725 FROM mz_objects mo
726 JOIN mz_schemas s ON mo.schema_id = s.id
727 JOIN mz_databases d ON s.database_id = d.id
728 WHERE d.name = $1 AND s.name = $2
729 AND mo.type IN ('table', 'view', 'materialized-view', 'source', 'sink')
730 ORDER BY mo.id DESC
731 "#;
732
733 let rows = client.query(query, &[&database, &schema]).await?;
734
735 let mut dropped = Vec::new();
736 for row in rows {
737 let name: String = row.get("name");
738 let obj_type: String = row.get("type");
739
740 let fqn = format!(
741 "{}.{}.{}",
742 quote_identifier(database),
743 quote_identifier(schema),
744 quote_identifier(&name)
745 );
746 let Some(drop_type) = mz_type_to_drop_keyword(obj_type.as_str()) else {
747 continue;
748 };
749
750 let drop_sql = format!("DROP {} IF EXISTS {} CASCADE", drop_type, fqn);
751 client.execute(&drop_sql, &[]).await?;
752
753 dropped.push(fqn);
754 }
755
756 Ok(dropped)
757}
758
759pub(super) async fn drop_objects(
763 client: &Client,
764 objects: &BTreeSet<ObjectId>,
765) -> Result<Vec<String>, ConnectionError> {
766 let mut dropped = Vec::new();
767
768 if objects.is_empty() {
769 return Ok(dropped);
770 }
771
772 let placeholders_str = sql_placeholders(objects.len());
773
774 let query = format!(
775 r#"
776 SELECT mo.name, s.name as schema_name, d.name as database_name, mo.type
777 FROM mz_objects mo
778 JOIN mz_schemas s ON mo.schema_id = s.id
779 JOIN mz_databases d ON s.database_id = d.id
780 WHERE d.name || '.' || s.name || '.' || mo.name IN ({})
781 AND mo.type IN ('table', 'view', 'materialized-view', 'source', 'sink')
782 ORDER BY mo.id DESC
783 "#,
784 placeholders_str
785 );
786
787 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
788 let fqns: Vec<_> = objects.iter().map(|object| object.to_string()).collect();
789 for fqn in &fqns {
790 params.push(fqn);
791 }
792
793 let rows = client.query(&query, ¶ms).await?;
794
795 for row in rows {
796 let name: String = row.get("name");
797 let schema: String = row.get("schema_name");
798 let database: String = row.get("database_name");
799 let obj_type: String = row.get("type");
800
801 let fqn = format!(
802 "{}.{}.{}",
803 quote_identifier(&database),
804 quote_identifier(&schema),
805 quote_identifier(&name)
806 );
807 let Some(drop_type) = mz_type_to_drop_keyword(obj_type.as_str()) else {
808 continue;
809 };
810
811 let drop_sql = format!("DROP {} IF EXISTS {} CASCADE", drop_type, fqn);
812 client.execute(&drop_sql, &[]).await?;
813
814 dropped.push(fqn);
815 }
816
817 Ok(dropped)
818}
819
820pub(super) async fn drop_staging_schemas(
822 client: &Client,
823 schemas: &[SchemaQualifier],
824) -> Result<(), ConnectionError> {
825 for sq in schemas {
826 let drop_sql = format!(
827 "DROP SCHEMA IF EXISTS {}.{} CASCADE",
828 quote_identifier(&sq.database),
829 quote_identifier(&sq.schema)
830 );
831 client.execute(&drop_sql, &[]).await?;
832 }
833
834 Ok(())
835}
836
837pub(super) async fn drop_staging_clusters(
839 client: &Client,
840 clusters: &[String],
841) -> Result<(), ConnectionError> {
842 for cluster in clusters {
843 let drop_sql = format!(
844 "DROP CLUSTER IF EXISTS {} CASCADE",
845 quote_identifier(cluster)
846 );
847 client.execute(&drop_sql, &[]).await?;
848 }
849
850 Ok(())
851}
852
853async fn get_named_object_grants(
859 client: &Client,
860 catalog_table: &str,
861 name: &str,
862) -> Result<Vec<ObjectGrant>, ConnectionError> {
863 let query = format!(
864 r#"
865 -- Explode the ACL bitmap into individual (grantee, privilege_type) rows.
866 -- Each object stores privileges as a compact bitmap; mz_aclexplode unpacks it.
867 WITH privilege AS (
868 SELECT mz_internal.mz_aclexplode(privileges).*, owner_id
869 FROM {}
870 WHERE name = $1
871 )
872 SELECT
873 grantee.name AS grantee,
874 p.privilege_type
875 FROM privilege AS p
876 -- Resolve grantee role IDs to human-readable names.
877 JOIN mz_roles AS grantee ON p.grantee = grantee.id
878 -- Exclude system roles that are not user-manageable.
879 WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
880 -- Owners implicitly have all privileges; don't surface those as explicit grants.
881 AND p.grantee != p.owner_id
882 "#,
883 catalog_table
884 );
885
886 let rows = client.query(&query, &[&name]).await?;
887
888 Ok(rows
889 .iter()
890 .map(|row| ObjectGrant {
891 grantee: row.get("grantee"),
892 privilege_type: row.get("privilege_type"),
893 })
894 .collect())
895}
896
897pub(super) async fn get_cluster_grants(
899 client: &Client,
900 name: &str,
901) -> Result<Vec<ObjectGrant>, ConnectionError> {
902 get_named_object_grants(client, "mz_clusters", name).await
903}
904
905pub(super) async fn get_network_policy_grants(
907 client: &Client,
908 name: &str,
909) -> Result<Vec<ObjectGrant>, ConnectionError> {
910 get_named_object_grants(client, "mz_network_policies", name).await
911}
912
913pub(super) async fn get_database_object_grants(
917 client: &Client,
918 catalog_table: &str,
919 database: &str,
920 schema: &str,
921 name: &str,
922) -> Result<Vec<ObjectGrant>, ConnectionError> {
923 let query = format!(
924 r#"
925 -- Locate the object by its fully-qualified name (database.schema.object)
926 -- using a 3-table join chain: catalog_table -> mz_schemas -> mz_databases.
927 -- Then explode the ACL bitmap into individual privilege rows.
928 WITH privilege AS (
929 SELECT mz_internal.mz_aclexplode(t.privileges).*, t.owner_id
930 FROM {} t
931 JOIN mz_schemas s ON t.schema_id = s.id
932 JOIN mz_databases d ON s.database_id = d.id
933 WHERE d.name = $1 AND s.name = $2 AND t.name = $3
934 )
935 SELECT
936 grantee.name AS grantee,
937 p.privilege_type
938 FROM privilege AS p
939 JOIN mz_roles AS grantee ON p.grantee = grantee.id
940 WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
941 AND p.grantee != p.owner_id
942 "#,
943 catalog_table
944 );
945
946 let rows = client.query(&query, &[&database, &schema, &name]).await?;
947
948 Ok(rows
949 .iter()
950 .map(|row| ObjectGrant {
951 grantee: row.get("grantee"),
952 privilege_type: row.get("privilege_type"),
953 })
954 .collect())
955}
956
957async fn get_default_privilege_grants_for_named_object(
963 client: &Client,
964 catalog_table: &str,
965 name: &str,
966 object_type: &str,
967) -> Result<Vec<ObjectGrant>, ConnectionError> {
968 let query = format!(
969 r#"
970 -- Query default privileges from ALTER DEFAULT PRIVILEGES rules.
971 -- These are auto-applied grants that should be protected from revocation.
972 SELECT
973 grantee_role.name AS grantee,
974 dp_priv.privilege_type
975 FROM mz_default_privileges dp
976 -- Expand the privilege bitmap into individual privilege type strings.
977 CROSS JOIN LATERAL unnest(
978 mz_internal.mz_format_privileges(dp.privileges)
979 ) AS dp_priv(privilege_type)
980 JOIN {} obj ON obj.name = $1
981 JOIN mz_roles AS grantee_role ON dp.grantee = grantee_role.id
982 WHERE dp.object_type = $2
983 -- Match rules targeting the object's owner, or PUBLIC ('p') rules
984 -- that apply to all owners.
985 AND (dp.role_id = obj.owner_id OR dp.role_id = 'p')
986 -- Named objects (clusters, network policies) are not schema-scoped,
987 -- so only global default privileges (both NULL) apply.
988 AND dp.database_id IS NULL
989 AND dp.schema_id IS NULL
990 AND grantee_role.name NOT IN ('none', 'mz_system', 'mz_support')
991 "#,
992 catalog_table
993 );
994
995 let rows = client.query(&query, &[&name, &object_type]).await?;
996
997 Ok(rows
998 .iter()
999 .map(|row| ObjectGrant {
1000 grantee: row.get("grantee"),
1001 privilege_type: row.get("privilege_type"),
1002 })
1003 .collect())
1004}
1005
1006pub(super) async fn get_default_privilege_grants_for_cluster(
1008 client: &Client,
1009 name: &str,
1010) -> Result<Vec<ObjectGrant>, ConnectionError> {
1011 get_default_privilege_grants_for_named_object(client, "mz_clusters", name, "cluster").await
1012}
1013
1014pub(super) async fn get_default_privilege_grants_for_network_policy(
1016 client: &Client,
1017 name: &str,
1018) -> Result<Vec<ObjectGrant>, ConnectionError> {
1019 get_default_privilege_grants_for_named_object(client, "mz_network_policies", name, "type").await
1020}
1021
1022pub(super) async fn get_default_privilege_grants_for_database_object(
1028 client: &Client,
1029 catalog_table: &str,
1030 database: &str,
1031 schema: &str,
1032 name: &str,
1033 object_type: &str,
1034) -> Result<Vec<ObjectGrant>, ConnectionError> {
1035 let query = format!(
1036 r#"
1037 -- Query default privileges from ALTER DEFAULT PRIVILEGES rules
1038 -- for a schema-qualified database object.
1039 SELECT
1040 grantee_role.name AS grantee,
1041 dp_priv.privilege_type
1042 FROM mz_default_privileges dp
1043 -- Expand the privilege bitmap into individual privilege type strings.
1044 CROSS JOIN LATERAL unnest(
1045 mz_internal.mz_format_privileges(dp.privileges)
1046 ) AS dp_priv(privilege_type)
1047 -- Locate the object by FQN to determine its owner, database, and schema.
1048 JOIN {} obj ON obj.name = $3
1049 JOIN mz_schemas s ON obj.schema_id = s.id
1050 JOIN mz_databases d ON s.database_id = d.id
1051 JOIN mz_roles AS grantee_role ON dp.grantee = grantee_role.id
1052 WHERE d.name = $1 AND s.name = $2
1053 AND dp.object_type = $4
1054 -- Match rules targeting the object's owner, or PUBLIC ('p') rules.
1055 AND (dp.role_id = obj.owner_id OR dp.role_id = 'p')
1056 -- Match both global rules (database_id IS NULL) and rules scoped to
1057 -- this specific database. Global rules apply to all databases.
1058 AND (dp.database_id IS NULL OR dp.database_id = d.id)
1059 -- Same for schema: global or scoped to this specific schema.
1060 AND (dp.schema_id IS NULL OR dp.schema_id = s.id)
1061 AND grantee_role.name NOT IN ('none', 'mz_system', 'mz_support')
1062 "#,
1063 catalog_table
1064 );
1065
1066 let rows = client
1067 .query(&query, &[&database, &schema, &name, &object_type])
1068 .await?;
1069
1070 Ok(rows
1071 .iter()
1072 .map(|row| ObjectGrant {
1073 grantee: row.get("grantee"),
1074 privilege_type: row.get("privilege_type"),
1075 })
1076 .collect())
1077}
1078
1079pub(super) async fn get_connection_create_sql(
1085 client: &Client,
1086 database: &str,
1087 schema: &str,
1088 name: &str,
1089) -> Result<Option<String>, ConnectionError> {
1090 let fqn = format!(
1091 "{}.{}.{}",
1092 quote_identifier(database),
1093 quote_identifier(schema),
1094 quote_identifier(name)
1095 );
1096 let query = format!("SHOW CREATE CONNECTION {}", fqn);
1097 let rows = client.query(&query, &[]).await?;
1098 Ok(rows.first().map(|row| row.get("create_sql")))
1099}
1100
1101impl IntrospectionClient<'_> {
1102 pub async fn get_current_user(&self) -> Result<String, ConnectionError> {
1104 get_current_user(self.client).await
1105 }
1106
1107 pub async fn check_objects_exist(
1109 &self,
1110 objects: &BTreeSet<ObjectId>,
1111 ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1112 check_objects_exist(self.client, objects).await
1113 }
1114
1115 pub async fn check_catalog_objects_exist(
1117 &self,
1118 objects: &BTreeSet<ObjectId>,
1119 catalog_table: &str,
1120 ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1121 check_catalog_objects_exist(self.client, objects, catalog_table).await
1122 }
1123
1124 pub async fn check_tables_exist(
1126 &self,
1127 tables: &BTreeSet<ObjectId>,
1128 ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1129 check_tables_exist(self.client, tables).await
1130 }
1131
1132 pub async fn check_sources_exist(
1134 &self,
1135 sources: &BTreeSet<ObjectId>,
1136 ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1137 check_sources_exist(self.client, sources).await
1138 }
1139
1140 pub async fn check_secrets_exist(
1142 &self,
1143 secrets: &BTreeSet<ObjectId>,
1144 ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1145 check_secrets_exist(self.client, secrets).await
1146 }
1147
1148 pub async fn check_connections_exist(
1150 &self,
1151 connections: &BTreeSet<ObjectId>,
1152 ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1153 check_connections_exist(self.client, connections).await
1154 }
1155
1156 pub async fn check_sinks_exist(
1158 &self,
1159 sinks: &BTreeSet<ObjectId>,
1160 ) -> Result<BTreeSet<ObjectId>, ConnectionError> {
1161 check_sinks_exist(self.client, sinks).await
1162 }
1163
1164 pub async fn check_schemas_exist(
1166 &self,
1167 schemas: &[(String, String)],
1168 ) -> Result<BTreeSet<(String, String)>, ConnectionError> {
1169 check_schemas_exist(self.client, schemas).await
1170 }
1171
1172 pub async fn check_clusters_exist(
1174 &self,
1175 clusters: &[String],
1176 ) -> Result<BTreeSet<String>, ConnectionError> {
1177 check_clusters_exist(self.client, clusters).await
1178 }
1179
1180 pub async fn find_sinks_depending_on_schemas(
1182 &self,
1183 schemas: &[SchemaQualifier],
1184 ) -> Result<Vec<DependentSink>, ConnectionError> {
1185 find_sinks_depending_on_schemas(self.client, schemas).await
1186 }
1187
1188 pub async fn check_connection_exists(
1190 &self,
1191 database: &str,
1192 schema: &str,
1193 name: &str,
1194 ) -> Result<bool, ConnectionError> {
1195 check_connection_exists(self.client, database, schema, name).await
1196 }
1197
1198 pub async fn object_exists(
1200 &self,
1201 database: &str,
1202 schema: &str,
1203 object: &str,
1204 ) -> Result<bool, ConnectionError> {
1205 object_exists(self.client, database, schema, object).await
1206 }
1207
1208 pub async fn get_staging_schemas(
1210 &self,
1211 deploy_id: &str,
1212 ) -> Result<Vec<SchemaQualifier>, ConnectionError> {
1213 get_staging_schemas(self.client, deploy_id).await
1214 }
1215
1216 pub async fn get_staging_clusters(
1218 &self,
1219 deploy_id: &str,
1220 ) -> Result<Vec<String>, ConnectionError> {
1221 get_staging_clusters(self.client, deploy_id).await
1222 }
1223
1224 pub async fn drop_schema_objects(
1226 &self,
1227 database: &str,
1228 schema: &str,
1229 ) -> Result<Vec<String>, ConnectionError> {
1230 drop_schema_objects(self.client, database, schema).await
1231 }
1232
1233 pub async fn drop_objects(
1235 &self,
1236 objects: &BTreeSet<ObjectId>,
1237 ) -> Result<Vec<String>, ConnectionError> {
1238 drop_objects(self.client, objects).await
1239 }
1240
1241 pub async fn drop_staging_schemas(
1243 &self,
1244 schemas: &[SchemaQualifier],
1245 ) -> Result<(), ConnectionError> {
1246 drop_staging_schemas(self.client, schemas).await
1247 }
1248
1249 pub async fn drop_staging_clusters(&self, clusters: &[String]) -> Result<(), ConnectionError> {
1251 drop_staging_clusters(self.client, clusters).await
1252 }
1253
1254 pub async fn schema_exists(
1256 &self,
1257 database: &str,
1258 schema: &str,
1259 ) -> Result<bool, ConnectionError> {
1260 schema_exists(self.client, database, schema).await
1261 }
1262
1263 pub async fn network_policy_exists(&self, name: &str) -> Result<bool, ConnectionError> {
1265 network_policy_exists(self.client, name).await
1266 }
1267
1268 pub async fn role_exists(&self, name: &str) -> Result<bool, ConnectionError> {
1270 role_exists(self.client, name).await
1271 }
1272
1273 pub async fn get_role_members(&self, name: &str) -> Result<Vec<String>, ConnectionError> {
1275 get_role_members(self.client, name).await
1276 }
1277
1278 pub async fn get_role_parameters(&self, name: &str) -> Result<Vec<String>, ConnectionError> {
1280 get_role_parameters(self.client, name).await
1281 }
1282
1283 pub async fn cluster_exists(&self, name: &str) -> Result<bool, ConnectionError> {
1285 cluster_exists(self.client, name).await
1286 }
1287
1288 pub async fn get_cluster(&self, name: &str) -> Result<Option<Cluster>, ConnectionError> {
1290 get_cluster(self.client, name).await
1291 }
1292
1293 pub async fn list_clusters(&self) -> Result<Vec<Cluster>, ConnectionError> {
1295 list_clusters(self.client).await
1296 }
1297
1298 pub async fn get_cluster_config(
1300 &self,
1301 name: &str,
1302 ) -> Result<Option<ClusterConfig>, ConnectionError> {
1303 get_cluster_config(self.client, name).await
1304 }
1305
1306 pub async fn get_cluster_grants(
1308 &self,
1309 name: &str,
1310 ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1311 get_cluster_grants(self.client, name).await
1312 }
1313
1314 pub async fn get_network_policy_grants(
1316 &self,
1317 name: &str,
1318 ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1319 get_network_policy_grants(self.client, name).await
1320 }
1321
1322 pub async fn get_database_object_grants(
1324 &self,
1325 catalog_table: &str,
1326 database: &str,
1327 schema: &str,
1328 name: &str,
1329 ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1330 get_database_object_grants(self.client, catalog_table, database, schema, name).await
1331 }
1332
1333 pub async fn get_connection_create_sql(
1335 &self,
1336 database: &str,
1337 schema: &str,
1338 name: &str,
1339 ) -> Result<Option<String>, ConnectionError> {
1340 get_connection_create_sql(self.client, database, schema, name).await
1341 }
1342
1343 pub async fn get_default_privilege_grants_for_cluster(
1345 &self,
1346 name: &str,
1347 ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1348 get_default_privilege_grants_for_cluster(self.client, name).await
1349 }
1350
1351 pub async fn get_default_privilege_grants_for_network_policy(
1353 &self,
1354 name: &str,
1355 ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1356 get_default_privilege_grants_for_network_policy(self.client, name).await
1357 }
1358
1359 pub async fn get_default_privilege_grants_for_database_object(
1361 &self,
1362 catalog_table: &str,
1363 database: &str,
1364 schema: &str,
1365 name: &str,
1366 object_type: &str,
1367 ) -> Result<Vec<ObjectGrant>, ConnectionError> {
1368 get_default_privilege_grants_for_database_object(
1369 self.client,
1370 catalog_table,
1371 database,
1372 schema,
1373 name,
1374 object_type,
1375 )
1376 .await
1377 }
1378}