mz_sql/pure/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
17
18use crate::names::{FullItemName, PartialItemName};
19
20/// Logical errors detectable during purification for a POSTGRES SOURCE.
21#[derive(Debug, Clone, thiserror::Error)]
22pub enum PgSourcePurificationError {
23    #[error("CREATE SOURCE specifies DETAILS option")]
24    UserSpecifiedDetails,
25    #[error("{0} option is unnecessary when no tables are added")]
26    UnnecessaryOptionsWithoutReferences(String),
27    #[error("PUBLICATION {0} is empty")]
28    EmptyPublication(String),
29    #[error("database {database} missing referenced schemas")]
30    DatabaseMissingFilteredSchemas {
31        database: String,
32        schemas: Vec<String>,
33    },
34    #[error("missing TABLES specification")]
35    RequiresExternalReferences,
36    #[error("insufficient privileges")]
37    UserLacksUsageOnSchemas { user: String, schemas: Vec<String> },
38    #[error("insufficient privileges")]
39    UserLacksSelectOnTables { user: String, tables: Vec<String> },
40    #[error("referenced items not tables with REPLICA IDENTITY FULL")]
41    NotTablesWReplicaIdentityFull { items: Vec<String> },
42    #[error("TEXT COLUMNS refers to table not currently being added")]
43    DanglingTextColumns { items: Vec<PartialItemName> },
44    #[error("referenced tables use unsupported types")]
45    UnrecognizedTypes { cols: Vec<(String, Oid)> },
46    #[error("{0} is not a POSTGRES CONNECTION")]
47    NotPgConnection(FullItemName),
48    #[error("{0} is not a YUGABYTE CONNECTION")]
49    NotYugabyteConnection(FullItemName),
50    #[error("CONNECTION must specify PUBLICATION")]
51    ConnectionMissingPublication,
52    #[error("PostgreSQL server has insufficient number of replication slots available")]
53    InsufficientReplicationSlotsAvailable { count: usize },
54    #[error("server must have wal_level >= logical, but has {wal_level}")]
55    InsufficientWalLevel {
56        wal_level: mz_postgres_util::replication::WalLevel,
57    },
58    #[error("replication disabled on server")]
59    ReplicationDisabled,
60}
61
62impl PgSourcePurificationError {
63    pub fn detail(&self) -> Option<String> {
64        match self {
65            Self::DanglingTextColumns { items } => Some(format!(
66                "the following tables are referenced but not added: {}",
67                itertools::join(items, ", ")
68            )),
69            Self::DatabaseMissingFilteredSchemas {
70                database: _,
71                schemas,
72            } => Some(format!(
73                "missing schemas: {}",
74                itertools::join(schemas.iter(), ", ")
75            )),
76            Self::UserLacksUsageOnSchemas { user, schemas } => Some(format!(
77                "user {} lacks USAGE privileges for schemas {}",
78                user,
79                schemas.join(", ")
80            )),
81            Self::UserLacksSelectOnTables { user, tables } => Some(format!(
82                "user {} lacks SELECT privileges for tables {}",
83                user,
84                tables.join(", ")
85            )),
86            Self::NotTablesWReplicaIdentityFull { items } => {
87                Some(format!("referenced items: {}", items.join(", ")))
88            }
89            Self::UnrecognizedTypes { cols } => Some(format!(
90                "the following columns contain unsupported types:\n{}",
91                itertools::join(
92                    cols.into_iter()
93                        .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
94                    "\n"
95                )
96            )),
97            Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
98                "executing this statement requires {} replication slot{}",
99                count,
100                if *count == 1 { "" } else { "s" }
101            )),
102            _ => None,
103        }
104    }
105
106    pub fn hint(&self) -> Option<String> {
107        match self {
108            Self::UserSpecifiedDetails => Some(
109                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
110                    .into(),
111            ),
112            Self::RequiresExternalReferences => {
113                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
114            }
115            Self::UnrecognizedTypes {
116                cols: _,
117            } => Some(
118                "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
119                as text."
120                    .into(),
121            ),
122            Self::InsufficientReplicationSlotsAvailable { .. } => Some(
123                "you might be able to wait for other sources to finish snapshotting and try again".into()
124            ),
125            Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
126            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
127                "Remove the {} option, as no tables are being added.",
128                option
129            )),
130            _ => None,
131        }
132    }
133}
134
135/// Logical errors detectable during purification for a KAFKA SOURCE.
136#[derive(Debug, Clone, thiserror::Error)]
137pub enum KafkaSourcePurificationError {
138    #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
139    ReferencedSubsources(ExternalReferences),
140    #[error("KAFKA CONNECTION without TOPIC")]
141    ConnectionMissingTopic,
142    #[error("{0} is not a KAFKA CONNECTION")]
143    NotKafkaConnection(FullItemName),
144    #[error("failed to create and connect Kafka consumer")]
145    KafkaConsumerError(String),
146    #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
147    WrongKafkaTopic(String, UnresolvedItemName),
148}
149
150impl KafkaSourcePurificationError {
151    pub fn detail(&self) -> Option<String> {
152        match self {
153            Self::KafkaConsumerError(e) => Some(e.clone()),
154            _ => None,
155        }
156    }
157
158    pub fn hint(&self) -> Option<String> {
159        None
160    }
161}
162
163/// Logical errors detectable during purification for a LOAD GENERATOR SOURCE.
164#[derive(Debug, Clone, thiserror::Error)]
165pub enum LoadGeneratorSourcePurificationError {
166    #[error("FOR ALL TABLES is only valid for multi-output sources")]
167    ForAllTables,
168    #[error("FOR SCHEMAS (..) unsupported")]
169    ForSchemas,
170    #[error("FOR TABLES (..) unsupported")]
171    ForTables,
172    #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
173    MultiOutputRequiresForAllTables,
174    #[error("multi-output sources require an external reference")]
175    MultiOutputRequiresExternalReference,
176    #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
177    WrongLoadGenerator(String, UnresolvedItemName),
178}
179
180impl LoadGeneratorSourcePurificationError {
181    pub fn detail(&self) -> Option<String> {
182        match self {
183            _ => None,
184        }
185    }
186
187    pub fn hint(&self) -> Option<String> {
188        match self {
189            _ => None,
190        }
191    }
192}
193
194/// Logical errors detectable during purification for a KAFKA SINK.
195#[derive(Debug, Clone, thiserror::Error)]
196pub enum KafkaSinkPurificationError {
197    #[error("{0} is not a KAFKA CONNECTION")]
198    NotKafkaConnection(FullItemName),
199    #[error("admin client errored")]
200    AdminClientError(Arc<ContextCreationError>),
201    #[error("zero brokers discovered in metadata request")]
202    ZeroBrokers,
203}
204
205impl KafkaSinkPurificationError {
206    pub fn detail(&self) -> Option<String> {
207        match self {
208            Self::AdminClientError(e) => Some(e.to_string_with_causes()),
209            _ => None,
210        }
211    }
212
213    pub fn hint(&self) -> Option<String> {
214        None
215    }
216}
217
218use mz_ore::error::ErrorExt;
219
220/// Logical errors detectable during purification for Confluent Schema Registry.
221#[derive(Debug, Clone, thiserror::Error)]
222pub enum CsrPurificationError {
223    #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
224    NotCsrConnection(FullItemName),
225    #[error("client errored")]
226    ClientError(Arc<CsrConnectError>),
227    #[error("list subjects failed")]
228    ListSubjectsError(Arc<ListError>),
229}
230
231impl CsrPurificationError {
232    pub fn detail(&self) -> Option<String> {
233        match self {
234            Self::ClientError(e) => Some(e.to_string_with_causes()),
235            Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
236            _ => None,
237        }
238    }
239
240    pub fn hint(&self) -> Option<String> {
241        None
242    }
243}
244
245/// Logical errors detectable during purification for a MySQL SOURCE.
246#[derive(Debug, Clone, thiserror::Error)]
247pub enum MySqlSourcePurificationError {
248    #[error("User lacks required MySQL privileges")]
249    UserLacksPrivileges(Vec<(String, String)>),
250    #[error("CREATE SOURCE specifies DETAILS option")]
251    UserSpecifiedDetails,
252    #[error("{0} option is unnecessary when no tables are added")]
253    UnnecessaryOptionsWithoutReferences(String),
254    #[error("{0} is not a MYSQL CONNECTION")]
255    NotMySqlConnection(FullItemName),
256    #[error("Invalid MySQL system replication settings")]
257    ReplicationSettingsError(Vec<(String, String, String)>),
258    #[error("referenced tables use unsupported types")]
259    UnrecognizedTypes { cols: Vec<(String, String, String)> },
260    #[error("duplicated column name references in table {0}: {1:?}")]
261    DuplicatedColumnNames(String, Vec<String>),
262    #[error("{option_name} refers to table not currently being added")]
263    DanglingColumns {
264        option_name: String,
265        items: Vec<UnresolvedItemName>,
266    },
267    #[error("Invalid MySQL table reference: {0}")]
268    InvalidTableReference(String),
269    #[error("No tables found for provided reference")]
270    EmptyDatabase,
271    #[error("missing TABLES specification")]
272    RequiresExternalReferences,
273    #[error("No tables found in referenced schemas")]
274    NoTablesFoundForSchemas(Vec<String>),
275}
276
277impl MySqlSourcePurificationError {
278    pub fn detail(&self) -> Option<String> {
279        match self {
280            Self::UserLacksPrivileges(missing) => Some(format!(
281                "Missing MySQL privileges: {}",
282                itertools::join(
283                    missing
284                        .iter()
285                        .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
286                    ", "
287                )
288            )),
289            Self::DanglingColumns {
290                option_name: _,
291                items,
292            } => Some(format!(
293                "the following columns are referenced but not added: {}",
294                itertools::join(items, ", ")
295            )),
296            Self::ReplicationSettingsError(settings) => Some(format!(
297                "Invalid MySQL system replication settings: {}",
298                itertools::join(
299                    settings.iter().map(|(setting, expected, actual)| format!(
300                        "{}: expected {}, got {}",
301                        setting, expected, actual
302                    )),
303                    "; "
304                )
305            )),
306            Self::UnrecognizedTypes { cols } => Some(format!(
307                "the following columns contain unsupported types:\n{}",
308                itertools::join(
309                    cols.into_iter().map(|(table, column, data_type)| format!(
310                        "'{}' for {}.{}",
311                        data_type, column, table
312                    )),
313                    "\n"
314                )
315            )),
316            Self::NoTablesFoundForSchemas(schemas) => Some(format!(
317                "missing schemas: {}",
318                itertools::join(schemas.iter(), ", ")
319            )),
320            _ => None,
321        }
322    }
323
324    pub fn hint(&self) -> Option<String> {
325        match self {
326            Self::UserSpecifiedDetails => Some(
327                "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
328                    .into(),
329            ),
330            Self::ReplicationSettingsError(_) => {
331                Some("Set the necessary MySQL database system settings.".into())
332            }
333            Self::RequiresExternalReferences => {
334                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
335            }
336            Self::InvalidTableReference(_) => Some(
337                "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
338            ),
339            Self::UnrecognizedTypes { cols: _ } => Some(
340                "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
341                ingest their values as text, or ignored using EXCLUDE COLUMNS."
342                    .into(),
343            ),
344            Self::EmptyDatabase => Some(
345                "No tables were found to replicate. This could be because \
346                the user does not have privileges on the intended tables."
347                    .into(),
348            ),
349            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
350                "Remove the {} option, as no tables are being added.",
351                option
352            )),
353            _ => None,
354        }
355    }
356}
357
358/// Logical errors detectable during purification for a SQL Server SOURCE.
359#[derive(Debug, Clone, thiserror::Error)]
360pub enum SqlServerSourcePurificationError {
361    #[error("{0} is not a SQL SERVER CONNECTION")]
362    NotSqlServerConnection(FullItemName),
363    #[error("CREATE SOURCE specifies DETAILS option")]
364    UserSpecifiedDetails,
365    #[error("{0} option is unnecessary when no tables are added")]
366    UnnecessaryOptionsWithoutReferences(String),
367    #[error("Invalid SQL Server system replication settings")]
368    ReplicationSettingsError(Vec<(String, String, String)>),
369    #[error("missing TABLES specification")]
370    RequiresExternalReferences,
371    #[error("{option_name} refers to table not currently being added")]
372    DanglingColumns {
373        option_name: String,
374        items: Vec<UnresolvedItemName>,
375    },
376    #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
377    UnsupportedColumn {
378        schema_name: Arc<str>,
379        tbl_name: Arc<str>,
380        col_name: Arc<str>,
381        col_type: Arc<str>,
382    },
383    #[error("No tables found for provided reference")]
384    NoTables,
385    #[error("programming error: {0}")]
386    ProgrammingError(String),
387}
388
389impl SqlServerSourcePurificationError {
390    pub fn detail(&self) -> Option<String> {
391        match self {
392            Self::ReplicationSettingsError(settings) => Some(format!(
393                "Invalid SQL Server system replication settings: {}",
394                itertools::join(
395                    settings.iter().map(|(setting, expected, actual)| format!(
396                        "{}: expected {}, got {}",
397                        setting, expected, actual
398                    )),
399                    "; "
400                )
401            )),
402            Self::DanglingColumns {
403                option_name: _,
404                items,
405            } => Some(format!(
406                "the following columns are referenced but not added: {}",
407                itertools::join(items, ", ")
408            )),
409            _ => None,
410        }
411    }
412
413    pub fn hint(&self) -> Option<String> {
414        match self {
415            Self::RequiresExternalReferences => {
416                Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
417            }
418            Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
419                "Remove the {} option, as no tables are being added.",
420                option
421            )),
422            Self::NoTables => Some(
423                "No tables were found to replicate. This could be because \
424                the user does not have privileges on the intended tables."
425                    .into(),
426            ),
427            Self::UnsupportedColumn { .. } => {
428                Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
429            }
430            _ => None,
431        }
432    }
433}