mz_sql/pure/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use aws_sdk_sts::operation::get_caller_identity::GetCallerIdentityError;
13use mz_ccsr::ListError;
14use mz_repr::adt::system::Oid;
15use mz_sql_parser::ast::display::AstDisplay;
16use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
17use mz_storage_types::connections::{
18    MySqlConnectionValidationError, PostgresConnectionValidationError,
19};
20use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
21
22use crate::names::{FullItemName, PartialItemName};
23
24/// Logical errors detectable during purification for a POSTGRES SOURCE.
25#[derive(Debug, Clone, thiserror::Error)]
26pub enum PgSourcePurificationError {
27    #[error("CREATE SOURCE specifies DETAILS option")]
28    UserSpecifiedDetails,
29    #[error("{0} option is unnecessary when no tables are added")]
30    UnnecessaryOptionsWithoutReferences(String),
31    #[error("PUBLICATION {0} is empty")]
32    EmptyPublication(String),
33    #[error("database {database} missing referenced schemas")]
34    DatabaseMissingFilteredSchemas {
35        database: String,
36        schemas: Vec<String>,
37    },
38    #[error("missing TABLES specification")]
39    RequiresExternalReferences,
40    #[error("insufficient privileges")]
41    UserLacksUsageOnSchemas { schemas: Vec<String> },
42    #[error("insufficient privileges")]
43    UserLacksSelectOnTables { tables: Vec<String> },
44    #[error("one or more tables requires BYPASSRLS")]
45    BypassRLSRequired { tables: Vec<String> },
46    #[error("referenced items not tables with REPLICA IDENTITY FULL")]
47    NotTablesWReplicaIdentityFull { items: Vec<String> },
48    #[error("TEXT COLUMNS refers to table not currently being added")]
49    DanglingTextColumns { items: Vec<PartialItemName> },
50    #[error("EXCLUDE COLUMNS refers to table not currently being added")]
51    DanglingExcludeColumns { items: Vec<PartialItemName> },
52    #[error("duplicated column name references: {0:?}")]
53    DuplicatedColumnNames(Vec<String>),
54    #[error("referenced tables use unsupported types")]
55    UnrecognizedTypes { cols: Vec<(String, Oid)> },
56    #[error("{0} is not a POSTGRES CONNECTION")]
57    NotPgConnection(FullItemName),
58    #[error("CONNECTION must specify PUBLICATION")]
59    ConnectionMissingPublication,
60    #[error(transparent)]
61    InvalidConnection(PostgresConnectionValidationError),
62}
63
64impl PgSourcePurificationError {
65    pub fn detail(&self) -> Option<String> {
66        match self {
67            Self::DanglingTextColumns { items } => Some(format!(
68                "the following tables are referenced but not added: {}",
69                itertools::join(items, ", ")
70            )),
71            Self::DatabaseMissingFilteredSchemas {
72                database: _,
73                schemas,
74            } => Some(format!(
75                "missing schemas: {}",
76                itertools::join(schemas.iter(), ", ")
77            )),
78            Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
79                "user lacks USAGE privileges for schemas {}",
80                schemas.join(", ")
81            )),
82            Self::UserLacksSelectOnTables { tables } => Some(format!(
83                "user lacks SELECT privileges for tables {}",
84                tables.join(", ")
85            )),
86            Self::BypassRLSRequired { tables } => Some(format!(
87                "user must have BYPASSRLS attribute to read tables {}",
88                tables.join(", "),
89            )),
90            Self::NotTablesWReplicaIdentityFull { items } => {
91                Some(format!("referenced items: {}", items.join(", ")))
92            }
93            Self::UnrecognizedTypes { cols } => Some(format!(
94                "the following columns contain unsupported types:\n{}",
95                itertools::join(
96                    cols.into_iter()
97                        .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
98                    "\n"
99                )
100            )),
101            Self::InvalidConnection(e) => e.detail(),
102            _ => None,
103        }
104    }
105
106    pub fn hint(&self) -> Option<String> {
107        match self {
108            Self::UserSpecifiedDetails => Some(
109                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
110                    .into(),
111            ),
112            Self::RequiresExternalReferences => {
113                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
114            }
115            Self::UnrecognizedTypes {
116                cols: _,
117            } => Some(
118                "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
119                as text."
120                    .into(),
121            ),
122            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
123                "Remove the {} option, as no tables are being added.",
124                option
125            )),
126            Self::BypassRLSRequired { .. } => Some("Add the BYPASSRLS attribute to the Materialize user".into()),
127            Self::InvalidConnection(e) => e.hint(),
128            _ => None,
129        }
130    }
131}
132
133/// Logical errors detectable during purification for a KAFKA SOURCE.
134#[derive(Debug, Clone, thiserror::Error)]
135pub enum KafkaSourcePurificationError {
136    #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
137    ReferencedSubsources(ExternalReferences),
138    #[error("KAFKA CONNECTION without TOPIC")]
139    ConnectionMissingTopic,
140    #[error("{0} is not a KAFKA CONNECTION")]
141    NotKafkaConnection(FullItemName),
142    #[error("failed to create and connect Kafka consumer")]
143    KafkaConsumerError(String),
144    #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
145    WrongKafkaTopic(String, UnresolvedItemName),
146}
147
148impl KafkaSourcePurificationError {
149    pub fn detail(&self) -> Option<String> {
150        match self {
151            Self::KafkaConsumerError(e) => Some(e.clone()),
152            _ => None,
153        }
154    }
155
156    pub fn hint(&self) -> Option<String> {
157        None
158    }
159}
160
161/// Logical errors detectable during purification for a LOAD GENERATOR SOURCE.
162#[derive(Debug, Clone, thiserror::Error)]
163pub enum LoadGeneratorSourcePurificationError {
164    #[error("FOR ALL TABLES is only valid for multi-output sources")]
165    ForAllTables,
166    #[error("FOR SCHEMAS (..) unsupported")]
167    ForSchemas,
168    #[error("FOR TABLES (..) unsupported")]
169    ForTables,
170    #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
171    MultiOutputRequiresForAllTables,
172    #[error("multi-output sources require an external reference")]
173    MultiOutputRequiresExternalReference,
174    #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
175    WrongLoadGenerator(String, UnresolvedItemName),
176}
177
178impl LoadGeneratorSourcePurificationError {
179    pub fn detail(&self) -> Option<String> {
180        match self {
181            _ => None,
182        }
183    }
184
185    pub fn hint(&self) -> Option<String> {
186        match self {
187            _ => None,
188        }
189    }
190}
191
192/// Logical errors detectable during purification for a KAFKA SINK.
193#[derive(Debug, Clone, thiserror::Error)]
194pub enum KafkaSinkPurificationError {
195    #[error("{0} is not a KAFKA CONNECTION")]
196    NotKafkaConnection(FullItemName),
197    #[error("admin client errored")]
198    AdminClientError(Arc<ContextCreationError>),
199    #[error("zero brokers discovered in metadata request")]
200    ZeroBrokers,
201}
202
203impl KafkaSinkPurificationError {
204    pub fn detail(&self) -> Option<String> {
205        match self {
206            Self::AdminClientError(e) => Some(e.to_string_with_causes()),
207            _ => None,
208        }
209    }
210
211    pub fn hint(&self) -> Option<String> {
212        None
213    }
214}
215
216#[derive(Debug, Clone, thiserror::Error)]
217pub enum IcebergSinkPurificationError {
218    #[error("catalog connection errored")]
219    CatalogError(Arc<anyhow::Error>),
220    #[error("error loading aws sdk context")]
221    AwsSdkContextError(Arc<anyhow::Error>),
222    #[error("error listing sts identity")]
223    StsIdentityError(Arc<GetCallerIdentityError>),
224}
225
226impl IcebergSinkPurificationError {
227    pub fn detail(&self) -> Option<String> {
228        match self {
229            Self::CatalogError(e) => Some(e.to_string_with_causes()),
230            Self::AwsSdkContextError(e) => Some(e.to_string_with_causes()),
231            Self::StsIdentityError(e) => Some(e.to_string_with_causes()),
232        }
233    }
234
235    pub fn hint(&self) -> Option<String> {
236        None
237    }
238}
239
240use mz_ore::error::ErrorExt;
241
242/// Logical errors detectable during purification for Confluent Schema Registry.
243#[derive(Debug, Clone, thiserror::Error)]
244pub enum CsrPurificationError {
245    #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
246    NotCsrConnection(FullItemName),
247    #[error("client errored")]
248    ClientError(Arc<CsrConnectError>),
249    #[error("list subjects failed")]
250    ListSubjectsError(Arc<ListError>),
251}
252
253impl CsrPurificationError {
254    pub fn detail(&self) -> Option<String> {
255        match self {
256            Self::ClientError(e) => Some(e.to_string_with_causes()),
257            Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
258            _ => None,
259        }
260    }
261
262    pub fn hint(&self) -> Option<String> {
263        None
264    }
265}
266
267/// Logical errors detectable during purification for a MySQL SOURCE.
268#[derive(Debug, thiserror::Error)]
269pub enum MySqlSourcePurificationError {
270    #[error("User lacks required MySQL privileges")]
271    UserLacksPrivileges(Vec<(String, String)>),
272    #[error("CREATE SOURCE specifies DETAILS option")]
273    UserSpecifiedDetails,
274    #[error("{0} option is unnecessary when no tables are added")]
275    UnnecessaryOptionsWithoutReferences(String),
276    #[error("{0} is not a MYSQL CONNECTION")]
277    NotMySqlConnection(FullItemName),
278    #[error("referenced tables use unsupported types")]
279    UnrecognizedTypes { cols: Vec<(String, String, String)> },
280    #[error("duplicated column name references in table {0}: {1:?}")]
281    DuplicatedColumnNames(String, Vec<String>),
282    #[error("{option_name} refers to table not currently being added")]
283    DanglingColumns {
284        option_name: String,
285        items: Vec<UnresolvedItemName>,
286    },
287    #[error("Invalid MySQL table reference: {0}")]
288    InvalidTableReference(String),
289    #[error("No tables found for provided reference")]
290    EmptyDatabase,
291    #[error("missing TABLES specification")]
292    RequiresExternalReferences,
293    #[error("No tables found in referenced schemas")]
294    NoTablesFoundForSchemas(Vec<String>),
295    #[error(transparent)]
296    InvalidConnection(#[from] MySqlConnectionValidationError),
297}
298
299impl MySqlSourcePurificationError {
300    pub fn detail(&self) -> Option<String> {
301        match self {
302            Self::UserLacksPrivileges(missing) => Some(format!(
303                "Missing MySQL privileges: {}",
304                itertools::join(
305                    missing
306                        .iter()
307                        .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
308                    ", "
309                )
310            )),
311            Self::DanglingColumns {
312                option_name: _,
313                items,
314            } => Some(format!(
315                "the following columns are referenced but not added: {}",
316                itertools::join(items, ", ")
317            )),
318            Self::UnrecognizedTypes { cols } => Some(format!(
319                "the following columns contain unsupported types:\n{}",
320                itertools::join(
321                    cols.into_iter().map(|(table, column, data_type)| format!(
322                        "'{}' for {}.{}",
323                        data_type, column, table
324                    )),
325                    "\n"
326                )
327            )),
328            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
329                "missing schemas: {}",
330                itertools::join(schemas.iter(), ", ")
331            )),
332            Self::InvalidConnection(e) => e.detail(),
333            _ => None,
334        }
335    }
336
337    pub fn hint(&self) -> Option<String> {
338        match self {
339            Self::UserSpecifiedDetails => Some(
340                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
341                    .into(),
342            ),
343            Self::RequiresExternalReferences => {
344                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
345            }
346            Self::InvalidTableReference(_) => Some(
347                "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
348            ),
349            Self::UnrecognizedTypes { cols: _ } => Some(
350                "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
351                ingest their values as text, or ignored using EXCLUDE COLUMNS."
352                    .into(),
353            ),
354            Self::EmptyDatabase => Some(
355                "No tables were found to replicate. This could be because \
356                the user does not have privileges on the intended tables."
357                    .into(),
358            ),
359            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
360                "Remove the {} option, as no tables are being added.",
361                option
362            )),
363            Self::InvalidConnection(e) => e.hint(),
364            _ => None,
365        }
366    }
367}
368
369/// Logical errors detectable during purification for a SQL Server SOURCE.
370#[derive(Debug, Clone, thiserror::Error)]
371pub enum SqlServerSourcePurificationError {
372    #[error("{0} is not a SQL SERVER CONNECTION")]
373    NotSqlServerConnection(FullItemName),
374    #[error("CREATE SOURCE specifies DETAILS option")]
375    UserSpecifiedDetails,
376    #[error("{0} option is unnecessary when no tables are added")]
377    UnnecessaryOptionsWithoutReferences(String),
378    #[error("missing TABLES specification")]
379    RequiresExternalReferences,
380    #[error("{option_name} refers to table not currently being added")]
381    DanglingColumns {
382        option_name: String,
383        items: Vec<UnresolvedItemName>,
384    },
385    #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
386    MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
387    #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
388    UnsupportedColumn {
389        schema_name: Arc<str>,
390        tbl_name: Arc<str>,
391        col_name: Arc<str>,
392        col_type: Arc<str>,
393        context: String,
394    },
395    #[error("Table {tbl_name} had all columns excluded")]
396    AllColumnsExcluded { tbl_name: Arc<str> },
397    #[error("No tables found for provided reference")]
398    NoTables,
399    #[error("programming error: {0}")]
400    ProgrammingError(String),
401    #[error("No start_lsn found for capture instance {0}")]
402    NoStartLsn(String),
403    #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
404    CdcMissingColumns {
405        capture_instance: Arc<str>,
406        col_names: Vec<Arc<str>>,
407    },
408}
409
410impl SqlServerSourcePurificationError {
411    pub fn detail(&self) -> Option<String> {
412        match self {
413            Self::DanglingColumns {
414                option_name: _,
415                items,
416            } => Some(format!(
417                "the following columns are referenced but not added: {}",
418                itertools::join(items, ", ")
419            )),
420            Self::UnsupportedColumn { context, .. } => Some(context.clone()),
421            _ => None,
422        }
423    }
424
425    pub fn hint(&self) -> Option<String> {
426        match self {
427            Self::RequiresExternalReferences => {
428                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
429            }
430            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
431                "Remove the {} option, as no tables are being added.",
432                option
433            )),
434            Self::NoTables => Some(
435                "No tables were found to replicate. This could be because \
436                the user does not have privileges on the intended tables."
437                    .into(),
438            ),
439            Self::UnsupportedColumn { .. } => {
440                Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
441            }
442            _ => None,
443        }
444    }
445}