Skip to main content

mz_deploy/client/
validation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Database validation operations.
11//!
12//! Validates that the target Materialize environment satisfies all prerequisites
13//! for deploying the project. Runs before any DDL is executed.
14//!
15//! ## Validation Checklist
16//!
17//! | Check | Function | What it verifies |
18//! |-------|----------|-----------------|
19//! | External databases exist | `find_missing_databases` | Databases referenced by external dependencies are present |
20//! | External schemas exist | `find_missing_schemas` | Schemas referenced by external dependencies are present |
21//! | Clusters exist | `find_missing_clusters` | All clusters in `project.cluster_dependencies` are present |
22//! | External objects exist | `find_missing_external_dependencies` | Objects outside the project that are referenced exist in the catalog |
23//! | Cluster isolation | `validate_cluster_isolation_impl` | Sources/sinks don't share clusters with MVs/indexes (prevents accidental recreation during swap) |
24//! | Privileges | `validate_privileges_impl` | Current role has USAGE on databases and CREATECLUSTER system privilege |
25//! | Sources exist | `validate_sources_exist_impl` | Sources referenced by `CREATE TABLE FROM SOURCE` exist |
26//! | Sink connections exist | `validate_sink_connections_exist_impl` | Connections referenced by sinks exist |
27//! | Schema ownership | `validate_schema_ownership_impl` | Current role owns all production schemas that will be swapped |
28//! | Cluster ownership | `validate_cluster_ownership_impl` | Current role owns all production clusters that will be swapped |
29//! | Table dependencies | `validate_table_dependencies_impl` | Tables depended on by objects being deployed exist |
30//!
31//! ## Batching Strategy
32//!
33//! Catalog lookups use `IN` clause queries batched in chunks of
34//! `LOOKUP_BATCH_SIZE` (1000) to avoid exceeding query parameter limits
35//! while minimizing round trips.
36
37use crate::client::connection::{Client, ValidationClient};
38use crate::client::errors::DatabaseValidationError;
39use crate::client::sql_placeholders;
40use crate::project::SchemaQualifier;
41use crate::project::ast::Statement;
42use crate::project::ir::graph;
43use crate::project::ir::object_id::ObjectId;
44use mz_sql_parser::ast::CreateSinkConnection;
45use std::collections::{BTreeMap, BTreeSet};
46use std::path::Path;
47use std::path::PathBuf;
48use tokio_postgres::types::ToSql;
49
50const LOOKUP_BATCH_SIZE: usize = 1000;
51
/// Catalog relation consulted when checking whether a set of objects exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CatalogLookup {
    /// Any catalog object (`mz_objects`).
    Objects,
    /// Sources only (`mz_sources`).
    Sources,
    /// Tables only (`mz_tables`).
    Tables,
    /// Connections only (`mz_connections`).
    Connections,
}

