1use crate::config::ConfigError;
21use crate::project::SchemaQualifier;
22use crate::project::ir::object_id::ObjectId;
23use owo_colors::{OwoColorize, Stream, Style};
24use std::fmt;
25use std::path::PathBuf;
26use thiserror::Error;
27
/// Errors raised while connecting to the database or running statements against it.
///
/// The `Display` text of every variant is generated by `thiserror` from the
/// `#[error(...)]` attribute attached to it.
#[derive(Debug, Error)]
pub enum ConnectionError {
    /// A configuration error; converted automatically through `#[from]`.
    #[error("configuration error: {0}")]
    Config(#[from] ConfigError),

    /// Connecting to `host:port` failed; `source` is the driver error.
    #[error("failed to connect to {host}:{port}: {source}")]
    Connect {
        host: String,
        port: u16,
        source: tokio_postgres::Error,
    },

    /// The profile requires TLS but the server at `host:port` does not support it.
    ///
    /// The message includes a help section describing the `sslmode` alternatives.
    #[error(
        "TLS required by profile but server at {host}:{port} does not support TLS\n\
        \n\
        help: The server did not offer TLS. To connect without encryption, set\n\
        \x20 sslmode = \"disable\" on the profile. To use TLS if available\n\
        \x20 but fall back to plaintext otherwise, set sslmode = \"prefer\"."
    )]
    TlsRequiredNotSupported {
        host: String,
        port: u16,
        source: tokio_postgres::Error,
    },

    /// The server certificate could not be verified.
    ///
    /// `hostname_suffix` is spliced into the help text directly after
    /// "CA bundle"; it may be used to add extra wording there.
    #[error(
        "TLS certificate verification failed for {host}:{port}: {source}\n\
        \n\
        help: The server's certificate could not be verified against the trusted\n\
        \x20 CA bundle{hostname_suffix}. To skip verification, set\n\
        \x20 sslmode = \"require\" or sslmode = \"prefer\". To use a custom\n\
        \x20 CA bundle, set sslrootcert = \"/path/to/ca.pem\" on the profile."
    )]
    TlsVerification {
        host: String,
        port: u16,
        hostname_suffix: &'static str,
        source: tokio_postgres::Error,
    },

    /// No CA bundle was found for TLS verification.
    #[error(
        "no CA bundle found for TLS verification\n\
        \n\
        help: Set sslrootcert = \"/path/to/ca.pem\" on the profile to point at\n\
        \x20 a specific CA bundle, or install the system CA bundle at one\n\
        \x20 of: /etc/ssl/cert.pem, /etc/ssl/certs/ca-certificates.crt, or\n\
        \x20 the platform-appropriate equivalent."
    )]
    TlsCaNotFound,

    /// A query failed; the message is produced by `format_query_error`.
    #[error("{}", format_query_error(.0))]
    Query(tokio_postgres::Error),

    /// A dependency error; converted automatically through `#[from]`.
    #[error("dependency error: {0}")]
    Dependency(#[from] crate::project::error::DependencyError),

    /// Creating the database named `database` failed.
    #[error("failed to create database '{database}': {source}")]
    DatabaseCreationFailed {
        database: String,
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Creating the schema `database.schema` failed.
    #[error("failed to create schema '{database}.{schema}': {source}")]
    SchemaCreationFailed {
        database: String,
        schema: String,
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Creating the cluster named `name` failed.
    #[error("failed to create cluster '{name}': {source}")]
    ClusterCreationFailed {
        name: String,
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// A cluster named `name` already exists.
    #[error("cluster '{name}' already exists")]
    ClusterAlreadyExists { name: String },

    /// Introspection failed for the kind of object described by `object_type`.
    #[error("introspection failed for {object_type}: {source}")]
    IntrospectionFailed {
        object_type: String,
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// No cluster named `name` was found.
    #[error("cluster '{name}' not found")]
    ClusterNotFound { name: String },

    /// A deployment with id `deploy_id` already exists.
    #[error("deployment '{deploy_id}' already exists")]
    DeploymentAlreadyExists { deploy_id: String },

    /// No deployment with id `deploy_id` was found.
    #[error("deployment '{deploy_id}' not found")]
    DeploymentNotFound { deploy_id: String },

    /// The deployment `deploy_id` has already been promoted to production.
    #[error("deployment '{deploy_id}' has already been promoted to production")]
    DeploymentAlreadyPromoted { deploy_id: String },

    /// A statement type that is not supported; the payload is shown in the message.
    #[error("unsupported statement type: {0}")]
    UnsupportedStatementType(String),

    /// A free-form message, displayed verbatim.
    #[error("{0}")]
    Message(String),
}
131
132fn format_query_error(error: &tokio_postgres::Error) -> String {
133 if let Some(db_error) = error.as_db_error() {
134 let mut parts = vec![format!("database error: {}", db_error.message())];
135
136 if let Some(detail) = db_error.detail() {
137 parts.push(format!(" Detail: {}", detail));
138 }
139
140 if let Some(hint) = db_error.hint() {
141 parts.push(format!(" Hint: {}", hint));
142 }
143
144 parts.push(format!(" Code: {:?}", db_error.code()));
145 parts.join("\n")
146 } else {
147 format!("query error: {}", error)
148 }
149}
150
151impl From<tokio_postgres::Error> for ConnectionError {
152 fn from(error: tokio_postgres::Error) -> Self {
153 ConnectionError::Query(error)
154 }
155}
156
157impl From<mz_postgres_util::PostgresError> for ConnectionError {
158 fn from(error: mz_postgres_util::PostgresError) -> Self {
159 match error {
160 mz_postgres_util::PostgresError::Postgres(error) => ConnectionError::Query(error),
161 other => ConnectionError::Message(other.to_string()),
162 }
163 }
164}
165
/// Validation problems reported to the user.
///
/// The exact rendering of each variant is defined by the `Display` impl for
/// this type.
#[derive(Debug)]
pub enum DatabaseValidationError {
    /// Names of databases reported as missing.
    MissingDatabases(Vec<String>),
    /// Schemas reported as missing; each is rendered as `database.schema`.
    MissingSchemas(Vec<SchemaQualifier>),
    /// Names of clusters reported as missing.
    MissingClusters(Vec<String>),
    /// An object failed to compile because of missing external dependencies.
    CompilationFailed {
        /// File shown in the diagnostic (shortened by `format_relative_path`).
        file_path: PathBuf,
        /// Object that failed to compile.
        object_name: ObjectId,
        /// Dependencies listed under "Missing dependencies:".
        missing_dependencies: Vec<ObjectId>,
    },
    /// Several problems reported together under one "Missing dependencies" header.
    Multiple {
        /// Missing database names; the line is omitted when empty.
        databases: Vec<String>,
        /// Missing schemas; the line is omitted when empty.
        schemas: Vec<SchemaQualifier>,
        /// Missing cluster names; the line is omitted when empty.
        clusters: Vec<String>,
        /// Nested errors, each rendered through its own `Display` impl.
        compilation_errors: Vec<DatabaseValidationError>,
    },
    /// A cluster contains both storage and computation objects.
    ClusterConflict {
        cluster_name: String,
        /// Listed under "Computation objects (indexes, materialized views)".
        compute_objects: Vec<String>,
        /// Listed under "Storage objects (sources, sinks)".
        storage_objects: Vec<String>,
    },
    /// Privileges required to deploy the project are missing.
    InsufficientPrivileges {
        /// Databases on which the USAGE privilege is missing.
        missing_database_usage: Vec<String>,
        /// Whether the CREATECLUSTER system privilege is missing.
        missing_createcluster: bool,
    },
    /// The current role does not own the listed production schemas.
    SchemaOwnershipMismatch {
        unowned_schemas: Vec<SchemaQualifier>,
        /// Role named in the message and in the suggested `ALTER SCHEMA` statements.
        current_user: String,
    },
    /// The current role does not own the listed production clusters.
    ClusterOwnershipMismatch {
        unowned_clusters: Vec<String>,
        /// Role named in the message and in the suggested `ALTER CLUSTER` statements.
        current_user: String,
    },
    /// Sources that are referenced but do not exist.
    MissingSources(Vec<ObjectId>),
    /// Connections that are referenced but do not exist.
    MissingConnections(Vec<ObjectId>),
    /// Objects that depend on tables which do not exist in the database.
    MissingTableDependencies {
        /// Pairs of (object, the missing tables it depends on).
        objects_needing_tables: Vec<(ObjectId, Vec<ObjectId>)>,
    },
    /// A query failed; rendered as "Database query failed: ...".
    QueryError(ConnectionError),
}
221
222impl fmt::Display for DatabaseValidationError {
223 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
224 match self {
225 DatabaseValidationError::MissingDatabases(dbs) => {
226 write!(f, "Missing databases: {}", dbs.join(", "))
227 }
228 DatabaseValidationError::MissingSchemas(schemas) => {
229 let schema_list: Vec<String> = schemas
230 .iter()
231 .map(|sq| format!("{}.{}", sq.database, sq.schema))
232 .collect();
233 write!(f, "Missing schemas: {}", schema_list.join(", "))
234 }
235 DatabaseValidationError::MissingClusters(clusters) => {
236 write!(f, "Missing clusters: {}", clusters.join(", "))
237 }
238 DatabaseValidationError::CompilationFailed {
239 file_path,
240 object_name,
241 missing_dependencies,
242 } => {
243 let relative_path = format_relative_path(file_path);
244
245 let error_style = Style::new().bright_red().bold();
246 let arrow_style = Style::new().bright_blue().bold();
247 writeln!(
248 f,
249 "{}: failed to compile '{}': missing external dependencies",
250 "error".if_supports_color(Stream::Stderr, |t| error_style.style(t)),
251 object_name
252 )?;
253 writeln!(
254 f,
255 " {} {}",
256 "-->".if_supports_color(Stream::Stderr, |t| arrow_style.style(t)),
257 relative_path
258 )?;
259 writeln!(f)?;
260 writeln!(f, " Missing dependencies:")?;
261 for dep in missing_dependencies {
262 writeln!(f, " - {}", dep)?;
263 }
264 Ok(())
265 }
266 DatabaseValidationError::Multiple {
267 databases,
268 schemas,
269 clusters,
270 compilation_errors,
271 } => {
272 let mut has_errors = false;
273
274 writeln!(f, "Missing dependencies")?;
275 if !databases.is_empty() {
276 writeln!(f, "Missing databases: {}", databases.join(", "))?;
277 has_errors = true;
278 }
279
280 if !schemas.is_empty() {
281 let schema_list: Vec<String> = schemas
282 .iter()
283 .map(|sq| format!("{}.{}", sq.database, sq.schema))
284 .collect();
285 writeln!(f, "Missing schemas: {}", schema_list.join(", "))?;
286 has_errors = true;
287 }
288
289 if !clusters.is_empty() {
290 writeln!(f, "Missing clusters: {}", clusters.join(", "))?;
291 has_errors = true;
292 }
293
294 if !compilation_errors.is_empty() {
295 if has_errors {
296 writeln!(f)?;
297 }
298 for (idx, err) in compilation_errors.iter().enumerate() {
299 if idx > 0 {
300 writeln!(f)?;
301 }
302 write!(f, "{}", err)?;
303 }
304 }
305
306 Ok(())
307 }
308 DatabaseValidationError::ClusterConflict {
309 cluster_name,
310 compute_objects,
311 storage_objects,
312 } => {
313 let error_style = Style::new().bright_red().bold();
314 writeln!(
315 f,
316 "{}: cluster '{}' contains both storage and computation objects",
317 "error".if_supports_color(Stream::Stderr, |t| error_style.style(t)),
318 cluster_name
319 )?;
320 writeln!(f)?;
321 writeln!(f, " Computation objects (indexes, materialized views):")?;
322 for obj in compute_objects {
323 writeln!(f, " - {}", obj)?;
324 }
325 writeln!(f)?;
326 writeln!(f, " Storage objects (sources, sinks):")?;
327 for obj in storage_objects {
328 writeln!(f, " - {}", obj)?;
329 }
330 writeln!(f)?;
331 let help_style = Style::new().bright_cyan().bold();
332 writeln!(
333 f,
334 " {} Move sources/sinks to a separate cluster to avoid accidental recreation",
335 "help:".if_supports_color(Stream::Stderr, |t| help_style.style(t))
336 )?;
337 Ok(())
338 }
339 DatabaseValidationError::InsufficientPrivileges {
340 missing_database_usage,
341 missing_createcluster,
342 } => {
343 let error_style = Style::new().bright_red().bold();
344 let help_style = Style::new().bright_cyan().bold();
345 writeln!(
346 f,
347 "{}: insufficient privileges to deploy this project",
348 "error".if_supports_color(Stream::Stderr, |t| error_style.style(t))
349 )?;
350 writeln!(f)?;
351
352 if !missing_database_usage.is_empty() {
353 writeln!(f, " Missing USAGE privilege on databases:")?;
354 for db in missing_database_usage {
355 writeln!(f, " - {}", db)?;
356 }
357 writeln!(f)?;
358 }
359
360 if *missing_createcluster {
361 writeln!(f, " Missing CREATECLUSTER system privilege")?;
362 writeln!(f)?;
363 }
364
365 writeln!(
366 f,
367 " {} Ask your administrator to grant the required privileges:",
368 "help:".if_supports_color(Stream::Stderr, |t| help_style.style(t))
369 )?;
370 writeln!(f)?;
371
372 if !missing_database_usage.is_empty() {
373 for db in missing_database_usage {
374 writeln!(f, " GRANT USAGE ON DATABASE {} TO <user>;", db)?;
375 }
376 }
377
378 if *missing_createcluster {
379 writeln!(f, " GRANT CREATECLUSTER ON SYSTEM TO <user>;")?;
380 }
381
382 Ok(())
383 }
384 DatabaseValidationError::SchemaOwnershipMismatch {
385 unowned_schemas,
386 current_user,
387 } => {
388 let error_style = Style::new().bright_red().bold();
389 let help_style = Style::new().bright_cyan().bold();
390 writeln!(
391 f,
392 "{}: current role '{}' does not own the following production schemas",
393 "error".if_supports_color(Stream::Stderr, |t| error_style.style(t)),
394 current_user
395 )?;
396 writeln!(f)?;
397 for sq in unowned_schemas {
398 writeln!(f, " - {}.{}", sq.database, sq.schema)?;
399 }
400 writeln!(f)?;
401 writeln!(
402 f,
403 " {} Grant ownership of the schemas to the current role:",
404 "help:".if_supports_color(Stream::Stderr, |t| help_style.style(t))
405 )?;
406 writeln!(f)?;
407 for sq in unowned_schemas {
408 writeln!(
409 f,
410 " ALTER SCHEMA {}.{} OWNER TO {};",
411 sq.database, sq.schema, current_user
412 )?;
413 }
414 Ok(())
415 }
416 DatabaseValidationError::ClusterOwnershipMismatch {
417 unowned_clusters,
418 current_user,
419 } => {
420 let error_style = Style::new().bright_red().bold();
421 let help_style = Style::new().bright_cyan().bold();
422 writeln!(
423 f,
424 "{}: current role '{}' does not own the following production clusters",
425 "error".if_supports_color(Stream::Stderr, |t| error_style.style(t)),
426 current_user
427 )?;
428 writeln!(f)?;
429 for cluster in unowned_clusters {
430 writeln!(f, " - {}", cluster)?;
431 }
432 writeln!(f)?;
433 writeln!(
434 f,
435 " {} Grant ownership of the clusters to the current role:",
436 "help:".if_supports_color(Stream::Stderr, |t| help_style.style(t))
437 )?;
438 writeln!(f)?;
439 for cluster in unowned_clusters {
440 writeln!(
441 f,
442 " ALTER CLUSTER {} OWNER TO {};",
443 cluster, current_user
444 )?;
445 }
446 Ok(())
447 }
448 DatabaseValidationError::MissingSources(sources) => {
449 let error_style = Style::new().bright_red().bold();
450 writeln!(
451 f,
452 "{}: The following sources are referenced but do not exist:",
453 "error".if_supports_color(Stream::Stderr, |t| error_style.style(t))
454 )?;
455 for source in sources {
456 writeln!(f, " - {}", source)?;
457 }
458 writeln!(f)?;
459 writeln!(
460 f,
461 "Please ensure all sources are created before running this command."
462 )?;
463 Ok(())
464 }
465 DatabaseValidationError::MissingConnections(connections) => {
466 let error_style = Style::new().bright_red().bold();
467 let help_style = Style::new().bright_cyan().bold();
468 writeln!(
469 f,
470 "{}: The following connections are referenced but do not exist:",
471 "error".if_supports_color(Stream::Stderr, |t| error_style.style(t))
472 )?;
473 for conn in connections {
474 writeln!(f, " - {}", conn)?;
475 }
476 writeln!(f)?;
477 writeln!(
478 f,
479 "{} Connections are not managed by mz-deploy and must be created separately.",
480 "help:".if_supports_color(Stream::Stderr, |t| help_style.style(t))
481 )?;
482 Ok(())
483 }
484 DatabaseValidationError::MissingTableDependencies {
485 objects_needing_tables,
486 } => {
487 let help_style = Style::new().bright_cyan().bold();
488 writeln!(
489 f,
490 "Objects depend on tables that don't exist in the database",
491 )?;
492 writeln!(f)?;
493 for (object, missing_tables) in objects_needing_tables {
494 writeln!(
495 f,
496 " {} {} depends on:",
497 "×".if_supports_color(Stream::Stderr, |t| t.bright_red()),
498 object
499 )?;
500 for table in missing_tables {
501 writeln!(f, " - {}", table)?;
502 }
503 }
504 writeln!(f)?;
505 writeln!(
506 f,
507 "{} Run 'mz-deploy apply' to create the required tables first",
508 "help:".if_supports_color(Stream::Stderr, |t| help_style.style(t))
509 )?;
510 Ok(())
511 }
512 DatabaseValidationError::QueryError(e) => {
513 write!(f, "Database query failed: {}", e)
514 }
515 }
516 }
517}
518
// Empty impl: every `std::error::Error` method keeps its default, so the
// `ConnectionError` held by `QueryError` is surfaced only through `Display`.
impl std::error::Error for DatabaseValidationError {}
520
/// Shortens `path` to its last three components for display in diagnostics.
///
/// The three trailing components are joined with `/` (for example
/// `database/schema/file.sql`). The path is returned unchanged, via
/// `Path::display`, when it has fewer than three components or when the
/// third-from-last component is a root or prefix marker: joining a root such
/// as `/` with another `/` would otherwise yield a malformed `//schema/file.sql`.
///
/// Non-UTF-8 components are converted lossily.
pub fn format_relative_path(path: &std::path::Path) -> String {
    use std::path::Component;

    let components: Vec<_> = path.components().collect();
    match components.as_slice() {
        // Only shorten when all three trailing components are real path
        // segments (or `.`/`..`), never the filesystem root or a drive prefix.
        [.., first, second, third]
            if !matches!(first, Component::RootDir | Component::Prefix(_)) =>
        {
            format!(
                "{}/{}/{}",
                first.as_os_str().to_string_lossy(),
                second.as_os_str().to_string_lossy(),
                third.as_os_str().to_string_lossy()
            )
        }
        _ => path.display().to_string(),
    }
}
539
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds an `ObjectId` from string slices.
    fn object_id(database: &str, schema: &str, name: &str) -> ObjectId {
        ObjectId::new(database.to_string(), schema.to_string(), name.to_string())
    }

    /// Asserts that `rendered` contains every fragment in `expected`.
    fn assert_contains_all(rendered: &str, expected: &[&str]) {
        for fragment in expected {
            assert!(
                rendered.contains(*fragment),
                "expected {:?} to contain {:?}",
                rendered,
                fragment
            );
        }
    }

    #[mz_ore::test]
    fn test_missing_table_dependencies_error_display() {
        let error = DatabaseValidationError::MissingTableDependencies {
            objects_needing_tables: vec![
                (
                    object_id("materialize", "public", "my_view"),
                    vec![
                        object_id("materialize", "tables", "users"),
                        object_id("materialize", "tables", "orders"),
                    ],
                ),
                (
                    object_id("materialize", "public", "another_view"),
                    vec![object_id("materialize", "tables", "products")],
                ),
            ],
        };

        assert_contains_all(
            &error.to_string(),
            &[
                "Objects depend on tables that don't exist",
                "materialize.public.my_view",
                "materialize.tables.users",
                "materialize.tables.orders",
                "materialize.public.another_view",
                "materialize.tables.products",
                "help",
                "mz-deploy apply",
            ],
        );
    }

    #[mz_ore::test]
    fn test_format_relative_path() {
        let long_path = PathBuf::from("/home/user/project/database/schema/file.sql");
        assert_eq!(format_relative_path(&long_path), "database/schema/file.sql");

        let bare_file = PathBuf::from("file.sql");
        assert_eq!(format_relative_path(&bare_file), "file.sql");
    }

    #[mz_ore::test]
    fn test_format_relative_path_exactly_three_components() {
        let three_parts = PathBuf::from("database/schema/file.sql");
        assert_eq!(
            format_relative_path(&three_parts),
            "database/schema/file.sql"
        );
    }

    #[mz_ore::test]
    fn test_format_relative_path_two_components() {
        let two_parts = PathBuf::from("schema/file.sql");
        assert_eq!(format_relative_path(&two_parts), "schema/file.sql");
    }

    #[mz_ore::test]
    fn test_missing_databases_error_display() {
        let rendered =
            DatabaseValidationError::MissingDatabases(vec!["db1".to_string(), "db2".to_string()])
                .to_string();
        assert_contains_all(&rendered, &["Missing databases", "db1", "db2"]);
    }

    #[mz_ore::test]
    fn test_missing_schemas_error_display() {
        let rendered = DatabaseValidationError::MissingSchemas(vec![
            SchemaQualifier::new("db1".to_string(), "schema1".to_string()),
            SchemaQualifier::new("db2".to_string(), "schema2".to_string()),
        ])
        .to_string();
        assert_contains_all(&rendered, &["Missing schemas", "db1.schema1", "db2.schema2"]);
    }

    #[mz_ore::test]
    fn test_missing_clusters_error_display() {
        let rendered = DatabaseValidationError::MissingClusters(vec![
            "cluster1".to_string(),
            "cluster2".to_string(),
        ])
        .to_string();
        assert_contains_all(&rendered, &["Missing clusters", "cluster1", "cluster2"]);
    }

    #[mz_ore::test]
    fn test_cluster_conflict_error_display() {
        let rendered = DatabaseValidationError::ClusterConflict {
            cluster_name: "shared_cluster".to_string(),
            compute_objects: vec!["my_index".to_string(), "my_mv".to_string()],
            storage_objects: vec!["my_source".to_string()],
        }
        .to_string();
        assert_contains_all(
            &rendered,
            &[
                "shared_cluster",
                "storage and computation objects",
                "my_index",
                "my_mv",
                "my_source",
                "help",
            ],
        );
    }

    #[mz_ore::test]
    fn test_insufficient_privileges_error_display() {
        let rendered = DatabaseValidationError::InsufficientPrivileges {
            missing_database_usage: vec!["db1".to_string(), "db2".to_string()],
            missing_createcluster: true,
        }
        .to_string();
        assert_contains_all(
            &rendered,
            &[
                "insufficient privileges",
                "db1",
                "db2",
                "CREATECLUSTER",
                "GRANT",
            ],
        );
    }

    #[mz_ore::test]
    fn test_insufficient_privileges_only_database() {
        let rendered = DatabaseValidationError::InsufficientPrivileges {
            missing_database_usage: vec!["db1".to_string()],
            missing_createcluster: false,
        }
        .to_string();
        assert!(rendered.contains("db1"));
        // The system-level GRANT must be absent when only database usage is missing.
        assert!(!rendered.contains("CREATECLUSTER ON SYSTEM"));
    }

    #[mz_ore::test]
    fn test_missing_sources_error_display() {
        let rendered = DatabaseValidationError::MissingSources(vec![object_id(
            "materialize",
            "public",
            "kafka_source",
        )])
        .to_string();
        assert_contains_all(
            &rendered,
            &[
                "sources are referenced but do not exist",
                "materialize.public.kafka_source",
            ],
        );
    }

    #[mz_ore::test]
    fn test_multiple_validation_errors_display() {
        let rendered = DatabaseValidationError::Multiple {
            databases: vec!["missing_db".to_string()],
            schemas: vec![SchemaQualifier::new(
                "db".to_string(),
                "missing_schema".to_string(),
            )],
            clusters: vec!["missing_cluster".to_string()],
            compilation_errors: vec![],
        }
        .to_string();
        assert_contains_all(
            &rendered,
            &["missing_db", "db.missing_schema", "missing_cluster"],
        );
    }

    #[mz_ore::test]
    fn test_connection_error_display() {
        let rendered = ConnectionError::Message("test error message".to_string()).to_string();
        assert_eq!(rendered, "test error message");
    }

    #[mz_ore::test]
    fn test_connection_error_cluster_not_found() {
        let rendered = ConnectionError::ClusterNotFound {
            name: "missing_cluster".to_string(),
        }
        .to_string();
        assert_contains_all(&rendered, &["missing_cluster", "not found"]);
    }

    #[mz_ore::test]
    fn test_connection_error_deployment_already_exists() {
        let rendered = ConnectionError::DeploymentAlreadyExists {
            deploy_id: "staging_123".to_string(),
        }
        .to_string();
        assert_contains_all(&rendered, &["staging_123", "already exists"]);
    }

    #[mz_ore::test]
    fn test_connection_error_deployment_not_found() {
        let rendered = ConnectionError::DeploymentNotFound {
            deploy_id: "nonexistent".to_string(),
        }
        .to_string();
        assert_contains_all(&rendered, &["nonexistent", "not found"]);
    }

    #[mz_ore::test]
    fn test_connection_error_deployment_already_promoted() {
        let rendered = ConnectionError::DeploymentAlreadyPromoted {
            deploy_id: "prod_deploy".to_string(),
        }
        .to_string();
        assert_contains_all(&rendered, &["prod_deploy", "already been promoted"]);
    }

    #[mz_ore::test]
    fn test_database_validation_error_is_error_trait() {
        // Coercing to a trait object proves the type implements `std::error::Error`.
        let boxed: Box<dyn std::error::Error> =
            Box::new(DatabaseValidationError::MissingDatabases(vec![]));
        assert!(boxed.to_string().contains("Missing databases"));
    }
}