// mz_sql/pure/error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::connections::{
17    MySqlConnectionValidationError, PostgresConnectionValidationError,
18};
19use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
20
21use crate::names::{FullItemName, PartialItemName};
22
/// Logical errors detectable during purification for a POSTGRES SOURCE.
///
/// The `#[error]` text on each variant is its `Display` message. The `detail`
/// and `hint` methods on this type add optional supplementary text for some
/// variants, e.g. rendering the `Vec` payloads carried below.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PgSourcePurificationError {
    /// The statement included a `DETAILS` option.
    #[error("CREATE SOURCE specifies DETAILS option")]
    UserSpecifiedDetails,
    /// The named option was supplied even though no tables are being added.
    #[error("{0} option is unnecessary when no tables are added")]
    UnnecessaryOptionsWithoutReferences(String),
    /// The named publication is empty.
    #[error("PUBLICATION {0} is empty")]
    EmptyPublication(String),
    /// `database` lacks some referenced schemas; `schemas` lists the missing
    /// ones and is rendered by `detail`.
    #[error("database {database} missing referenced schemas")]
    DatabaseMissingFilteredSchemas {
        database: String,
        schemas: Vec<String>,
    },
    /// No FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause was given.
    #[error("missing TABLES specification")]
    RequiresExternalReferences,
    /// The user lacks `USAGE` on the listed schemas (rendered by `detail`).
    #[error("insufficient privileges")]
    UserLacksUsageOnSchemas { schemas: Vec<String> },
    /// The user lacks `SELECT` on the listed tables (rendered by `detail`).
    #[error("insufficient privileges")]
    UserLacksSelectOnTables { tables: Vec<String> },
    /// Referenced items that are not tables with `REPLICA IDENTITY FULL`.
    #[error("referenced items not tables with REPLICA IDENTITY FULL")]
    NotTablesWReplicaIdentityFull { items: Vec<String> },
    /// `TEXT COLUMNS` names tables that are not among those being added.
    #[error("TEXT COLUMNS refers to table not currently being added")]
    DanglingTextColumns { items: Vec<PartialItemName> },
    /// Columns whose types are unsupported, as `(column, type OID)` pairs.
    #[error("referenced tables use unsupported types")]
    UnrecognizedTypes { cols: Vec<(String, Oid)> },
    /// The referenced connection is not a POSTGRES CONNECTION.
    #[error("{0} is not a POSTGRES CONNECTION")]
    NotPgConnection(FullItemName),
    /// The connection does not specify a publication.
    #[error("CONNECTION must specify PUBLICATION")]
    ConnectionMissingPublication,
    /// Connection validation failed. `#[error(transparent)]` forwards `Display`
    /// and `source` to the wrapped validation error.
    #[error(transparent)]
    InvalidConnection(PostgresConnectionValidationError),
}
56
57impl PgSourcePurificationError {
58    pub fn detail(&self) -> Option<String> {
59        match self {
60            Self::DanglingTextColumns { items } => Some(format!(
61                "the following tables are referenced but not added: {}",
62                itertools::join(items, ", ")
63            )),
64            Self::DatabaseMissingFilteredSchemas {
65                database: _,
66                schemas,
67            } => Some(format!(
68                "missing schemas: {}",
69                itertools::join(schemas.iter(), ", ")
70            )),
71            Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
72                "user lacks USAGE privileges for schemas {}",
73                schemas.join(", ")
74            )),
75            Self::UserLacksSelectOnTables { tables } => Some(format!(
76                "user lacks SELECT privileges for tables {}",
77                tables.join(", ")
78            )),
79            Self::NotTablesWReplicaIdentityFull { items } => {
80                Some(format!("referenced items: {}", items.join(", ")))
81            }
82            Self::UnrecognizedTypes { cols } => Some(format!(
83                "the following columns contain unsupported types:\n{}",
84                itertools::join(
85                    cols.into_iter()
86                        .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
87                    "\n"
88                )
89            )),
90            Self::InvalidConnection(e) => e.detail(),
91            _ => None,
92        }
93    }
94
95    pub fn hint(&self) -> Option<String> {
96        match self {
97            Self::UserSpecifiedDetails => Some(
98                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
99                    .into(),
100            ),
101            Self::RequiresExternalReferences => {
102                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
103            }
104            Self::UnrecognizedTypes {
105                cols: _,
106            } => Some(
107                "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
108                as text."
109                    .into(),
110            ),
111            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
112                "Remove the {} option, as no tables are being added.",
113                option
114            )),
115            Self::InvalidConnection(e) => e.hint(),
116            _ => None,
117        }
118    }
119}
120
/// Logical errors detectable during purification for a KAFKA SOURCE.
#[derive(Debug, Clone, thiserror::Error)]
pub enum KafkaSourcePurificationError {
    /// External references were given, which the message states are only valid
    /// for multi-output sources. The references are rendered via
    /// `to_ast_string_simple`.
    #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
    ReferencedSubsources(ExternalReferences),
    /// The KAFKA CONNECTION has no TOPIC.
    #[error("KAFKA CONNECTION without TOPIC")]
    ConnectionMissingTopic,
    /// The referenced connection is not a KAFKA CONNECTION.
    #[error("{0} is not a KAFKA CONNECTION")]
    NotKafkaConnection(FullItemName),
    /// Creating/connecting the Kafka consumer failed. The `String` holds the
    /// underlying error text, which `detail` returns.
    #[error("failed to create and connect Kafka consumer")]
    KafkaConsumerError(String),
    /// The topic in the referenced connection (first field) differs from the
    /// one specified (second field).
    #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
    WrongKafkaTopic(String, UnresolvedItemName),
}
135
136impl KafkaSourcePurificationError {
137    pub fn detail(&self) -> Option<String> {
138        match self {
139            Self::KafkaConsumerError(e) => Some(e.clone()),
140            _ => None,
141        }
142    }
143
144    pub fn hint(&self) -> Option<String> {
145        None
146    }
147}
148
/// Logical errors detectable during purification for a LOAD GENERATOR SOURCE.
///
/// No variant currently provides `detail` or `hint` text.
#[derive(Debug, Clone, thiserror::Error)]
pub enum LoadGeneratorSourcePurificationError {
    /// `FOR ALL TABLES` was given; the message states it is only valid for
    /// multi-output sources.
    #[error("FOR ALL TABLES is only valid for multi-output sources")]
    ForAllTables,
    /// `FOR SCHEMAS (..)` was given but is unsupported.
    #[error("FOR SCHEMAS (..) unsupported")]
    ForSchemas,
    /// `FOR TABLES (..)` was given but is unsupported here.
    #[error("FOR TABLES (..) unsupported")]
    ForTables,
    /// A multi-output source was declared without FOR TABLES (..) or
    /// FOR ALL TABLES.
    #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
    MultiOutputRequiresForAllTables,
    /// A multi-output source was declared without an external reference.
    #[error("multi-output sources require an external reference")]
    MultiOutputRequiresExternalReference,
    /// The referenced load generator (first field) differs from the one
    /// specified (second field).
    #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
    WrongLoadGenerator(String, UnresolvedItemName),
}
165
166impl LoadGeneratorSourcePurificationError {
167    pub fn detail(&self) -> Option<String> {
168        match self {
169            _ => None,
170        }
171    }
172
173    pub fn hint(&self) -> Option<String> {
174        match self {
175            _ => None,
176        }
177    }
178}
179
/// Logical errors detectable during purification for a KAFKA SINK.
#[derive(Debug, Clone, thiserror::Error)]
pub enum KafkaSinkPurificationError {
    /// The referenced connection is not a KAFKA CONNECTION.
    #[error("{0} is not a KAFKA CONNECTION")]
    NotKafkaConnection(FullItemName),
    /// The Kafka admin client returned an error; `detail` renders it with its
    /// cause chain. Held in an `Arc`, presumably so the enum can remain `Clone`
    /// (TODO confirm).
    #[error("admin client errored")]
    AdminClientError(Arc<ContextCreationError>),
    /// The metadata request reported zero brokers.
    #[error("zero brokers discovered in metadata request")]
    ZeroBrokers,
}
190
191impl KafkaSinkPurificationError {
192    pub fn detail(&self) -> Option<String> {
193        match self {
194            Self::AdminClientError(e) => Some(e.to_string_with_causes()),
195            _ => None,
196        }
197    }
198
199    pub fn hint(&self) -> Option<String> {
200        None
201    }
202}
203
204use mz_ore::error::ErrorExt;
205
/// Logical errors detectable during purification for Confluent Schema Registry.
#[derive(Debug, Clone, thiserror::Error)]
pub enum CsrPurificationError {
    /// The referenced connection is not a CONFLUENT SCHEMA REGISTRY CONNECTION.
    #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
    NotCsrConnection(FullItemName),
    /// Connecting the schema-registry client failed; `detail` renders the
    /// wrapped error with its cause chain.
    #[error("client errored")]
    ClientError(Arc<CsrConnectError>),
    /// Listing subjects failed; `detail` renders the wrapped error with its
    /// cause chain.
    #[error("list subjects failed")]
    ListSubjectsError(Arc<ListError>),
}
216
217impl CsrPurificationError {
218    pub fn detail(&self) -> Option<String> {
219        match self {
220            Self::ClientError(e) => Some(e.to_string_with_causes()),
221            Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
222            _ => None,
223        }
224    }
225
226    pub fn hint(&self) -> Option<String> {
227        None
228    }
229}
230
/// Logical errors detectable during purification for a MySQL SOURCE.
///
/// Unlike the other purification errors in this module, this enum does not
/// derive `Clone` — presumably because `MySqlConnectionValidationError` is not
/// `Clone` (TODO confirm).
#[derive(Debug, thiserror::Error)]
pub enum MySqlSourcePurificationError {
    /// Missing privileges as `(privilege, table)` pairs, rendered by `detail`.
    #[error("User lacks required MySQL privileges")]
    UserLacksPrivileges(Vec<(String, String)>),
    /// The statement included a `DETAILS` option.
    #[error("CREATE SOURCE specifies DETAILS option")]
    UserSpecifiedDetails,
    /// The named option was supplied even though no tables are being added.
    #[error("{0} option is unnecessary when no tables are added")]
    UnnecessaryOptionsWithoutReferences(String),
    /// The referenced connection is not a MYSQL CONNECTION.
    #[error("{0} is not a MYSQL CONNECTION")]
    NotMySqlConnection(FullItemName),
    /// Columns with unsupported types as `(table, column, data_type)` triples,
    /// rendered by `detail`.
    #[error("referenced tables use unsupported types")]
    UnrecognizedTypes { cols: Vec<(String, String, String)> },
    /// The table (first field) has duplicated column-name references (second
    /// field).
    #[error("duplicated column name references in table {0}: {1:?}")]
    DuplicatedColumnNames(String, Vec<String>),
    /// `option_name` references columns of tables not being added; `items`
    /// lists them and is rendered by `detail`.
    #[error("{option_name} refers to table not currently being added")]
    DanglingColumns {
        option_name: String,
        items: Vec<UnresolvedItemName>,
    },
    /// A table reference could not be interpreted; the `String` is included in
    /// the message.
    #[error("Invalid MySQL table reference: {0}")]
    InvalidTableReference(String),
    /// No tables were found for the provided reference.
    #[error("No tables found for provided reference")]
    EmptyDatabase,
    /// No FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause was given.
    #[error("missing TABLES specification")]
    RequiresExternalReferences,
    /// No tables were found in the listed schemas.
    #[error("No tables found in referenced schemas")]
    NoTablesFoundForSchemas(Vec<String>),
    /// Connection validation failed. `#[error(transparent)]` forwards `Display`
    /// and `source`; `#[from]` generates the `From` conversion.
    #[error(transparent)]
    InvalidConnection(#[from] MySqlConnectionValidationError),
}
262
263impl MySqlSourcePurificationError {
264    pub fn detail(&self) -> Option<String> {
265        match self {
266            Self::UserLacksPrivileges(missing) => Some(format!(
267                "Missing MySQL privileges: {}",
268                itertools::join(
269                    missing
270                        .iter()
271                        .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
272                    ", "
273                )
274            )),
275            Self::DanglingColumns {
276                option_name: _,
277                items,
278            } => Some(format!(
279                "the following columns are referenced but not added: {}",
280                itertools::join(items, ", ")
281            )),
282            Self::UnrecognizedTypes { cols } => Some(format!(
283                "the following columns contain unsupported types:\n{}",
284                itertools::join(
285                    cols.into_iter().map(|(table, column, data_type)| format!(
286                        "'{}' for {}.{}",
287                        data_type, column, table
288                    )),
289                    "\n"
290                )
291            )),
292            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
293                "missing schemas: {}",
294                itertools::join(schemas.iter(), ", ")
295            )),
296            Self::InvalidConnection(e) => e.detail(),
297            _ => None,
298        }
299    }
300
301    pub fn hint(&self) -> Option<String> {
302        match self {
303            Self::UserSpecifiedDetails => Some(
304                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
305                    .into(),
306            ),
307            Self::RequiresExternalReferences => {
308                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
309            }
310            Self::InvalidTableReference(_) => Some(
311                "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
312            ),
313            Self::UnrecognizedTypes { cols: _ } => Some(
314                "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
315                ingest their values as text, or ignored using EXCLUDE COLUMNS."
316                    .into(),
317            ),
318            Self::EmptyDatabase => Some(
319                "No tables were found to replicate. This could be because \
320                the user does not have privileges on the intended tables."
321                    .into(),
322            ),
323            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
324                "Remove the {} option, as no tables are being added.",
325                option
326            )),
327            Self::InvalidConnection(e) => e.hint(),
328            _ => None,
329        }
330    }
331}
332
/// Logical errors detectable during purification for a SQL Server SOURCE.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerSourcePurificationError {
    /// The referenced connection is not a SQL SERVER CONNECTION.
    #[error("{0} is not a SQL SERVER CONNECTION")]
    NotSqlServerConnection(FullItemName),
    /// The statement included a `DETAILS` option.
    #[error("CREATE SOURCE specifies DETAILS option")]
    UserSpecifiedDetails,
    /// The named option was supplied even though no tables are being added.
    #[error("{0} option is unnecessary when no tables are added")]
    UnnecessaryOptionsWithoutReferences(String),
    /// No FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause was given.
    #[error("missing TABLES specification")]
    RequiresExternalReferences,
    /// `option_name` references columns of tables not being added; `items`
    /// lists them and is rendered by `detail`.
    #[error("{option_name} refers to table not currently being added")]
    DanglingColumns {
        option_name: String,
        items: Vec<UnresolvedItemName>,
    },
    /// A table reported more than one primary key; the constraint names appear
    /// in the message.
    #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
    MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
    /// A column's type is unsupported; the fully qualified column and its type
    /// appear in the message.
    #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
    UnsupportedColumn {
        schema_name: Arc<str>,
        tbl_name: Arc<str>,
        col_name: Arc<str>,
        col_type: Arc<str>,
    },
    /// Every column of `tbl_name` was excluded.
    #[error("Table {tbl_name} had all columns excluded")]
    AllColumnsExcluded { tbl_name: Arc<str> },
    /// No tables were found for the provided reference.
    #[error("No tables found for provided reference")]
    NoTables,
    /// Message prefixed with "programming error:"; presumably signals an
    /// internal bug rather than a user error (TODO confirm).
    #[error("programming error: {0}")]
    ProgrammingError(String),
    /// No start LSN was found for the named capture instance.
    #[error("No start_lsn found for capture instance {0}")]
    NoStartLsn(String),
    /// The capture instance is missing the listed columns.
    #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
    CdcMissingColumns {
        capture_instance: Arc<str>,
        col_names: Vec<Arc<str>>,
    },
}
372
373impl SqlServerSourcePurificationError {
374    pub fn detail(&self) -> Option<String> {
375        match self {
376            Self::DanglingColumns {
377                option_name: _,
378                items,
379            } => Some(format!(
380                "the following columns are referenced but not added: {}",
381                itertools::join(items, ", ")
382            )),
383            _ => None,
384        }
385    }
386
387    pub fn hint(&self) -> Option<String> {
388        match self {
389            Self::RequiresExternalReferences => {
390                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
391            }
392            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
393                "Remove the {} option, as no tables are being added.",
394                option
395            )),
396            Self::NoTables => Some(
397                "No tables were found to replicate. This could be because \
398                the user does not have privileges on the intended tables."
399                    .into(),
400            ),
401            Self::UnsupportedColumn { .. } => {
402                Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
403            }
404            _ => None,
405        }
406    }
407}