Skip to main content

mz_sql/pure/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::connections::{
17    MySqlConnectionValidationError, PostgresConnectionValidationError,
18};
19use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
20
21use crate::names::{FullItemName, PartialItemName};
22
/// Logical errors detectable during purification for a POSTGRES SOURCE.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` and `hint` methods supply optional extra context.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PgSourcePurificationError {
    /// The statement included a `DETAILS` option.
    #[error("CREATE SOURCE specifies DETAILS option")]
    UserSpecifiedDetails,
    /// The named option was supplied even though no tables are being added.
    #[error("{0} option is unnecessary when no tables are added")]
    UnnecessaryOptionsWithoutReferences(String),
    /// The named publication contains no tables.
    #[error("PUBLICATION {0} is empty")]
    EmptyPublication(String),
    /// `schemas` lists referenced schemas that were not found in `database`.
    #[error("database {database} missing referenced schemas")]
    DatabaseMissingFilteredSchemas {
        database: String,
        schemas: Vec<String>,
    },
    /// No FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause was given.
    #[error("missing TABLES specification")]
    RequiresExternalReferences,
    /// The user lacks USAGE privileges on the listed schemas.
    #[error("insufficient privileges")]
    UserLacksUsageOnSchemas { schemas: Vec<String> },
    /// The user lacks SELECT privileges on the listed tables.
    #[error("insufficient privileges")]
    UserLacksSelectOnTables { tables: Vec<String> },
    /// Reading the listed tables requires the user to have BYPASSRLS.
    #[error("one or more tables requires BYPASSRLS")]
    BypassRLSRequired { tables: Vec<String> },
    /// The listed referenced items are not tables with REPLICA IDENTITY FULL.
    #[error("referenced items not tables with REPLICA IDENTITY FULL")]
    NotTablesWReplicaIdentityFull { items: Vec<String> },
    /// TEXT COLUMNS names tables (`items`) that are not being added.
    #[error("TEXT COLUMNS refers to table not currently being added")]
    DanglingTextColumns { items: Vec<PartialItemName> },
    /// EXCLUDE COLUMNS names tables (`items`) that are not being added.
    #[error("EXCLUDE COLUMNS refers to table not currently being added")]
    DanglingExcludeColumns { items: Vec<PartialItemName> },
    /// The listed column names were referenced more than once.
    #[error("duplicated column name references: {0:?}")]
    DuplicatedColumnNames(Vec<String>),
    /// Columns with unsupported types, as (column name, type OID) pairs.
    #[error("referenced tables use unsupported types")]
    UnrecognizedTypes { cols: Vec<(String, Oid)> },
    /// The referenced connection is not a Postgres connection.
    #[error("{0} is not a POSTGRES CONNECTION")]
    NotPgConnection(FullItemName),
    /// The connection does not specify a PUBLICATION.
    #[error("CONNECTION must specify PUBLICATION")]
    ConnectionMissingPublication,
    /// Connection validation failed; its message is forwarded transparently.
    #[error(transparent)]
    InvalidConnection(PostgresConnectionValidationError),
}
62
63impl PgSourcePurificationError {
64    pub fn detail(&self) -> Option<String> {
65        match self {
66            Self::DanglingTextColumns { items } => Some(format!(
67                "the following tables are referenced but not added: {}",
68                itertools::join(items, ", ")
69            )),
70            Self::DatabaseMissingFilteredSchemas {
71                database: _,
72                schemas,
73            } => Some(format!(
74                "missing schemas: {}",
75                itertools::join(schemas.iter(), ", ")
76            )),
77            Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
78                "user lacks USAGE privileges for schemas {}",
79                schemas.join(", ")
80            )),
81            Self::UserLacksSelectOnTables { tables } => Some(format!(
82                "user lacks SELECT privileges for tables {}",
83                tables.join(", ")
84            )),
85            Self::BypassRLSRequired { tables } => Some(format!(
86                "user must have BYPASSRLS attribute to read tables {}",
87                tables.join(", "),
88            )),
89            Self::NotTablesWReplicaIdentityFull { items } => {
90                Some(format!("referenced items: {}", items.join(", ")))
91            }
92            Self::UnrecognizedTypes { cols } => Some(format!(
93                "the following columns contain unsupported types:\n{}",
94                itertools::join(
95                    cols.into_iter()
96                        .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
97                    "\n"
98                )
99            )),
100            Self::InvalidConnection(e) => e.detail(),
101            _ => None,
102        }
103    }
104
105    pub fn hint(&self) -> Option<String> {
106        match self {
107            Self::UserSpecifiedDetails => Some(
108                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
109                    .into(),
110            ),
111            Self::RequiresExternalReferences => {
112                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
113            }
114            Self::UnrecognizedTypes {
115                cols: _,
116            } => Some(
117                "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
118                as text."
119                    .into(),
120            ),
121            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
122                "Remove the {} option, as no tables are being added.",
123                option
124            )),
125            Self::BypassRLSRequired { .. } => Some("Add the BYPASSRLS attribute to the Materialize user".into()),
126            Self::InvalidConnection(e) => e.hint(),
127            _ => None,
128        }
129    }
130}
131
/// Logical errors detectable during purification for a KAFKA SOURCE.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` and `hint` methods supply optional extra context.
#[derive(Debug, Clone, thiserror::Error)]
pub enum KafkaSourcePurificationError {
    /// External references were given, but they are only valid for
    /// multi-output sources.
    #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
    ReferencedSubsources(ExternalReferences),
    /// The Kafka connection does not specify a TOPIC.
    #[error("KAFKA CONNECTION without TOPIC")]
    ConnectionMissingTopic,
    /// The referenced connection is not a Kafka connection.
    #[error("{0} is not a KAFKA CONNECTION")]
    NotKafkaConnection(FullItemName),
    /// Creating or connecting the Kafka consumer failed; the wrapped string is
    /// surfaced as the error detail.
    #[error("failed to create and connect Kafka consumer")]
    KafkaConsumerError(String),
    /// The connection's topic (first field) differs from the topic the user
    /// specified (second field).
    #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
    WrongKafkaTopic(String, UnresolvedItemName),
}
146
147impl KafkaSourcePurificationError {
148    pub fn detail(&self) -> Option<String> {
149        match self {
150            Self::KafkaConsumerError(e) => Some(e.clone()),
151            _ => None,
152        }
153    }
154
155    pub fn hint(&self) -> Option<String> {
156        None
157    }
158}
159
/// Logical errors detectable during purification for a LOAD GENERATOR SOURCE.
///
/// The `#[error]` text of each variant is the full user-facing message.
#[derive(Debug, Clone, thiserror::Error)]
pub enum LoadGeneratorSourcePurificationError {
    /// FOR ALL TABLES was used with a load generator that is not multi-output.
    #[error("FOR ALL TABLES is only valid for multi-output sources")]
    ForAllTables,
    /// FOR SCHEMAS (..) was used; it is not supported here.
    #[error("FOR SCHEMAS (..) unsupported")]
    ForSchemas,
    /// FOR TABLES (..) was used where it is not supported.
    #[error("FOR TABLES (..) unsupported")]
    ForTables,
    /// A multi-output load generator was used without FOR TABLES (..) or
    /// FOR ALL TABLES.
    #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
    MultiOutputRequiresForAllTables,
    /// A multi-output load generator was used without an external reference.
    #[error("multi-output sources require an external reference")]
    MultiOutputRequiresExternalReference,
    /// The referenced load generator (first field) differs from the one the
    /// user specified (second field).
    #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
    WrongLoadGenerator(String, UnresolvedItemName),
}
176
177impl LoadGeneratorSourcePurificationError {
178    pub fn detail(&self) -> Option<String> {
179        match self {
180            _ => None,
181        }
182    }
183
184    pub fn hint(&self) -> Option<String> {
185        match self {
186            _ => None,
187        }
188    }
189}
190
/// Logical errors detectable during purification for a KAFKA SINK.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` method supplies optional extra context.
#[derive(Debug, Clone, thiserror::Error)]
pub enum KafkaSinkPurificationError {
    /// The referenced connection is not a Kafka connection.
    #[error("{0} is not a KAFKA CONNECTION")]
    NotKafkaConnection(FullItemName),
    /// The Kafka admin client failed; its full cause chain is reported as the
    /// error detail. Wrapped in `Arc`, presumably so the enum can derive
    /// `Clone` — confirm.
    #[error("admin client errored")]
    AdminClientError(Arc<ContextCreationError>),
    /// The metadata request reported zero brokers.
    #[error("zero brokers discovered in metadata request")]
    ZeroBrokers,
}
201
202impl KafkaSinkPurificationError {
203    pub fn detail(&self) -> Option<String> {
204        match self {
205            Self::AdminClientError(e) => Some(e.to_string_with_causes()),
206            _ => None,
207        }
208    }
209
210    pub fn hint(&self) -> Option<String> {
211        None
212    }
213}
214
/// Logical errors detectable during purification for an Iceberg sink.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` and `hint` methods supply optional extra context.
#[derive(Debug, Clone, thiserror::Error)]
pub enum IcebergSinkPurificationError {
    /// The catalog connection failed; its cause chain is reported as detail.
    #[error("catalog connection errored")]
    CatalogError(Arc<anyhow::Error>),
    /// Loading the AWS SDK context failed; its cause chain is reported as detail.
    #[error("error loading aws sdk context")]
    AwsSdkContextError(Arc<anyhow::Error>),
    /// The S3 Tables connection's region differs from the region this
    /// environment is running in.
    #[error("S3 Tables connection region mismatch")]
    S3TablesRegionMismatch {
        s3_tables_region: String,
        environment_region: String,
    },
}
227
228impl IcebergSinkPurificationError {
229    pub fn detail(&self) -> Option<String> {
230        match self {
231            Self::CatalogError(e) => Some(e.to_string_with_causes()),
232            Self::AwsSdkContextError(e) => Some(e.to_string_with_causes()),
233            Self::S3TablesRegionMismatch {
234                s3_tables_region,
235                environment_region,
236            } => Some(format!(
237                "S3 Tables connection is configured for region '{}' but this Materialize environment is running in region '{}'",
238                s3_tables_region, environment_region
239            )),
240        }
241    }
242
243    pub fn hint(&self) -> Option<String> {
244        match self {
245            Self::S3TablesRegionMismatch {
246                environment_region, ..
247            } => Some(format!(
248                "Create a new AWS connection with REGION = '{}' to use with S3 Tables in this environment.",
249                environment_region
250            )),
251            _ => None,
252        }
253    }
254}
255
256use mz_ore::error::ErrorExt;
257
/// Logical errors detectable during purification for Confluent Schema Registry.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` method supplies optional extra context.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CsrPurificationError {
    /// The referenced connection is not a Confluent Schema Registry connection.
    #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
    NotCsrConnection(FullItemName),
    /// Connecting the schema registry client failed; its cause chain is
    /// reported as detail.
    #[error("client errored")]
    ClientError(Arc<CsrConnectError>),
    /// Listing registry subjects failed; its cause chain is reported as detail.
    #[error("list subjects failed")]
    ListSubjectsError(Arc<ListError>),
}
268
269impl CsrPurificationError {
270    pub fn detail(&self) -> Option<String> {
271        match self {
272            Self::ClientError(e) => Some(e.to_string_with_causes()),
273            Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
274            Self::NotCsrConnection(_) => None,
275        }
276    }
277
278    pub fn hint(&self) -> Option<String> {
279        None
280    }
281}
282
/// Logical errors detectable during purification for AWS Glue Schema Registry.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` method supplies optional extra context.
#[derive(Debug, Clone, thiserror::Error)]
pub enum GluePurificationError {
    /// The referenced connection is not an AWS Glue Schema Registry connection.
    #[error("{0} is not an AWS GLUE SCHEMA REGISTRY CONNECTION")]
    NotGlueConnection(FullItemName),
    /// The SCHEMA NAME option was not supplied.
    #[error("SCHEMA NAME option is required")]
    MissingSchemaName,
    /// Loading the AWS SDK configuration failed; its cause chain is reported
    /// as detail.
    #[error("loading AWS SDK configuration failed")]
    LoadSdkConfigError(Arc<anyhow::Error>),
    /// Looking up `schema` in `registry` failed. `cause` is exposed as the
    /// error's `source` and its cause chain is reported as detail.
    #[error("Glue schema lookup failed (registry {registry:?}, schema {schema:?})")]
    SchemaLookupError {
        registry: String,
        schema: String,
        #[source]
        cause: Arc<mz_aws_glue_schema_registry::GetSchemaVersionError>,
    },
    /// The looked-up schema has no definition.
    #[error("Glue schema {schema:?} in registry {registry:?} has no definition")]
    EmptyDefinition { registry: String, schema: String },
    /// The schema uses a data format other than AVRO.
    #[error(
        "Glue schema {schema:?} in registry {registry:?} uses unsupported data format {format}; only AVRO is supported"
    )]
    UnsupportedDataFormat {
        registry: String,
        schema: String,
        format: String,
    },
}
310
311impl GluePurificationError {
312    pub fn detail(&self) -> Option<String> {
313        match self {
314            Self::LoadSdkConfigError(e) => Some(e.to_string_with_causes()),
315            Self::SchemaLookupError { cause, .. } => Some(cause.to_string_with_causes()),
316            Self::NotGlueConnection(_)
317            | Self::MissingSchemaName
318            | Self::EmptyDefinition { .. }
319            | Self::UnsupportedDataFormat { .. } => None,
320        }
321    }
322
323    pub fn hint(&self) -> Option<String> {
324        None
325    }
326}
327
/// Logical errors detectable during purification for a MySQL SOURCE.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` and `hint` methods supply optional extra context.
/// Unlike most purification errors in this file, this type does not derive
/// `Clone` (presumably because `MySqlConnectionValidationError` is not
/// `Clone` — confirm).
#[derive(Debug, thiserror::Error)]
pub enum MySqlSourcePurificationError {
    /// Missing privileges, as (privilege, table) pairs.
    #[error("User lacks required MySQL privileges")]
    UserLacksPrivileges(Vec<(String, String)>),
    /// The statement included a `DETAILS` option.
    #[error("CREATE SOURCE specifies DETAILS option")]
    UserSpecifiedDetails,
    /// The named option was supplied even though no tables are being added.
    #[error("{0} option is unnecessary when no tables are added")]
    UnnecessaryOptionsWithoutReferences(String),
    /// The referenced connection is not a MySQL connection.
    #[error("{0} is not a MYSQL CONNECTION")]
    NotMySqlConnection(FullItemName),
    /// Columns with unsupported types; `detail` binds each tuple as
    /// (table, column, data type).
    #[error("referenced tables use unsupported types")]
    UnrecognizedTypes { cols: Vec<(String, String, String)> },
    /// A table name and the column names duplicated within it.
    #[error("duplicated column name references in table {0}: {1:?}")]
    DuplicatedColumnNames(String, Vec<String>),
    /// `option_name` references columns (`items`) of tables not being added.
    #[error("{option_name} refers to table not currently being added")]
    DanglingColumns {
        option_name: String,
        items: Vec<UnresolvedItemName>,
    },
    /// A table reference could not be interpreted.
    #[error("Invalid MySQL table reference: {0}")]
    InvalidTableReference(String),
    /// No tables matched the provided reference.
    #[error("No tables found for provided reference")]
    EmptyDatabase,
    /// No FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause was given.
    #[error("missing TABLES specification")]
    RequiresExternalReferences,
    /// The listed schemas contained no tables.
    #[error("No tables found in referenced schemas")]
    NoTablesFoundForSchemas(Vec<String>),
    /// Connection validation failed; forwarded transparently and convertible
    /// via `From`.
    #[error(transparent)]
    InvalidConnection(#[from] MySqlConnectionValidationError),
    /// `binlog_row_metadata` is not `FULL`, which "CREATE TABLE FROM SOURCE"
    /// requires.
    #[error(
        "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
    )]
    UnsupportedBinlogMetadataSetting { setting: String },
    /// The server's MySQL version is older than the 8.0.1 required by
    /// "CREATE TABLE FROM SOURCE".
    #[error(
        "You are using MySQL version {version}. Materialize requires MySQL 8.0.1 or later to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
    )]
    UnsupportedMySqlVersion { version: String },
}
367
368impl MySqlSourcePurificationError {
369    pub fn detail(&self) -> Option<String> {
370        match self {
371            Self::UserLacksPrivileges(missing) => Some(format!(
372                "Missing MySQL privileges: {}",
373                itertools::join(
374                    missing
375                        .iter()
376                        .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
377                    ", "
378                )
379            )),
380            Self::DanglingColumns {
381                option_name: _,
382                items,
383            } => Some(format!(
384                "the following columns are referenced but not added: {}",
385                itertools::join(items, ", ")
386            )),
387            Self::UnrecognizedTypes { cols } => Some(format!(
388                "the following columns contain unsupported types:\n{}",
389                itertools::join(
390                    cols.into_iter().map(|(table, column, data_type)| format!(
391                        "'{}' for {}.{}",
392                        data_type, column, table
393                    )),
394                    "\n"
395                )
396            )),
397            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
398                "missing schemas: {}",
399                itertools::join(schemas.iter(), ", ")
400            )),
401            Self::InvalidConnection(e) => e.detail(),
402            _ => None,
403        }
404    }
405
406    pub fn hint(&self) -> Option<String> {
407        match self {
408            Self::UserSpecifiedDetails => Some(
409                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
410                    .into(),
411            ),
412            Self::RequiresExternalReferences => {
413                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
414            }
415            Self::InvalidTableReference(_) => Some(
416                "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
417            ),
418            Self::UnrecognizedTypes { cols: _ } => Some(
419                "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
420                ingest their values as text, or ignored using EXCLUDE COLUMNS."
421                    .into(),
422            ),
423            Self::EmptyDatabase => Some(
424                "No tables were found to replicate. This could be because \
425                the user does not have privileges on the intended tables."
426                    .into(),
427            ),
428            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
429                "Remove the {} option, as no tables are being added.",
430                option
431            )),
432            Self::InvalidConnection(e) => e.hint(),
433            _ => None,
434        }
435    }
436}
437
/// Logical errors detectable during purification for a SQL Server SOURCE.
///
/// The `#[error]` text of each variant is the primary user-facing message;
/// the inherent `detail` and `hint` methods supply optional extra context.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerSourcePurificationError {
    /// The referenced connection is not a SQL Server connection.
    #[error("{0} is not a SQL SERVER CONNECTION")]
    NotSqlServerConnection(FullItemName),
    /// The statement included a `DETAILS` option.
    #[error("CREATE SOURCE specifies DETAILS option")]
    UserSpecifiedDetails,
    /// The named option was supplied even though no tables are being added.
    #[error("{0} option is unnecessary when no tables are added")]
    UnnecessaryOptionsWithoutReferences(String),
    /// No FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause was given.
    #[error("missing TABLES specification")]
    RequiresExternalReferences,
    /// `option_name` references columns (`items`) of tables not being added.
    #[error("{option_name} refers to table not currently being added")]
    DanglingColumns {
        option_name: String,
        items: Vec<UnresolvedItemName>,
    },
    /// A table reported more than one primary key constraint.
    #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
    MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
    /// A column has an unsupported type. `context` is not part of the message;
    /// it is surfaced as the error detail.
    #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
    UnsupportedColumn {
        schema_name: Arc<str>,
        tbl_name: Arc<str>,
        col_name: Arc<str>,
        col_type: Arc<str>,
        context: String,
    },
    /// Every column of the named table was excluded.
    #[error("Table {tbl_name} had all columns excluded")]
    AllColumnsExcluded { tbl_name: Arc<str> },
    /// No tables matched the provided reference.
    #[error("No tables found for provided reference")]
    NoTables,
    /// Presumably signals an internal bug rather than a user error — confirm
    /// at the construction sites.
    #[error("programming error: {0}")]
    ProgrammingError(String),
    /// No start LSN was found for the named capture instance.
    #[error("No start_lsn found for capture instance {0}")]
    NoStartLsn(String),
    /// The named capture instance lacks the listed columns.
    #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
    CdcMissingColumns {
        capture_instance: Arc<str>,
        col_names: Vec<Arc<str>>,
    },
}
478
479impl SqlServerSourcePurificationError {
480    pub fn detail(&self) -> Option<String> {
481        match self {
482            Self::DanglingColumns {
483                option_name: _,
484                items,
485            } => Some(format!(
486                "the following columns are referenced but not added: {}",
487                itertools::join(items, ", ")
488            )),
489            Self::UnsupportedColumn { context, .. } => Some(context.clone()),
490            _ => None,
491        }
492    }
493
494    pub fn hint(&self) -> Option<String> {
495        match self {
496            Self::RequiresExternalReferences => {
497                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
498            }
499            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
500                "Remove the {} option, as no tables are being added.",
501                option
502            )),
503            Self::NoTables => Some(
504                "No tables were found to replicate. This could be because \
505                the user does not have privileges on the intended tables."
506                    .into(),
507            ),
508            Self::UnsupportedColumn { .. } => {
509                Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
510            }
511            _ => None,
512        }
513    }
514}