impl CatalogLookup {
    /// Name of the catalog relation backing this lookup.
    ///
    /// The name is interpolated into SQL text, so it must stay a hard-coded identifier.
    fn table_name(&self) -> &'static str {
        match self {
            Self::Objects => "mz_objects",
            Self::Sources => "mz_sources",
            Self::Tables => "mz_tables",
            Self::Connections => "mz_connections",
        }
    }
}
69
70/// Internal helper to query which sources exist on the given clusters using IN clause.
71pub(crate) async fn query_sources_by_cluster(
72    client: &Client,
73    cluster_names: &BTreeSet<String>,
74) -> Result<BTreeMap<String, Vec<String>>, DatabaseValidationError> {
75    if cluster_names.is_empty() {
76        return Ok(BTreeMap::new());
77    }
78
79    let in_clause = sql_placeholders(cluster_names.len());
80
81    let query = format!(
82        r#"
83        SELECT
84            c.name as cluster_name,
85            d.name || '.' || s.name || '.' || mo.name as fqn
86        FROM mz_catalog.mz_sources src
87        JOIN mz_catalog.mz_objects mo ON src.id = mo.id
88        JOIN mz_catalog.mz_schemas s ON mo.schema_id = s.id
89        JOIN mz_catalog.mz_databases d ON s.database_id = d.id
90        JOIN mz_catalog.mz_clusters c ON src.cluster_id = c.id
91        WHERE mo.id LIKE 'u%' AND c.name IN ({})
92        "#,
93        in_clause
94    );
95
96    #[allow(clippy::as_conversions)]
97    let params: Vec<&(dyn ToSql + Sync)> = cluster_names
98        .iter()
99        .map(|s| s as &(dyn ToSql + Sync))
100        .collect();
101
102    let rows = client
103        .query(&query, &params)
104        .await
105        .map_err(DatabaseValidationError::QueryError)?;
106
107    let mut result: BTreeMap<String, Vec<String>> = BTreeMap::new();
108    for row in rows {
109        let cluster_name: String = row.get("cluster_name");
110        let fqn: String = row.get("fqn");
111        result
112            .entry(cluster_name)
113            .or_insert_with(Vec::new)
114            .push(fqn);
115    }
116
117    Ok(result)
118}
119
120async fn query_existing_names(
121    client: &Client,
122    table_name: &str,
123    column_name: &str,
124    names: &BTreeSet<String>,
125) -> Result<BTreeSet<String>, DatabaseValidationError> {
126    let mut existing = BTreeSet::new();
127    if names.is_empty() {
128        return Ok(existing);
129    }
130
131    let name_list: Vec<String> = names.iter().cloned().collect();
132    for chunk in name_list.chunks(LOOKUP_BATCH_SIZE) {
133        let placeholders = sql_placeholders(chunk.len());
134        let query = format!(
135            "SELECT {column} FROM {table} WHERE {column} IN ({placeholders})",
136            column = column_name,
137            table = table_name,
138            placeholders = placeholders
139        );
140
141        #[allow(clippy::as_conversions)]
142        let params: Vec<&(dyn ToSql + Sync)> = chunk
143            .iter()
144            .map(|name| name as &(dyn ToSql + Sync))
145            .collect();
146
147        let rows = client
148            .query(&query, &params)
149            .await
150            .map_err(DatabaseValidationError::QueryError)?;
151        for row in rows {
152            let name: String = row.get(column_name);
153            existing.insert(name);
154        }
155    }
156
157    Ok(existing)
158}
159
160async fn query_existing_schema_pairs(
161    client: &Client,
162    schema_pairs: &BTreeSet<(String, String)>,
163) -> Result<BTreeSet<(String, String)>, DatabaseValidationError> {
164    let mut existing = BTreeSet::new();
165    if schema_pairs.is_empty() {
166        return Ok(existing);
167    }
168
169    let fqn_to_pair: BTreeMap<String, (String, String)> = schema_pairs
170        .iter()
171        .map(|(database, schema)| {
172            (
173                format!("{}.{}", database, schema),
174                (database.clone(), schema.clone()),
175            )
176        })
177        .collect();
178    let fqns: Vec<String> = fqn_to_pair.keys().cloned().collect();
179
180    for chunk in fqns.chunks(LOOKUP_BATCH_SIZE) {
181        let placeholders = sql_placeholders(chunk.len());
182        let query = format!(
183            r#"
184            SELECT d.name || '.' || s.name AS fqn
185            FROM mz_schemas s
186            JOIN mz_databases d ON s.database_id = d.id
187            WHERE d.name || '.' || s.name IN ({})
188            "#,
189            placeholders
190        );
191
192        #[allow(clippy::as_conversions)]
193        let params: Vec<&(dyn ToSql + Sync)> =
194            chunk.iter().map(|fqn| fqn as &(dyn ToSql + Sync)).collect();
195
196        let rows = client
197            .query(&query, &params)
198            .await
199            .map_err(DatabaseValidationError::QueryError)?;
200        for row in rows {
201            let fqn: String = row.get("fqn");
202            if let Some(pair) = fqn_to_pair.get(&fqn) {
203                existing.insert(pair.clone());
204            }
205        }
206    }
207
208    Ok(existing)
209}
210
211async fn query_existing_object_ids(
212    client: &Client,
213    object_ids: &BTreeSet<ObjectId>,
214    lookup: CatalogLookup,
215) -> Result<BTreeSet<ObjectId>, DatabaseValidationError> {
216    let mut existing = BTreeSet::new();
217    if object_ids.is_empty() {
218        return Ok(existing);
219    }
220
221    let fqn_to_object: BTreeMap<String, ObjectId> = object_ids
222        .iter()
223        .map(|obj| (obj.to_string(), obj.clone()))
224        .collect();
225    let fqns: Vec<String> = fqn_to_object.keys().cloned().collect();
226    let table_name = lookup.table_name();
227
228    for chunk in fqns.chunks(LOOKUP_BATCH_SIZE) {
229        let placeholders = sql_placeholders(chunk.len());
230        let query = format!(
231            r#"
232            SELECT d.name || '.' || s.name || '.' || t.name AS fqn
233            FROM {table_name} t
234            JOIN mz_schemas s ON t.schema_id = s.id
235            JOIN mz_databases d ON s.database_id = d.id
236            WHERE d.name || '.' || s.name || '.' || t.name IN ({placeholders})
237            "#,
238            table_name = table_name,
239            placeholders = placeholders
240        );
241
242        #[allow(clippy::as_conversions)]
243        let params: Vec<&(dyn ToSql + Sync)> =
244            chunk.iter().map(|fqn| fqn as &(dyn ToSql + Sync)).collect();
245
246        let rows = client
247            .query(&query, &params)
248            .await
249            .map_err(DatabaseValidationError::QueryError)?;
250        for row in rows {
251            let fqn: String = row.get("fqn");
252            if let Some(obj) = fqn_to_object.get(&fqn) {
253                existing.insert(obj.clone());
254            }
255        }
256    }
257
258    Ok(existing)
259}
260
261/// Internal implementation of validate_project.
262pub(crate) async fn validate_project_impl(
263    client: &Client,
264    planned_project: &graph::Project,
265    project_root: &Path,
266) -> Result<(), DatabaseValidationError> {
267    let (external_databases, external_schemas) = collect_external_dependencies(planned_project);
268    let missing_databases = find_missing_databases(client, &external_databases).await?;
269    let missing_schemas = find_missing_schemas(client, &external_schemas).await?;
270    let missing_clusters = find_missing_clusters(client, planned_project).await?;
271    let object_paths = build_object_paths(planned_project, project_root);
272    let missing_external_deps = find_missing_external_dependencies(client, planned_project).await?;
273    let compilation_errors =
274        build_compilation_errors(planned_project, &object_paths, &missing_external_deps);
275
276    if !missing_databases.is_empty()
277        || !missing_schemas.is_empty()
278        || !missing_clusters.is_empty()
279        || !compilation_errors.is_empty()
280    {
281        Err(DatabaseValidationError::Multiple {
282            databases: missing_databases,
283            schemas: missing_schemas,
284            clusters: missing_clusters,
285            compilation_errors,
286        })
287    } else {
288        Ok(())
289    }
290}
291
292/// Derives the set of external database/schema prerequisites from project dependencies.
293///
294/// Project-owned databases are excluded because deployment can create them if needed.
295fn collect_external_dependencies(
296    planned_project: &graph::Project,
297) -> (BTreeSet<String>, BTreeSet<(String, String)>) {
298    let project_databases: BTreeSet<_> = planned_project
299        .databases
300        .iter()
301        .map(|db| db.name.clone())
302        .collect();
303
304    let mut external_databases = BTreeSet::new();
305    let mut external_schemas = BTreeSet::new();
306    for ext_dep in &planned_project.external_dependencies {
307        // System-schema deps have no database, so there's nothing to require.
308        let Some(db) = ext_dep.database() else {
309            continue;
310        };
311        if !project_databases.contains(db) {
312            external_databases.insert(db.to_string());
313        }
314        external_schemas.insert((db.to_string(), ext_dep.schema().to_string()));
315    }
316    (external_databases, external_schemas)
317}
318
319/// Checks catalog state for external databases that must pre-exist.
320async fn find_missing_databases(
321    client: &Client,
322    external_databases: &BTreeSet<String>,
323) -> Result<Vec<String>, DatabaseValidationError> {
324    let existing = query_existing_names(client, "mz_databases", "name", external_databases).await?;
325    Ok(external_databases.difference(&existing).cloned().collect())
326}
327
328/// Checks catalog state for external schemas that must pre-exist.
329async fn find_missing_schemas(
330    client: &Client,
331    external_schemas: &BTreeSet<(String, String)>,
332) -> Result<Vec<SchemaQualifier>, DatabaseValidationError> {
333    let existing = query_existing_schema_pairs(client, external_schemas).await?;
334    Ok(external_schemas
335        .difference(&existing)
336        .map(|(db, schema)| SchemaQualifier::new(db.clone(), schema.clone()))
337        .collect())
338}
339
340/// Checks whether all cluster dependencies referenced by the project are present.
341async fn find_missing_clusters(
342    client: &Client,
343    planned_project: &graph::Project,
344) -> Result<Vec<String>, DatabaseValidationError> {
345    let required: BTreeSet<String> = planned_project
346        .cluster_dependencies
347        .iter()
348        .map(|cluster| cluster.name.clone())
349        .collect();
350    let existing = query_existing_names(client, "mz_clusters", "name", &required).await?;
351    Ok(required.difference(&existing).cloned().collect())
352}
353
354/// Reconstructs source file paths for planned objects under `models/`.
355///
356/// These paths are used to attach dependency errors to concrete files for users.
357fn build_object_paths(
358    planned_project: &graph::Project,
359    project_root: &Path,
360) -> BTreeMap<ObjectId, PathBuf> {
361    let mut object_paths = BTreeMap::new();
362    for db in &planned_project.databases {
363        for schema in &db.schemas {
364            for obj in &schema.objects {
365                let file_path = project_root
366                    .join("models")
367                    .join(obj.id.expect_database())
368                    .join(obj.id.schema())
369                    .join(format!("{}.sql", obj.id.object()));
370                object_paths.insert(obj.id.clone(), file_path);
371            }
372        }
373    }
374    object_paths
375}
376
377/// Checks whether externally-referenced objects actually exist in the target catalog.
378async fn find_missing_external_dependencies(
379    client: &Client,
380    planned_project: &graph::Project,
381) -> Result<BTreeSet<ObjectId>, DatabaseValidationError> {
382    let external_deps: BTreeSet<ObjectId> = planned_project
383        .external_dependencies
384        .iter()
385        .cloned()
386        .collect();
387    let existing =
388        query_existing_object_ids(client, &external_deps, CatalogLookup::Objects).await?;
389    Ok(external_deps.difference(&existing).cloned().collect())
390}
391
392/// Converts missing external dependencies into user-facing, file-scoped errors.
393///
394/// Grouping by file/object keeps output aligned with how users navigate project SQL.
395fn build_compilation_errors(
396    planned_project: &graph::Project,
397    object_paths: &BTreeMap<ObjectId, PathBuf>,
398    missing_external_deps: &BTreeSet<ObjectId>,
399) -> Vec<DatabaseValidationError> {
400    let mut errors = Vec::new();
401    for db in &planned_project.databases {
402        for schema in &db.schemas {
403            for obj in &schema.objects {
404                let missing_for_object: Vec<_> = obj
405                    .dependencies
406                    .iter()
407                    .filter(|dep| missing_external_deps.contains(*dep))
408                    .cloned()
409                    .collect();
410                if missing_for_object.is_empty() {
411                    continue;
412                }
413                if let Some(file_path) = object_paths.get(&obj.id) {
414                    errors.push(DatabaseValidationError::CompilationFailed {
415                        file_path: file_path.clone(),
416                        object_name: obj.id.clone(),
417                        missing_dependencies: missing_for_object,
418                    });
419                }
420            }
421        }
422    }
423    errors
424}
425
426impl ValidationClient<'_> {
427    /// Validate that all required databases, schemas, and external dependencies exist.
428    pub async fn validate_project(
429        &self,
430        planned_project: &graph::Project,
431        project_root: &Path,
432    ) -> Result<(), DatabaseValidationError> {
433        validate_project_impl(self.client, planned_project, project_root).await
434    }
435
436    /// Validate that sources and sinks don't share clusters with indexes or materialized views.
437    pub async fn validate_cluster_isolation(
438        &self,
439        planned_project: &graph::Project,
440    ) -> Result<(), DatabaseValidationError> {
441        validate_cluster_isolation_impl(self.client, planned_project).await
442    }
443
444    /// Validate that the user has sufficient privileges to deploy the project.
445    pub async fn validate_privileges(
446        &self,
447        planned_project: &graph::Project,
448    ) -> Result<(), DatabaseValidationError> {
449        validate_privileges_impl(self.client, planned_project).await
450    }
451
452    /// Validate that all sources referenced by CREATE TABLE FROM SOURCE statements exist.
453    pub async fn validate_sources_exist(
454        &self,
455        planned_project: &graph::Project,
456    ) -> Result<(), DatabaseValidationError> {
457        validate_sources_exist_impl(self.client, planned_project).await
458    }
459
460    /// Validate that all connections referenced by CREATE SINK statements exist.
461    pub async fn validate_sink_connections_exist(
462        &self,
463        planned_project: &graph::Project,
464    ) -> Result<(), DatabaseValidationError> {
465        validate_sink_connections_exist_impl(self.client, planned_project).await
466    }
467
468    /// Validate that the current role owns all production schemas that will be swapped.
469    pub async fn validate_schema_ownership(
470        &self,
471        schema_set: &BTreeSet<SchemaQualifier>,
472    ) -> Result<(), DatabaseValidationError> {
473        validate_schema_ownership_impl(self.client, schema_set).await
474    }
475
476    /// Validate that the current role owns all production clusters that will be swapped.
477    pub async fn validate_cluster_ownership(
478        &self,
479        cluster_set: &BTreeSet<String>,
480    ) -> Result<(), DatabaseValidationError> {
481        validate_cluster_ownership_impl(self.client, cluster_set).await
482    }
483
484    /// Validate that all tables referenced by objects to be deployed exist in the database.
485    pub async fn validate_table_dependencies(
486        &self,
487        planned_project: &graph::Project,
488        objects_to_deploy: &BTreeSet<ObjectId>,
489    ) -> Result<(), DatabaseValidationError> {
490        validate_table_dependencies_impl(self.client, planned_project, objects_to_deploy).await
491    }
492}
493
494/// Internal implementation of validate_schema_ownership.
495pub(crate) async fn validate_schema_ownership_impl(
496    client: &Client,
497    schema_set: &BTreeSet<SchemaQualifier>,
498) -> Result<(), DatabaseValidationError> {
499    if schema_set.is_empty() {
500        return Ok(());
501    }
502
503    let fqn_to_schema: BTreeMap<String, &SchemaQualifier> = schema_set
504        .iter()
505        .map(|sq| (format!("{}.{}", sq.database, sq.schema), sq))
506        .collect();
507    let fqns: Vec<String> = fqn_to_schema.keys().cloned().collect();
508
509    let mut unowned_schemas = Vec::new();
510    let mut current_user = String::new();
511
512    for chunk in fqns.chunks(LOOKUP_BATCH_SIZE) {
513        let placeholders = sql_placeholders(chunk.len());
514        let query = format!(
515            r#"
516            SELECT d.name || '.' || s.name AS fqn, current_user() AS current_user
517            FROM mz_schemas s
518            JOIN mz_databases d ON s.database_id = d.id
519            JOIN mz_roles r ON s.owner_id = r.id
520            WHERE d.name || '.' || s.name IN ({placeholders})
521              AND r.name != current_user()
522            "#,
523        );
524
525        #[allow(clippy::as_conversions)]
526        let params: Vec<&(dyn ToSql + Sync)> =
527            chunk.iter().map(|fqn| fqn as &(dyn ToSql + Sync)).collect();
528
529        let rows = client
530            .query(&query, &params)
531            .await
532            .map_err(DatabaseValidationError::QueryError)?;
533
534        for row in rows {
535            let fqn: String = row.get("fqn");
536            if let Some(sq) = fqn_to_schema.get(&fqn) {
537                unowned_schemas.push((*sq).clone());
538            }
539            if current_user.is_empty() {
540                current_user = row.get("current_user");
541            }
542        }
543    }
544
545    if !unowned_schemas.is_empty() {
546        unowned_schemas.sort();
547        return Err(DatabaseValidationError::SchemaOwnershipMismatch {
548            unowned_schemas,
549            current_user,
550        });
551    }
552
553    Ok(())
554}
555
556/// Internal implementation of validate_cluster_ownership.
557pub(crate) async fn validate_cluster_ownership_impl(
558    client: &Client,
559    cluster_set: &BTreeSet<String>,
560) -> Result<(), DatabaseValidationError> {
561    if cluster_set.is_empty() {
562        return Ok(());
563    }
564
565    let cluster_names: Vec<String> = cluster_set.iter().cloned().collect();
566
567    let mut unowned_clusters = Vec::new();
568    let mut current_user = String::new();
569
570    for chunk in cluster_names.chunks(LOOKUP_BATCH_SIZE) {
571        let placeholders = sql_placeholders(chunk.len());
572        let query = format!(
573            r#"
574            SELECT c.name AS cluster_name, current_user() AS current_user
575            FROM mz_clusters c
576            JOIN mz_roles r ON c.owner_id = r.id
577            WHERE c.name IN ({placeholders})
578              AND r.name != current_user()
579            "#,
580        );
581
582        #[allow(clippy::as_conversions)]
583        let params: Vec<&(dyn ToSql + Sync)> = chunk
584            .iter()
585            .map(|name| name as &(dyn ToSql + Sync))
586            .collect();
587
588        let rows = client
589            .query(&query, &params)
590            .await
591            .map_err(DatabaseValidationError::QueryError)?;
592
593        for row in rows {
594            let cluster_name: String = row.get("cluster_name");
595            unowned_clusters.push(cluster_name);
596            if current_user.is_empty() {
597                current_user = row.get("current_user");
598            }
599        }
600    }
601
602    if !unowned_clusters.is_empty() {
603        unowned_clusters.sort();
604        return Err(DatabaseValidationError::ClusterOwnershipMismatch {
605            unowned_clusters,
606            current_user,
607        });
608    }
609
610    Ok(())
611}
612
613/// Internal implementation of validate_cluster_isolation.
614pub(crate) async fn validate_cluster_isolation_impl(
615    client: &Client,
616    planned_project: &graph::Project,
617) -> Result<(), DatabaseValidationError> {
618    // Get all clusters used by the project
619    let mut all_clusters: BTreeSet<String> = BTreeSet::new();
620    for cluster in &planned_project.cluster_dependencies {
621        all_clusters.insert(cluster.name.clone());
622    }
623
624    // Query sources from the database for these clusters
625    let sources_by_cluster = query_sources_by_cluster(client, &all_clusters).await?;
626
627    // Validate cluster isolation using the project's validation method
628    planned_project
629        .validate_cluster_isolation(&sources_by_cluster)
630        .map_err(|(cluster_name, compute_objects, storage_objects)| {
631            DatabaseValidationError::ClusterConflict {
632                cluster_name,
633                compute_objects,
634                storage_objects,
635            }
636        })
637}
638
639/// Internal implementation of validate_privileges.
640pub(crate) async fn validate_privileges_impl(
641    client: &Client,
642    planned_project: &graph::Project,
643) -> Result<(), DatabaseValidationError> {
644    // Check if user is a superuser
645    let row = client
646        .query_one("SELECT mz_is_superuser()", &[])
647        .await
648        .map_err(DatabaseValidationError::QueryError)?;
649    let is_superuser: bool = row.get(0);
650
651    if is_superuser {
652        return Ok(()); // Superuser has all privileges
653    }
654
655    // Collect all required databases from the project
656    let mut priv_required_databases = BTreeSet::new();
657    for db in &planned_project.databases {
658        priv_required_databases.insert(db.name.clone());
659    }
660
661    // Check USAGE privileges on databases using the provided query
662    let missing_usage = if !priv_required_databases.is_empty() {
663        let in_clause = sql_placeholders(priv_required_databases.len());
664
665        let query = format!(
666            r#"
667            SELECT name
668            FROM mz_internal.mz_show_my_database_privileges
669            WHERE name IN ({})
670            GROUP BY name
671            HAVING NOT BOOL_OR(privilege_type = 'USAGE')
672            "#,
673            in_clause
674        );
675
676        #[allow(clippy::as_conversions)]
677        let params: Vec<&(dyn ToSql + Sync)> = priv_required_databases
678            .iter()
679            .map(|s| s as &(dyn ToSql + Sync))
680            .collect();
681
682        let rows = client
683            .query(&query, &params)
684            .await
685            .map_err(DatabaseValidationError::QueryError)?;
686
687        rows.iter()
688            .map(|row| row.get::<_, String>("name"))
689            .collect::<Vec<_>>()
690    } else {
691        Vec::new()
692    };
693
694    // Check CREATECLUSTER privilege if project has cluster dependencies
695    let missing_createcluster = if !planned_project.cluster_dependencies.is_empty() {
696        let query = r#"
697            SELECT EXISTS (
698                SELECT * FROM mz_internal.mz_show_my_system_privileges
699                WHERE privilege_type = 'CREATECLUSTER'
700            )
701        "#;
702
703        let row = client
704            .query_one(query, &[])
705            .await
706            .map_err(DatabaseValidationError::QueryError)?;
707
708        let has_createcluster: bool = row.get(0);
709        !has_createcluster
710    } else {
711        false
712    };
713
714    // Return error if missing any privileges
715    if !missing_usage.is_empty() || missing_createcluster {
716        return Err(DatabaseValidationError::InsufficientPrivileges {
717            missing_database_usage: missing_usage,
718            missing_createcluster,
719        });
720    }
721
722    Ok(())
723}
724
725/// Internal implementation of validate_sources_exist.
726pub(crate) async fn validate_sources_exist_impl(
727    client: &Client,
728    planned_project: &graph::Project,
729) -> Result<(), DatabaseValidationError> {
730    let defined_sources: BTreeSet<ObjectId> = planned_project
731        .iter_objects()
732        .filter(|obj| matches!(obj.typed_object.stmt, Statement::CreateSource(_)))
733        .map(|obj| obj.id.clone())
734        .collect();
735
736    let mut referenced_sources = BTreeSet::new();
737    for obj in planned_project.iter_objects() {
738        if let Statement::CreateTableFromSource(ref stmt) = obj.typed_object.stmt {
739            let source_id = ObjectId::from_raw_item_name(
740                &stmt.source,
741                obj.id.expect_database(),
742                obj.id.schema(),
743            );
744            if !defined_sources.contains(&source_id) {
745                referenced_sources.insert(source_id);
746            }
747        }
748    }
749
750    let existing =
751        query_existing_object_ids(client, &referenced_sources, CatalogLookup::Sources).await?;
752    let missing_sources: Vec<ObjectId> =
753        referenced_sources.difference(&existing).cloned().collect();
754    if !missing_sources.is_empty() {
755        return Err(DatabaseValidationError::MissingSources(missing_sources));
756    }
757
758    Ok(())
759}
760
761/// Internal implementation of validate_sink_connections_exist.
762///
763/// Validates that all connections referenced by sinks exist in the database.
764/// Sinks reference connections (Kafka, Iceberg) that are not managed by mz-deploy.
765pub(crate) async fn validate_sink_connections_exist_impl(
766    client: &Client,
767    planned_project: &graph::Project,
768) -> Result<(), DatabaseValidationError> {
769    let mut referenced_connections = BTreeSet::new();
770    for obj in planned_project.iter_objects() {
771        if let Statement::CreateSink(ref stmt) = obj.typed_object.stmt {
772            let connection_ids = match &stmt.connection {
773                CreateSinkConnection::Kafka { connection, .. } => {
774                    vec![ObjectId::from_raw_item_name(
775                        connection,
776                        obj.id.expect_database(),
777                        obj.id.schema(),
778                    )]
779                }
780                CreateSinkConnection::Iceberg {
781                    catalog_connection,
782                    aws_connection,
783                    ..
784                } => {
785                    let mut ids = vec![ObjectId::from_raw_item_name(
786                        catalog_connection,
787                        obj.id.expect_database(),
788                        obj.id.schema(),
789                    )];
790                    if let Some(aws_connection) = aws_connection {
791                        ids.push(ObjectId::from_raw_item_name(
792                            aws_connection,
793                            obj.id.expect_database(),
794                            obj.id.schema(),
795                        ));
796                    }
797                    ids
798                }
799            };
800
801            for conn_id in connection_ids {
802                referenced_connections.insert(conn_id);
803            }
804        }
805    }
806
807    let existing =
808        query_existing_object_ids(client, &referenced_connections, CatalogLookup::Connections)
809            .await?;
810    let missing_connections: Vec<ObjectId> = referenced_connections
811        .difference(&existing)
812        .cloned()
813        .collect();
814    if !missing_connections.is_empty() {
815        return Err(DatabaseValidationError::MissingConnections(
816            missing_connections,
817        ));
818    }
819
820    Ok(())
821}
822
823/// Internal implementation of validate_table_dependencies.
824pub(crate) async fn validate_table_dependencies_impl(
825    client: &Client,
826    planned_project: &graph::Project,
827    objects_to_deploy: &BTreeSet<ObjectId>,
828) -> Result<(), DatabaseValidationError> {
829    let project_tables: BTreeSet<ObjectId> = planned_project.get_tables().collect();
830
831    let mut required_tables = BTreeSet::new();
832    for object_id in objects_to_deploy {
833        if let Some(obj) = planned_project.find_object(object_id) {
834            for dep_id in &obj.dependencies {
835                if project_tables.contains(dep_id) {
836                    required_tables.insert(dep_id.clone());
837                }
838            }
839        }
840    }
841
842    let existing_tables =
843        query_existing_object_ids(client, &required_tables, CatalogLookup::Tables).await?;
844    let missing_table_set: BTreeSet<ObjectId> = required_tables
845        .difference(&existing_tables)
846        .cloned()
847        .collect();
848
849    let mut objects_needing_tables = Vec::new();
850    for object_id in objects_to_deploy {
851        if let Some(obj) = planned_project.find_object(object_id) {
852            let mut missing_tables = Vec::new();
853            for dep_id in &obj.dependencies {
854                if project_tables.contains(dep_id) && missing_table_set.contains(dep_id) {
855                    missing_tables.push(dep_id.clone());
856                }
857            }
858
859            if !missing_tables.is_empty() {
860                objects_needing_tables.push((object_id.clone(), missing_tables));
861            }
862        }
863    }
864
865    if !objects_needing_tables.is_empty() {
866        return Err(DatabaseValidationError::MissingTableDependencies {
867            objects_needing_tables,
868        });
869    }
870
871    Ok(())
872}