1use crate::client::connection::{Client, ValidationClient};
38use crate::client::errors::DatabaseValidationError;
39use crate::client::sql_placeholders;
40use crate::project::SchemaQualifier;
41use crate::project::ast::Statement;
42use crate::project::ir::graph;
43use crate::project::ir::object_id::ObjectId;
44use mz_sql_parser::ast::CreateSinkConnection;
45use std::collections::{BTreeMap, BTreeSet};
46use std::path::Path;
47use std::path::PathBuf;
48use tokio_postgres::types::ToSql;
49
50const LOOKUP_BATCH_SIZE: usize = 1000;
51
52enum CatalogLookup {
53 Objects,
54 Sources,
55 Tables,
56 Connections,
57}
58
59impl CatalogLookup {
60 fn table_name(&self) -> &'static str {
61 match self {
62 CatalogLookup::Objects => "mz_objects",
63 CatalogLookup::Sources => "mz_sources",
64 CatalogLookup::Tables => "mz_tables",
65 CatalogLookup::Connections => "mz_connections",
66 }
67 }
68}
69
70pub(crate) async fn query_sources_by_cluster(
72 client: &Client,
73 cluster_names: &BTreeSet<String>,
74) -> Result<BTreeMap<String, Vec<String>>, DatabaseValidationError> {
75 if cluster_names.is_empty() {
76 return Ok(BTreeMap::new());
77 }
78
79 let in_clause = sql_placeholders(cluster_names.len());
80
81 let query = format!(
82 r#"
83 SELECT
84 c.name as cluster_name,
85 d.name || '.' || s.name || '.' || mo.name as fqn
86 FROM mz_catalog.mz_sources src
87 JOIN mz_catalog.mz_objects mo ON src.id = mo.id
88 JOIN mz_catalog.mz_schemas s ON mo.schema_id = s.id
89 JOIN mz_catalog.mz_databases d ON s.database_id = d.id
90 JOIN mz_catalog.mz_clusters c ON src.cluster_id = c.id
91 WHERE mo.id LIKE 'u%' AND c.name IN ({})
92 "#,
93 in_clause
94 );
95
96 #[allow(clippy::as_conversions)]
97 let params: Vec<&(dyn ToSql + Sync)> = cluster_names
98 .iter()
99 .map(|s| s as &(dyn ToSql + Sync))
100 .collect();
101
102 let rows = client
103 .query(&query, ¶ms)
104 .await
105 .map_err(DatabaseValidationError::QueryError)?;
106
107 let mut result: BTreeMap<String, Vec<String>> = BTreeMap::new();
108 for row in rows {
109 let cluster_name: String = row.get("cluster_name");
110 let fqn: String = row.get("fqn");
111 result
112 .entry(cluster_name)
113 .or_insert_with(Vec::new)
114 .push(fqn);
115 }
116
117 Ok(result)
118}
119
120async fn query_existing_names(
121 client: &Client,
122 table_name: &str,
123 column_name: &str,
124 names: &BTreeSet<String>,
125) -> Result<BTreeSet<String>, DatabaseValidationError> {
126 let mut existing = BTreeSet::new();
127 if names.is_empty() {
128 return Ok(existing);
129 }
130
131 let name_list: Vec<String> = names.iter().cloned().collect();
132 for chunk in name_list.chunks(LOOKUP_BATCH_SIZE) {
133 let placeholders = sql_placeholders(chunk.len());
134 let query = format!(
135 "SELECT {column} FROM {table} WHERE {column} IN ({placeholders})",
136 column = column_name,
137 table = table_name,
138 placeholders = placeholders
139 );
140
141 #[allow(clippy::as_conversions)]
142 let params: Vec<&(dyn ToSql + Sync)> = chunk
143 .iter()
144 .map(|name| name as &(dyn ToSql + Sync))
145 .collect();
146
147 let rows = client
148 .query(&query, ¶ms)
149 .await
150 .map_err(DatabaseValidationError::QueryError)?;
151 for row in rows {
152 let name: String = row.get(column_name);
153 existing.insert(name);
154 }
155 }
156
157 Ok(existing)
158}
159
160async fn query_existing_schema_pairs(
161 client: &Client,
162 schema_pairs: &BTreeSet<(String, String)>,
163) -> Result<BTreeSet<(String, String)>, DatabaseValidationError> {
164 let mut existing = BTreeSet::new();
165 if schema_pairs.is_empty() {
166 return Ok(existing);
167 }
168
169 let fqn_to_pair: BTreeMap<String, (String, String)> = schema_pairs
170 .iter()
171 .map(|(database, schema)| {
172 (
173 format!("{}.{}", database, schema),
174 (database.clone(), schema.clone()),
175 )
176 })
177 .collect();
178 let fqns: Vec<String> = fqn_to_pair.keys().cloned().collect();
179
180 for chunk in fqns.chunks(LOOKUP_BATCH_SIZE) {
181 let placeholders = sql_placeholders(chunk.len());
182 let query = format!(
183 r#"
184 SELECT d.name || '.' || s.name AS fqn
185 FROM mz_schemas s
186 JOIN mz_databases d ON s.database_id = d.id
187 WHERE d.name || '.' || s.name IN ({})
188 "#,
189 placeholders
190 );
191
192 #[allow(clippy::as_conversions)]
193 let params: Vec<&(dyn ToSql + Sync)> =
194 chunk.iter().map(|fqn| fqn as &(dyn ToSql + Sync)).collect();
195
196 let rows = client
197 .query(&query, ¶ms)
198 .await
199 .map_err(DatabaseValidationError::QueryError)?;
200 for row in rows {
201 let fqn: String = row.get("fqn");
202 if let Some(pair) = fqn_to_pair.get(&fqn) {
203 existing.insert(pair.clone());
204 }
205 }
206 }
207
208 Ok(existing)
209}
210
211async fn query_existing_object_ids(
212 client: &Client,
213 object_ids: &BTreeSet<ObjectId>,
214 lookup: CatalogLookup,
215) -> Result<BTreeSet<ObjectId>, DatabaseValidationError> {
216 let mut existing = BTreeSet::new();
217 if object_ids.is_empty() {
218 return Ok(existing);
219 }
220
221 let fqn_to_object: BTreeMap<String, ObjectId> = object_ids
222 .iter()
223 .map(|obj| (obj.to_string(), obj.clone()))
224 .collect();
225 let fqns: Vec<String> = fqn_to_object.keys().cloned().collect();
226 let table_name = lookup.table_name();
227
228 for chunk in fqns.chunks(LOOKUP_BATCH_SIZE) {
229 let placeholders = sql_placeholders(chunk.len());
230 let query = format!(
231 r#"
232 SELECT d.name || '.' || s.name || '.' || t.name AS fqn
233 FROM {table_name} t
234 JOIN mz_schemas s ON t.schema_id = s.id
235 JOIN mz_databases d ON s.database_id = d.id
236 WHERE d.name || '.' || s.name || '.' || t.name IN ({placeholders})
237 "#,
238 table_name = table_name,
239 placeholders = placeholders
240 );
241
242 #[allow(clippy::as_conversions)]
243 let params: Vec<&(dyn ToSql + Sync)> =
244 chunk.iter().map(|fqn| fqn as &(dyn ToSql + Sync)).collect();
245
246 let rows = client
247 .query(&query, ¶ms)
248 .await
249 .map_err(DatabaseValidationError::QueryError)?;
250 for row in rows {
251 let fqn: String = row.get("fqn");
252 if let Some(obj) = fqn_to_object.get(&fqn) {
253 existing.insert(obj.clone());
254 }
255 }
256 }
257
258 Ok(existing)
259}
260
261pub(crate) async fn validate_project_impl(
263 client: &Client,
264 planned_project: &graph::Project,
265 project_root: &Path,
266) -> Result<(), DatabaseValidationError> {
267 let (external_databases, external_schemas) = collect_external_dependencies(planned_project);
268 let missing_databases = find_missing_databases(client, &external_databases).await?;
269 let missing_schemas = find_missing_schemas(client, &external_schemas).await?;
270 let missing_clusters = find_missing_clusters(client, planned_project).await?;
271 let object_paths = build_object_paths(planned_project, project_root);
272 let missing_external_deps = find_missing_external_dependencies(client, planned_project).await?;
273 let compilation_errors =
274 build_compilation_errors(planned_project, &object_paths, &missing_external_deps);
275
276 if !missing_databases.is_empty()
277 || !missing_schemas.is_empty()
278 || !missing_clusters.is_empty()
279 || !compilation_errors.is_empty()
280 {
281 Err(DatabaseValidationError::Multiple {
282 databases: missing_databases,
283 schemas: missing_schemas,
284 clusters: missing_clusters,
285 compilation_errors,
286 })
287 } else {
288 Ok(())
289 }
290}
291
292fn collect_external_dependencies(
296 planned_project: &graph::Project,
297) -> (BTreeSet<String>, BTreeSet<(String, String)>) {
298 let project_databases: BTreeSet<_> = planned_project
299 .databases
300 .iter()
301 .map(|db| db.name.clone())
302 .collect();
303
304 let mut external_databases = BTreeSet::new();
305 let mut external_schemas = BTreeSet::new();
306 for ext_dep in &planned_project.external_dependencies {
307 let Some(db) = ext_dep.database() else {
309 continue;
310 };
311 if !project_databases.contains(db) {
312 external_databases.insert(db.to_string());
313 }
314 external_schemas.insert((db.to_string(), ext_dep.schema().to_string()));
315 }
316 (external_databases, external_schemas)
317}
318
319async fn find_missing_databases(
321 client: &Client,
322 external_databases: &BTreeSet<String>,
323) -> Result<Vec<String>, DatabaseValidationError> {
324 let existing = query_existing_names(client, "mz_databases", "name", external_databases).await?;
325 Ok(external_databases.difference(&existing).cloned().collect())
326}
327
328async fn find_missing_schemas(
330 client: &Client,
331 external_schemas: &BTreeSet<(String, String)>,
332) -> Result<Vec<SchemaQualifier>, DatabaseValidationError> {
333 let existing = query_existing_schema_pairs(client, external_schemas).await?;
334 Ok(external_schemas
335 .difference(&existing)
336 .map(|(db, schema)| SchemaQualifier::new(db.clone(), schema.clone()))
337 .collect())
338}
339
340async fn find_missing_clusters(
342 client: &Client,
343 planned_project: &graph::Project,
344) -> Result<Vec<String>, DatabaseValidationError> {
345 let required: BTreeSet<String> = planned_project
346 .cluster_dependencies
347 .iter()
348 .map(|cluster| cluster.name.clone())
349 .collect();
350 let existing = query_existing_names(client, "mz_clusters", "name", &required).await?;
351 Ok(required.difference(&existing).cloned().collect())
352}
353
354fn build_object_paths(
358 planned_project: &graph::Project,
359 project_root: &Path,
360) -> BTreeMap<ObjectId, PathBuf> {
361 let mut object_paths = BTreeMap::new();
362 for db in &planned_project.databases {
363 for schema in &db.schemas {
364 for obj in &schema.objects {
365 let file_path = project_root
366 .join("models")
367 .join(obj.id.expect_database())
368 .join(obj.id.schema())
369 .join(format!("{}.sql", obj.id.object()));
370 object_paths.insert(obj.id.clone(), file_path);
371 }
372 }
373 }
374 object_paths
375}
376
377async fn find_missing_external_dependencies(
379 client: &Client,
380 planned_project: &graph::Project,
381) -> Result<BTreeSet<ObjectId>, DatabaseValidationError> {
382 let external_deps: BTreeSet<ObjectId> = planned_project
383 .external_dependencies
384 .iter()
385 .cloned()
386 .collect();
387 let existing =
388 query_existing_object_ids(client, &external_deps, CatalogLookup::Objects).await?;
389 Ok(external_deps.difference(&existing).cloned().collect())
390}
391
392fn build_compilation_errors(
396 planned_project: &graph::Project,
397 object_paths: &BTreeMap<ObjectId, PathBuf>,
398 missing_external_deps: &BTreeSet<ObjectId>,
399) -> Vec<DatabaseValidationError> {
400 let mut errors = Vec::new();
401 for db in &planned_project.databases {
402 for schema in &db.schemas {
403 for obj in &schema.objects {
404 let missing_for_object: Vec<_> = obj
405 .dependencies
406 .iter()
407 .filter(|dep| missing_external_deps.contains(*dep))
408 .cloned()
409 .collect();
410 if missing_for_object.is_empty() {
411 continue;
412 }
413 if let Some(file_path) = object_paths.get(&obj.id) {
414 errors.push(DatabaseValidationError::CompilationFailed {
415 file_path: file_path.clone(),
416 object_name: obj.id.clone(),
417 missing_dependencies: missing_for_object,
418 });
419 }
420 }
421 }
422 }
423 errors
424}
425
426impl ValidationClient<'_> {
427 pub async fn validate_project(
429 &self,
430 planned_project: &graph::Project,
431 project_root: &Path,
432 ) -> Result<(), DatabaseValidationError> {
433 validate_project_impl(self.client, planned_project, project_root).await
434 }
435
436 pub async fn validate_cluster_isolation(
438 &self,
439 planned_project: &graph::Project,
440 ) -> Result<(), DatabaseValidationError> {
441 validate_cluster_isolation_impl(self.client, planned_project).await
442 }
443
444 pub async fn validate_privileges(
446 &self,
447 planned_project: &graph::Project,
448 ) -> Result<(), DatabaseValidationError> {
449 validate_privileges_impl(self.client, planned_project).await
450 }
451
452 pub async fn validate_sources_exist(
454 &self,
455 planned_project: &graph::Project,
456 ) -> Result<(), DatabaseValidationError> {
457 validate_sources_exist_impl(self.client, planned_project).await
458 }
459
460 pub async fn validate_sink_connections_exist(
462 &self,
463 planned_project: &graph::Project,
464 ) -> Result<(), DatabaseValidationError> {
465 validate_sink_connections_exist_impl(self.client, planned_project).await
466 }
467
468 pub async fn validate_schema_ownership(
470 &self,
471 schema_set: &BTreeSet<SchemaQualifier>,
472 ) -> Result<(), DatabaseValidationError> {
473 validate_schema_ownership_impl(self.client, schema_set).await
474 }
475
476 pub async fn validate_cluster_ownership(
478 &self,
479 cluster_set: &BTreeSet<String>,
480 ) -> Result<(), DatabaseValidationError> {
481 validate_cluster_ownership_impl(self.client, cluster_set).await
482 }
483
484 pub async fn validate_table_dependencies(
486 &self,
487 planned_project: &graph::Project,
488 objects_to_deploy: &BTreeSet<ObjectId>,
489 ) -> Result<(), DatabaseValidationError> {
490 validate_table_dependencies_impl(self.client, planned_project, objects_to_deploy).await
491 }
492}
493
494pub(crate) async fn validate_schema_ownership_impl(
496 client: &Client,
497 schema_set: &BTreeSet<SchemaQualifier>,
498) -> Result<(), DatabaseValidationError> {
499 if schema_set.is_empty() {
500 return Ok(());
501 }
502
503 let fqn_to_schema: BTreeMap<String, &SchemaQualifier> = schema_set
504 .iter()
505 .map(|sq| (format!("{}.{}", sq.database, sq.schema), sq))
506 .collect();
507 let fqns: Vec<String> = fqn_to_schema.keys().cloned().collect();
508
509 let mut unowned_schemas = Vec::new();
510 let mut current_user = String::new();
511
512 for chunk in fqns.chunks(LOOKUP_BATCH_SIZE) {
513 let placeholders = sql_placeholders(chunk.len());
514 let query = format!(
515 r#"
516 SELECT d.name || '.' || s.name AS fqn, current_user() AS current_user
517 FROM mz_schemas s
518 JOIN mz_databases d ON s.database_id = d.id
519 JOIN mz_roles r ON s.owner_id = r.id
520 WHERE d.name || '.' || s.name IN ({placeholders})
521 AND r.name != current_user()
522 "#,
523 );
524
525 #[allow(clippy::as_conversions)]
526 let params: Vec<&(dyn ToSql + Sync)> =
527 chunk.iter().map(|fqn| fqn as &(dyn ToSql + Sync)).collect();
528
529 let rows = client
530 .query(&query, ¶ms)
531 .await
532 .map_err(DatabaseValidationError::QueryError)?;
533
534 for row in rows {
535 let fqn: String = row.get("fqn");
536 if let Some(sq) = fqn_to_schema.get(&fqn) {
537 unowned_schemas.push((*sq).clone());
538 }
539 if current_user.is_empty() {
540 current_user = row.get("current_user");
541 }
542 }
543 }
544
545 if !unowned_schemas.is_empty() {
546 unowned_schemas.sort();
547 return Err(DatabaseValidationError::SchemaOwnershipMismatch {
548 unowned_schemas,
549 current_user,
550 });
551 }
552
553 Ok(())
554}
555
556pub(crate) async fn validate_cluster_ownership_impl(
558 client: &Client,
559 cluster_set: &BTreeSet<String>,
560) -> Result<(), DatabaseValidationError> {
561 if cluster_set.is_empty() {
562 return Ok(());
563 }
564
565 let cluster_names: Vec<String> = cluster_set.iter().cloned().collect();
566
567 let mut unowned_clusters = Vec::new();
568 let mut current_user = String::new();
569
570 for chunk in cluster_names.chunks(LOOKUP_BATCH_SIZE) {
571 let placeholders = sql_placeholders(chunk.len());
572 let query = format!(
573 r#"
574 SELECT c.name AS cluster_name, current_user() AS current_user
575 FROM mz_clusters c
576 JOIN mz_roles r ON c.owner_id = r.id
577 WHERE c.name IN ({placeholders})
578 AND r.name != current_user()
579 "#,
580 );
581
582 #[allow(clippy::as_conversions)]
583 let params: Vec<&(dyn ToSql + Sync)> = chunk
584 .iter()
585 .map(|name| name as &(dyn ToSql + Sync))
586 .collect();
587
588 let rows = client
589 .query(&query, ¶ms)
590 .await
591 .map_err(DatabaseValidationError::QueryError)?;
592
593 for row in rows {
594 let cluster_name: String = row.get("cluster_name");
595 unowned_clusters.push(cluster_name);
596 if current_user.is_empty() {
597 current_user = row.get("current_user");
598 }
599 }
600 }
601
602 if !unowned_clusters.is_empty() {
603 unowned_clusters.sort();
604 return Err(DatabaseValidationError::ClusterOwnershipMismatch {
605 unowned_clusters,
606 current_user,
607 });
608 }
609
610 Ok(())
611}
612
613pub(crate) async fn validate_cluster_isolation_impl(
615 client: &Client,
616 planned_project: &graph::Project,
617) -> Result<(), DatabaseValidationError> {
618 let mut all_clusters: BTreeSet<String> = BTreeSet::new();
620 for cluster in &planned_project.cluster_dependencies {
621 all_clusters.insert(cluster.name.clone());
622 }
623
624 let sources_by_cluster = query_sources_by_cluster(client, &all_clusters).await?;
626
627 planned_project
629 .validate_cluster_isolation(&sources_by_cluster)
630 .map_err(|(cluster_name, compute_objects, storage_objects)| {
631 DatabaseValidationError::ClusterConflict {
632 cluster_name,
633 compute_objects,
634 storage_objects,
635 }
636 })
637}
638
639pub(crate) async fn validate_privileges_impl(
641 client: &Client,
642 planned_project: &graph::Project,
643) -> Result<(), DatabaseValidationError> {
644 let row = client
646 .query_one("SELECT mz_is_superuser()", &[])
647 .await
648 .map_err(DatabaseValidationError::QueryError)?;
649 let is_superuser: bool = row.get(0);
650
651 if is_superuser {
652 return Ok(()); }
654
655 let mut priv_required_databases = BTreeSet::new();
657 for db in &planned_project.databases {
658 priv_required_databases.insert(db.name.clone());
659 }
660
661 let missing_usage = if !priv_required_databases.is_empty() {
663 let in_clause = sql_placeholders(priv_required_databases.len());
664
665 let query = format!(
666 r#"
667 SELECT name
668 FROM mz_internal.mz_show_my_database_privileges
669 WHERE name IN ({})
670 GROUP BY name
671 HAVING NOT BOOL_OR(privilege_type = 'USAGE')
672 "#,
673 in_clause
674 );
675
676 #[allow(clippy::as_conversions)]
677 let params: Vec<&(dyn ToSql + Sync)> = priv_required_databases
678 .iter()
679 .map(|s| s as &(dyn ToSql + Sync))
680 .collect();
681
682 let rows = client
683 .query(&query, ¶ms)
684 .await
685 .map_err(DatabaseValidationError::QueryError)?;
686
687 rows.iter()
688 .map(|row| row.get::<_, String>("name"))
689 .collect::<Vec<_>>()
690 } else {
691 Vec::new()
692 };
693
694 let missing_createcluster = if !planned_project.cluster_dependencies.is_empty() {
696 let query = r#"
697 SELECT EXISTS (
698 SELECT * FROM mz_internal.mz_show_my_system_privileges
699 WHERE privilege_type = 'CREATECLUSTER'
700 )
701 "#;
702
703 let row = client
704 .query_one(query, &[])
705 .await
706 .map_err(DatabaseValidationError::QueryError)?;
707
708 let has_createcluster: bool = row.get(0);
709 !has_createcluster
710 } else {
711 false
712 };
713
714 if !missing_usage.is_empty() || missing_createcluster {
716 return Err(DatabaseValidationError::InsufficientPrivileges {
717 missing_database_usage: missing_usage,
718 missing_createcluster,
719 });
720 }
721
722 Ok(())
723}
724
725pub(crate) async fn validate_sources_exist_impl(
727 client: &Client,
728 planned_project: &graph::Project,
729) -> Result<(), DatabaseValidationError> {
730 let defined_sources: BTreeSet<ObjectId> = planned_project
731 .iter_objects()
732 .filter(|obj| matches!(obj.typed_object.stmt, Statement::CreateSource(_)))
733 .map(|obj| obj.id.clone())
734 .collect();
735
736 let mut referenced_sources = BTreeSet::new();
737 for obj in planned_project.iter_objects() {
738 if let Statement::CreateTableFromSource(ref stmt) = obj.typed_object.stmt {
739 let source_id = ObjectId::from_raw_item_name(
740 &stmt.source,
741 obj.id.expect_database(),
742 obj.id.schema(),
743 );
744 if !defined_sources.contains(&source_id) {
745 referenced_sources.insert(source_id);
746 }
747 }
748 }
749
750 let existing =
751 query_existing_object_ids(client, &referenced_sources, CatalogLookup::Sources).await?;
752 let missing_sources: Vec<ObjectId> =
753 referenced_sources.difference(&existing).cloned().collect();
754 if !missing_sources.is_empty() {
755 return Err(DatabaseValidationError::MissingSources(missing_sources));
756 }
757
758 Ok(())
759}
760
761pub(crate) async fn validate_sink_connections_exist_impl(
766 client: &Client,
767 planned_project: &graph::Project,
768) -> Result<(), DatabaseValidationError> {
769 let mut referenced_connections = BTreeSet::new();
770 for obj in planned_project.iter_objects() {
771 if let Statement::CreateSink(ref stmt) = obj.typed_object.stmt {
772 let connection_ids = match &stmt.connection {
773 CreateSinkConnection::Kafka { connection, .. } => {
774 vec![ObjectId::from_raw_item_name(
775 connection,
776 obj.id.expect_database(),
777 obj.id.schema(),
778 )]
779 }
780 CreateSinkConnection::Iceberg {
781 catalog_connection,
782 aws_connection,
783 ..
784 } => {
785 let mut ids = vec![ObjectId::from_raw_item_name(
786 catalog_connection,
787 obj.id.expect_database(),
788 obj.id.schema(),
789 )];
790 if let Some(aws_connection) = aws_connection {
791 ids.push(ObjectId::from_raw_item_name(
792 aws_connection,
793 obj.id.expect_database(),
794 obj.id.schema(),
795 ));
796 }
797 ids
798 }
799 };
800
801 for conn_id in connection_ids {
802 referenced_connections.insert(conn_id);
803 }
804 }
805 }
806
807 let existing =
808 query_existing_object_ids(client, &referenced_connections, CatalogLookup::Connections)
809 .await?;
810 let missing_connections: Vec<ObjectId> = referenced_connections
811 .difference(&existing)
812 .cloned()
813 .collect();
814 if !missing_connections.is_empty() {
815 return Err(DatabaseValidationError::MissingConnections(
816 missing_connections,
817 ));
818 }
819
820 Ok(())
821}
822
823pub(crate) async fn validate_table_dependencies_impl(
825 client: &Client,
826 planned_project: &graph::Project,
827 objects_to_deploy: &BTreeSet<ObjectId>,
828) -> Result<(), DatabaseValidationError> {
829 let project_tables: BTreeSet<ObjectId> = planned_project.get_tables().collect();
830
831 let mut required_tables = BTreeSet::new();
832 for object_id in objects_to_deploy {
833 if let Some(obj) = planned_project.find_object(object_id) {
834 for dep_id in &obj.dependencies {
835 if project_tables.contains(dep_id) {
836 required_tables.insert(dep_id.clone());
837 }
838 }
839 }
840 }
841
842 let existing_tables =
843 query_existing_object_ids(client, &required_tables, CatalogLookup::Tables).await?;
844 let missing_table_set: BTreeSet<ObjectId> = required_tables
845 .difference(&existing_tables)
846 .cloned()
847 .collect();
848
849 let mut objects_needing_tables = Vec::new();
850 for object_id in objects_to_deploy {
851 if let Some(obj) = planned_project.find_object(object_id) {
852 let mut missing_tables = Vec::new();
853 for dep_id in &obj.dependencies {
854 if project_tables.contains(dep_id) && missing_table_set.contains(dep_id) {
855 missing_tables.push(dep_id.clone());
856 }
857 }
858
859 if !missing_tables.is_empty() {
860 objects_needing_tables.push((object_id.clone(), missing_tables));
861 }
862 }
863 }
864
865 if !objects_needing_tables.is_empty() {
866 return Err(DatabaseValidationError::MissingTableDependencies {
867 objects_needing_tables,
868 });
869 }
870
871 Ok(())
872}