1use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::connections::{
17 MySqlConnectionValidationError, PostgresConnectionValidationError,
18};
19use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
20
21use crate::names::{FullItemName, PartialItemName};
22
23#[derive(Debug, Clone, thiserror::Error)]
25pub enum PgSourcePurificationError {
26 #[error("CREATE SOURCE specifies DETAILS option")]
27 UserSpecifiedDetails,
28 #[error("{0} option is unnecessary when no tables are added")]
29 UnnecessaryOptionsWithoutReferences(String),
30 #[error("PUBLICATION {0} is empty")]
31 EmptyPublication(String),
32 #[error("database {database} missing referenced schemas")]
33 DatabaseMissingFilteredSchemas {
34 database: String,
35 schemas: Vec<String>,
36 },
37 #[error("missing TABLES specification")]
38 RequiresExternalReferences,
39 #[error("insufficient privileges")]
40 UserLacksUsageOnSchemas { schemas: Vec<String> },
41 #[error("insufficient privileges")]
42 UserLacksSelectOnTables { tables: Vec<String> },
43 #[error("one or more tables requires BYPASSRLS")]
44 BypassRLSRequired { tables: Vec<String> },
45 #[error("referenced items not tables with REPLICA IDENTITY FULL")]
46 NotTablesWReplicaIdentityFull { items: Vec<String> },
47 #[error("TEXT COLUMNS refers to table not currently being added")]
48 DanglingTextColumns { items: Vec<PartialItemName> },
49 #[error("EXCLUDE COLUMNS refers to table not currently being added")]
50 DanglingExcludeColumns { items: Vec<PartialItemName> },
51 #[error("duplicated column name references: {0:?}")]
52 DuplicatedColumnNames(Vec<String>),
53 #[error("referenced tables use unsupported types")]
54 UnrecognizedTypes { cols: Vec<(String, Oid)> },
55 #[error("{0} is not a POSTGRES CONNECTION")]
56 NotPgConnection(FullItemName),
57 #[error("CONNECTION must specify PUBLICATION")]
58 ConnectionMissingPublication,
59 #[error(transparent)]
60 InvalidConnection(PostgresConnectionValidationError),
61}
62
63impl PgSourcePurificationError {
64 pub fn detail(&self) -> Option<String> {
65 match self {
66 Self::DanglingTextColumns { items } => Some(format!(
67 "the following tables are referenced but not added: {}",
68 itertools::join(items, ", ")
69 )),
70 Self::DatabaseMissingFilteredSchemas {
71 database: _,
72 schemas,
73 } => Some(format!(
74 "missing schemas: {}",
75 itertools::join(schemas.iter(), ", ")
76 )),
77 Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
78 "user lacks USAGE privileges for schemas {}",
79 schemas.join(", ")
80 )),
81 Self::UserLacksSelectOnTables { tables } => Some(format!(
82 "user lacks SELECT privileges for tables {}",
83 tables.join(", ")
84 )),
85 Self::BypassRLSRequired { tables } => Some(format!(
86 "user must have BYPASSRLS attribute to read tables {}",
87 tables.join(", "),
88 )),
89 Self::NotTablesWReplicaIdentityFull { items } => {
90 Some(format!("referenced items: {}", items.join(", ")))
91 }
92 Self::UnrecognizedTypes { cols } => Some(format!(
93 "the following columns contain unsupported types:\n{}",
94 itertools::join(
95 cols.into_iter()
96 .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
97 "\n"
98 )
99 )),
100 Self::InvalidConnection(e) => e.detail(),
101 _ => None,
102 }
103 }
104
105 pub fn hint(&self) -> Option<String> {
106 match self {
107 Self::UserSpecifiedDetails => Some(
108 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
109 .into(),
110 ),
111 Self::RequiresExternalReferences => {
112 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
113 }
114 Self::UnrecognizedTypes {
115 cols: _,
116 } => Some(
117 "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
118 as text."
119 .into(),
120 ),
121 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
122 "Remove the {} option, as no tables are being added.",
123 option
124 )),
125 Self::BypassRLSRequired { .. } => Some("Add the BYPASSRLS attribute to the Materialize user".into()),
126 Self::InvalidConnection(e) => e.hint(),
127 _ => None,
128 }
129 }
130}
131
132#[derive(Debug, Clone, thiserror::Error)]
134pub enum KafkaSourcePurificationError {
135 #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
136 ReferencedSubsources(ExternalReferences),
137 #[error("KAFKA CONNECTION without TOPIC")]
138 ConnectionMissingTopic,
139 #[error("{0} is not a KAFKA CONNECTION")]
140 NotKafkaConnection(FullItemName),
141 #[error("failed to create and connect Kafka consumer")]
142 KafkaConsumerError(String),
143 #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
144 WrongKafkaTopic(String, UnresolvedItemName),
145}
146
147impl KafkaSourcePurificationError {
148 pub fn detail(&self) -> Option<String> {
149 match self {
150 Self::KafkaConsumerError(e) => Some(e.clone()),
151 _ => None,
152 }
153 }
154
155 pub fn hint(&self) -> Option<String> {
156 None
157 }
158}
159
160#[derive(Debug, Clone, thiserror::Error)]
162pub enum LoadGeneratorSourcePurificationError {
163 #[error("FOR ALL TABLES is only valid for multi-output sources")]
164 ForAllTables,
165 #[error("FOR SCHEMAS (..) unsupported")]
166 ForSchemas,
167 #[error("FOR TABLES (..) unsupported")]
168 ForTables,
169 #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
170 MultiOutputRequiresForAllTables,
171 #[error("multi-output sources require an external reference")]
172 MultiOutputRequiresExternalReference,
173 #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
174 WrongLoadGenerator(String, UnresolvedItemName),
175}
176
177impl LoadGeneratorSourcePurificationError {
178 pub fn detail(&self) -> Option<String> {
179 match self {
180 _ => None,
181 }
182 }
183
184 pub fn hint(&self) -> Option<String> {
185 match self {
186 _ => None,
187 }
188 }
189}
190
191#[derive(Debug, Clone, thiserror::Error)]
193pub enum KafkaSinkPurificationError {
194 #[error("{0} is not a KAFKA CONNECTION")]
195 NotKafkaConnection(FullItemName),
196 #[error("admin client errored")]
197 AdminClientError(Arc<ContextCreationError>),
198 #[error("zero brokers discovered in metadata request")]
199 ZeroBrokers,
200}
201
202impl KafkaSinkPurificationError {
203 pub fn detail(&self) -> Option<String> {
204 match self {
205 Self::AdminClientError(e) => Some(e.to_string_with_causes()),
206 _ => None,
207 }
208 }
209
210 pub fn hint(&self) -> Option<String> {
211 None
212 }
213}
214
215#[derive(Debug, Clone, thiserror::Error)]
216pub enum IcebergSinkPurificationError {
217 #[error("catalog connection errored")]
218 CatalogError(Arc<anyhow::Error>),
219 #[error("error loading aws sdk context")]
220 AwsSdkContextError(Arc<anyhow::Error>),
221 #[error("S3 Tables connection region mismatch")]
222 S3TablesRegionMismatch {
223 s3_tables_region: String,
224 environment_region: String,
225 },
226}
227
228impl IcebergSinkPurificationError {
229 pub fn detail(&self) -> Option<String> {
230 match self {
231 Self::CatalogError(e) => Some(e.to_string_with_causes()),
232 Self::AwsSdkContextError(e) => Some(e.to_string_with_causes()),
233 Self::S3TablesRegionMismatch {
234 s3_tables_region,
235 environment_region,
236 } => Some(format!(
237 "S3 Tables connection is configured for region '{}' but this Materialize environment is running in region '{}'",
238 s3_tables_region, environment_region
239 )),
240 }
241 }
242
243 pub fn hint(&self) -> Option<String> {
244 match self {
245 Self::S3TablesRegionMismatch {
246 environment_region, ..
247 } => Some(format!(
248 "Create a new AWS connection with REGION = '{}' to use with S3 Tables in this environment.",
249 environment_region
250 )),
251 _ => None,
252 }
253 }
254}
255
256use mz_ore::error::ErrorExt;
257
258#[derive(Debug, Clone, thiserror::Error)]
260pub enum CsrPurificationError {
261 #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
262 NotCsrConnection(FullItemName),
263 #[error("client errored")]
264 ClientError(Arc<CsrConnectError>),
265 #[error("list subjects failed")]
266 ListSubjectsError(Arc<ListError>),
267}
268
269impl CsrPurificationError {
270 pub fn detail(&self) -> Option<String> {
271 match self {
272 Self::ClientError(e) => Some(e.to_string_with_causes()),
273 Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
274 Self::NotCsrConnection(_) => None,
275 }
276 }
277
278 pub fn hint(&self) -> Option<String> {
279 None
280 }
281}
282
283#[derive(Debug, Clone, thiserror::Error)]
285pub enum GluePurificationError {
286 #[error("{0} is not an AWS GLUE SCHEMA REGISTRY CONNECTION")]
287 NotGlueConnection(FullItemName),
288 #[error("SCHEMA NAME option is required")]
289 MissingSchemaName,
290 #[error("loading AWS SDK configuration failed")]
291 LoadSdkConfigError(Arc<anyhow::Error>),
292 #[error("Glue schema lookup failed (registry {registry:?}, schema {schema:?})")]
293 SchemaLookupError {
294 registry: String,
295 schema: String,
296 #[source]
297 cause: Arc<mz_aws_glue_schema_registry::GetSchemaVersionError>,
298 },
299 #[error("Glue schema {schema:?} in registry {registry:?} has no definition")]
300 EmptyDefinition { registry: String, schema: String },
301 #[error(
302 "Glue schema {schema:?} in registry {registry:?} uses unsupported data format {format}; only AVRO is supported"
303 )]
304 UnsupportedDataFormat {
305 registry: String,
306 schema: String,
307 format: String,
308 },
309}
310
311impl GluePurificationError {
312 pub fn detail(&self) -> Option<String> {
313 match self {
314 Self::LoadSdkConfigError(e) => Some(e.to_string_with_causes()),
315 Self::SchemaLookupError { cause, .. } => Some(cause.to_string_with_causes()),
316 Self::NotGlueConnection(_)
317 | Self::MissingSchemaName
318 | Self::EmptyDefinition { .. }
319 | Self::UnsupportedDataFormat { .. } => None,
320 }
321 }
322
323 pub fn hint(&self) -> Option<String> {
324 None
325 }
326}
327
328#[derive(Debug, thiserror::Error)]
330pub enum MySqlSourcePurificationError {
331 #[error("User lacks required MySQL privileges")]
332 UserLacksPrivileges(Vec<(String, String)>),
333 #[error("CREATE SOURCE specifies DETAILS option")]
334 UserSpecifiedDetails,
335 #[error("{0} option is unnecessary when no tables are added")]
336 UnnecessaryOptionsWithoutReferences(String),
337 #[error("{0} is not a MYSQL CONNECTION")]
338 NotMySqlConnection(FullItemName),
339 #[error("referenced tables use unsupported types")]
340 UnrecognizedTypes { cols: Vec<(String, String, String)> },
341 #[error("duplicated column name references in table {0}: {1:?}")]
342 DuplicatedColumnNames(String, Vec<String>),
343 #[error("{option_name} refers to table not currently being added")]
344 DanglingColumns {
345 option_name: String,
346 items: Vec<UnresolvedItemName>,
347 },
348 #[error("Invalid MySQL table reference: {0}")]
349 InvalidTableReference(String),
350 #[error("No tables found for provided reference")]
351 EmptyDatabase,
352 #[error("missing TABLES specification")]
353 RequiresExternalReferences,
354 #[error("No tables found in referenced schemas")]
355 NoTablesFoundForSchemas(Vec<String>),
356 #[error(transparent)]
357 InvalidConnection(#[from] MySqlConnectionValidationError),
358 #[error(
359 "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
360 )]
361 UnsupportedBinlogMetadataSetting { setting: String },
362 #[error(
363 "You are using MySQL version {version}. Materialize requires MySQL 8.0.1 or later to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
364 )]
365 UnsupportedMySqlVersion { version: String },
366}
367
368impl MySqlSourcePurificationError {
369 pub fn detail(&self) -> Option<String> {
370 match self {
371 Self::UserLacksPrivileges(missing) => Some(format!(
372 "Missing MySQL privileges: {}",
373 itertools::join(
374 missing
375 .iter()
376 .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
377 ", "
378 )
379 )),
380 Self::DanglingColumns {
381 option_name: _,
382 items,
383 } => Some(format!(
384 "the following columns are referenced but not added: {}",
385 itertools::join(items, ", ")
386 )),
387 Self::UnrecognizedTypes { cols } => Some(format!(
388 "the following columns contain unsupported types:\n{}",
389 itertools::join(
390 cols.into_iter().map(|(table, column, data_type)| format!(
391 "'{}' for {}.{}",
392 data_type, column, table
393 )),
394 "\n"
395 )
396 )),
397 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
398 "missing schemas: {}",
399 itertools::join(schemas.iter(), ", ")
400 )),
401 Self::InvalidConnection(e) => e.detail(),
402 _ => None,
403 }
404 }
405
406 pub fn hint(&self) -> Option<String> {
407 match self {
408 Self::UserSpecifiedDetails => Some(
409 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
410 .into(),
411 ),
412 Self::RequiresExternalReferences => {
413 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
414 }
415 Self::InvalidTableReference(_) => Some(
416 "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
417 ),
418 Self::UnrecognizedTypes { cols: _ } => Some(
419 "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
420 ingest their values as text, or ignored using EXCLUDE COLUMNS."
421 .into(),
422 ),
423 Self::EmptyDatabase => Some(
424 "No tables were found to replicate. This could be because \
425 the user does not have privileges on the intended tables."
426 .into(),
427 ),
428 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
429 "Remove the {} option, as no tables are being added.",
430 option
431 )),
432 Self::InvalidConnection(e) => e.hint(),
433 _ => None,
434 }
435 }
436}
437
438#[derive(Debug, Clone, thiserror::Error)]
440pub enum SqlServerSourcePurificationError {
441 #[error("{0} is not a SQL SERVER CONNECTION")]
442 NotSqlServerConnection(FullItemName),
443 #[error("CREATE SOURCE specifies DETAILS option")]
444 UserSpecifiedDetails,
445 #[error("{0} option is unnecessary when no tables are added")]
446 UnnecessaryOptionsWithoutReferences(String),
447 #[error("missing TABLES specification")]
448 RequiresExternalReferences,
449 #[error("{option_name} refers to table not currently being added")]
450 DanglingColumns {
451 option_name: String,
452 items: Vec<UnresolvedItemName>,
453 },
454 #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
455 MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
456 #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
457 UnsupportedColumn {
458 schema_name: Arc<str>,
459 tbl_name: Arc<str>,
460 col_name: Arc<str>,
461 col_type: Arc<str>,
462 context: String,
463 },
464 #[error("Table {tbl_name} had all columns excluded")]
465 AllColumnsExcluded { tbl_name: Arc<str> },
466 #[error("No tables found for provided reference")]
467 NoTables,
468 #[error("programming error: {0}")]
469 ProgrammingError(String),
470 #[error("No start_lsn found for capture instance {0}")]
471 NoStartLsn(String),
472 #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
473 CdcMissingColumns {
474 capture_instance: Arc<str>,
475 col_names: Vec<Arc<str>>,
476 },
477}
478
479impl SqlServerSourcePurificationError {
480 pub fn detail(&self) -> Option<String> {
481 match self {
482 Self::DanglingColumns {
483 option_name: _,
484 items,
485 } => Some(format!(
486 "the following columns are referenced but not added: {}",
487 itertools::join(items, ", ")
488 )),
489 Self::UnsupportedColumn { context, .. } => Some(context.clone()),
490 _ => None,
491 }
492 }
493
494 pub fn hint(&self) -> Option<String> {
495 match self {
496 Self::RequiresExternalReferences => {
497 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
498 }
499 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
500 "Remove the {} option, as no tables are being added.",
501 option
502 )),
503 Self::NoTables => Some(
504 "No tables were found to replicate. This could be because \
505 the user does not have privileges on the intended tables."
506 .into(),
507 ),
508 Self::UnsupportedColumn { .. } => {
509 Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
510 }
511 _ => None,
512 }
513 }
514}