1use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::connections::{
17 MySqlConnectionValidationError, PostgresConnectionValidationError,
18};
19use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
20
21use crate::names::{FullItemName, PartialItemName};
22
23#[derive(Debug, Clone, thiserror::Error)]
25pub enum PgSourcePurificationError {
26 #[error("CREATE SOURCE specifies DETAILS option")]
27 UserSpecifiedDetails,
28 #[error("{0} option is unnecessary when no tables are added")]
29 UnnecessaryOptionsWithoutReferences(String),
30 #[error("PUBLICATION {0} is empty")]
31 EmptyPublication(String),
32 #[error("database {database} missing referenced schemas")]
33 DatabaseMissingFilteredSchemas {
34 database: String,
35 schemas: Vec<String>,
36 },
37 #[error("missing TABLES specification")]
38 RequiresExternalReferences,
39 #[error("insufficient privileges")]
40 UserLacksUsageOnSchemas { schemas: Vec<String> },
41 #[error("insufficient privileges")]
42 UserLacksSelectOnTables { tables: Vec<String> },
43 #[error("one or more tables requires BYPASSRLS")]
44 BypassRLSRequired { tables: Vec<String> },
45 #[error("referenced items not tables with REPLICA IDENTITY FULL")]
46 NotTablesWReplicaIdentityFull { items: Vec<String> },
47 #[error("TEXT COLUMNS refers to table not currently being added")]
48 DanglingTextColumns { items: Vec<PartialItemName> },
49 #[error("EXCLUDE COLUMNS refers to table not currently being added")]
50 DanglingExcludeColumns { items: Vec<PartialItemName> },
51 #[error("duplicated column name references: {0:?}")]
52 DuplicatedColumnNames(Vec<String>),
53 #[error("referenced tables use unsupported types")]
54 UnrecognizedTypes { cols: Vec<(String, Oid)> },
55 #[error("{0} is not a POSTGRES CONNECTION")]
56 NotPgConnection(FullItemName),
57 #[error("CONNECTION must specify PUBLICATION")]
58 ConnectionMissingPublication,
59 #[error(transparent)]
60 InvalidConnection(PostgresConnectionValidationError),
61}
62
63impl PgSourcePurificationError {
64 pub fn detail(&self) -> Option<String> {
65 match self {
66 Self::DanglingTextColumns { items } => Some(format!(
67 "the following tables are referenced but not added: {}",
68 itertools::join(items, ", ")
69 )),
70 Self::DatabaseMissingFilteredSchemas {
71 database: _,
72 schemas,
73 } => Some(format!(
74 "missing schemas: {}",
75 itertools::join(schemas.iter(), ", ")
76 )),
77 Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
78 "user lacks USAGE privileges for schemas {}",
79 schemas.join(", ")
80 )),
81 Self::UserLacksSelectOnTables { tables } => Some(format!(
82 "user lacks SELECT privileges for tables {}",
83 tables.join(", ")
84 )),
85 Self::BypassRLSRequired { tables } => Some(format!(
86 "user must have BYPASSRLS attribute to read tables {}",
87 tables.join(", "),
88 )),
89 Self::NotTablesWReplicaIdentityFull { items } => {
90 Some(format!("referenced items: {}", items.join(", ")))
91 }
92 Self::UnrecognizedTypes { cols } => Some(format!(
93 "the following columns contain unsupported types:\n{}",
94 itertools::join(
95 cols.into_iter()
96 .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
97 "\n"
98 )
99 )),
100 Self::InvalidConnection(e) => e.detail(),
101 _ => None,
102 }
103 }
104
105 pub fn hint(&self) -> Option<String> {
106 match self {
107 Self::UserSpecifiedDetails => Some(
108 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
109 .into(),
110 ),
111 Self::RequiresExternalReferences => {
112 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
113 }
114 Self::UnrecognizedTypes {
115 cols: _,
116 } => Some(
117 "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
118 as text."
119 .into(),
120 ),
121 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
122 "Remove the {} option, as no tables are being added.",
123 option
124 )),
125 Self::BypassRLSRequired { .. } => Some("Add the BYPASSRLS attribute to the Materialize user".into()),
126 Self::InvalidConnection(e) => e.hint(),
127 _ => None,
128 }
129 }
130}
131
132#[derive(Debug, Clone, thiserror::Error)]
134pub enum KafkaSourcePurificationError {
135 #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
136 ReferencedSubsources(ExternalReferences),
137 #[error("KAFKA CONNECTION without TOPIC")]
138 ConnectionMissingTopic,
139 #[error("{0} is not a KAFKA CONNECTION")]
140 NotKafkaConnection(FullItemName),
141 #[error("failed to create and connect Kafka consumer")]
142 KafkaConsumerError(String),
143 #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
144 WrongKafkaTopic(String, UnresolvedItemName),
145}
146
147impl KafkaSourcePurificationError {
148 pub fn detail(&self) -> Option<String> {
149 match self {
150 Self::KafkaConsumerError(e) => Some(e.clone()),
151 _ => None,
152 }
153 }
154
155 pub fn hint(&self) -> Option<String> {
156 None
157 }
158}
159
160#[derive(Debug, Clone, thiserror::Error)]
162pub enum LoadGeneratorSourcePurificationError {
163 #[error("FOR ALL TABLES is only valid for multi-output sources")]
164 ForAllTables,
165 #[error("FOR SCHEMAS (..) unsupported")]
166 ForSchemas,
167 #[error("FOR TABLES (..) unsupported")]
168 ForTables,
169 #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
170 MultiOutputRequiresForAllTables,
171 #[error("multi-output sources require an external reference")]
172 MultiOutputRequiresExternalReference,
173 #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
174 WrongLoadGenerator(String, UnresolvedItemName),
175}
176
177impl LoadGeneratorSourcePurificationError {
178 pub fn detail(&self) -> Option<String> {
179 match self {
180 _ => None,
181 }
182 }
183
184 pub fn hint(&self) -> Option<String> {
185 match self {
186 _ => None,
187 }
188 }
189}
190
191#[derive(Debug, Clone, thiserror::Error)]
193pub enum KafkaSinkPurificationError {
194 #[error("{0} is not a KAFKA CONNECTION")]
195 NotKafkaConnection(FullItemName),
196 #[error("admin client errored")]
197 AdminClientError(Arc<ContextCreationError>),
198 #[error("zero brokers discovered in metadata request")]
199 ZeroBrokers,
200}
201
202impl KafkaSinkPurificationError {
203 pub fn detail(&self) -> Option<String> {
204 match self {
205 Self::AdminClientError(e) => Some(e.to_string_with_causes()),
206 _ => None,
207 }
208 }
209
210 pub fn hint(&self) -> Option<String> {
211 None
212 }
213}
214
215#[derive(Debug, Clone, thiserror::Error)]
216pub enum IcebergSinkPurificationError {
217 #[error("catalog connection errored")]
218 CatalogError(Arc<anyhow::Error>),
219 #[error("error loading aws sdk context")]
220 AwsSdkContextError(Arc<anyhow::Error>),
221 #[error("S3 Tables connection region mismatch")]
222 S3TablesRegionMismatch {
223 s3_tables_region: String,
224 environment_region: String,
225 },
226}
227
228impl IcebergSinkPurificationError {
229 pub fn detail(&self) -> Option<String> {
230 match self {
231 Self::CatalogError(e) => Some(e.to_string_with_causes()),
232 Self::AwsSdkContextError(e) => Some(e.to_string_with_causes()),
233 Self::S3TablesRegionMismatch {
234 s3_tables_region,
235 environment_region,
236 } => Some(format!(
237 "S3 Tables connection is configured for region '{}' but this Materialize environment is running in region '{}'",
238 s3_tables_region, environment_region
239 )),
240 }
241 }
242
243 pub fn hint(&self) -> Option<String> {
244 match self {
245 Self::S3TablesRegionMismatch {
246 environment_region, ..
247 } => Some(format!(
248 "Create a new AWS connection with REGION = '{}' to use with S3 Tables in this environment.",
249 environment_region
250 )),
251 _ => None,
252 }
253 }
254}
255
256use mz_ore::error::ErrorExt;
257
258#[derive(Debug, Clone, thiserror::Error)]
260pub enum CsrPurificationError {
261 #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
262 NotCsrConnection(FullItemName),
263 #[error("client errored")]
264 ClientError(Arc<CsrConnectError>),
265 #[error("list subjects failed")]
266 ListSubjectsError(Arc<ListError>),
267}
268
269impl CsrPurificationError {
270 pub fn detail(&self) -> Option<String> {
271 match self {
272 Self::ClientError(e) => Some(e.to_string_with_causes()),
273 Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
274 Self::NotCsrConnection(_) => None,
275 }
276 }
277
278 pub fn hint(&self) -> Option<String> {
279 None
280 }
281}
282
283#[derive(Debug, thiserror::Error)]
285pub enum MySqlSourcePurificationError {
286 #[error("User lacks required MySQL privileges")]
287 UserLacksPrivileges(Vec<(String, String)>),
288 #[error("CREATE SOURCE specifies DETAILS option")]
289 UserSpecifiedDetails,
290 #[error("{0} option is unnecessary when no tables are added")]
291 UnnecessaryOptionsWithoutReferences(String),
292 #[error("{0} is not a MYSQL CONNECTION")]
293 NotMySqlConnection(FullItemName),
294 #[error("referenced tables use unsupported types")]
295 UnrecognizedTypes { cols: Vec<(String, String, String)> },
296 #[error("duplicated column name references in table {0}: {1:?}")]
297 DuplicatedColumnNames(String, Vec<String>),
298 #[error("{option_name} refers to table not currently being added")]
299 DanglingColumns {
300 option_name: String,
301 items: Vec<UnresolvedItemName>,
302 },
303 #[error("Invalid MySQL table reference: {0}")]
304 InvalidTableReference(String),
305 #[error("No tables found for provided reference")]
306 EmptyDatabase,
307 #[error("missing TABLES specification")]
308 RequiresExternalReferences,
309 #[error("No tables found in referenced schemas")]
310 NoTablesFoundForSchemas(Vec<String>),
311 #[error(transparent)]
312 InvalidConnection(#[from] MySqlConnectionValidationError),
313 #[error(
314 "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
315 )]
316 UnsupportedBinlogMetadataSetting { setting: String },
317 #[error(
318 "You are using MySQL version {version}. Materialize requires MySQL 8.0.1 or later to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
319 )]
320 UnsupportedMySqlVersion { version: String },
321}
322
323impl MySqlSourcePurificationError {
324 pub fn detail(&self) -> Option<String> {
325 match self {
326 Self::UserLacksPrivileges(missing) => Some(format!(
327 "Missing MySQL privileges: {}",
328 itertools::join(
329 missing
330 .iter()
331 .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
332 ", "
333 )
334 )),
335 Self::DanglingColumns {
336 option_name: _,
337 items,
338 } => Some(format!(
339 "the following columns are referenced but not added: {}",
340 itertools::join(items, ", ")
341 )),
342 Self::UnrecognizedTypes { cols } => Some(format!(
343 "the following columns contain unsupported types:\n{}",
344 itertools::join(
345 cols.into_iter().map(|(table, column, data_type)| format!(
346 "'{}' for {}.{}",
347 data_type, column, table
348 )),
349 "\n"
350 )
351 )),
352 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
353 "missing schemas: {}",
354 itertools::join(schemas.iter(), ", ")
355 )),
356 Self::InvalidConnection(e) => e.detail(),
357 _ => None,
358 }
359 }
360
361 pub fn hint(&self) -> Option<String> {
362 match self {
363 Self::UserSpecifiedDetails => Some(
364 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
365 .into(),
366 ),
367 Self::RequiresExternalReferences => {
368 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
369 }
370 Self::InvalidTableReference(_) => Some(
371 "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
372 ),
373 Self::UnrecognizedTypes { cols: _ } => Some(
374 "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
375 ingest their values as text, or ignored using EXCLUDE COLUMNS."
376 .into(),
377 ),
378 Self::EmptyDatabase => Some(
379 "No tables were found to replicate. This could be because \
380 the user does not have privileges on the intended tables."
381 .into(),
382 ),
383 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
384 "Remove the {} option, as no tables are being added.",
385 option
386 )),
387 Self::InvalidConnection(e) => e.hint(),
388 _ => None,
389 }
390 }
391}
392
393#[derive(Debug, Clone, thiserror::Error)]
395pub enum SqlServerSourcePurificationError {
396 #[error("{0} is not a SQL SERVER CONNECTION")]
397 NotSqlServerConnection(FullItemName),
398 #[error("CREATE SOURCE specifies DETAILS option")]
399 UserSpecifiedDetails,
400 #[error("{0} option is unnecessary when no tables are added")]
401 UnnecessaryOptionsWithoutReferences(String),
402 #[error("missing TABLES specification")]
403 RequiresExternalReferences,
404 #[error("{option_name} refers to table not currently being added")]
405 DanglingColumns {
406 option_name: String,
407 items: Vec<UnresolvedItemName>,
408 },
409 #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
410 MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
411 #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
412 UnsupportedColumn {
413 schema_name: Arc<str>,
414 tbl_name: Arc<str>,
415 col_name: Arc<str>,
416 col_type: Arc<str>,
417 context: String,
418 },
419 #[error("Table {tbl_name} had all columns excluded")]
420 AllColumnsExcluded { tbl_name: Arc<str> },
421 #[error("No tables found for provided reference")]
422 NoTables,
423 #[error("programming error: {0}")]
424 ProgrammingError(String),
425 #[error("No start_lsn found for capture instance {0}")]
426 NoStartLsn(String),
427 #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
428 CdcMissingColumns {
429 capture_instance: Arc<str>,
430 col_names: Vec<Arc<str>>,
431 },
432}
433
434impl SqlServerSourcePurificationError {
435 pub fn detail(&self) -> Option<String> {
436 match self {
437 Self::DanglingColumns {
438 option_name: _,
439 items,
440 } => Some(format!(
441 "the following columns are referenced but not added: {}",
442 itertools::join(items, ", ")
443 )),
444 Self::UnsupportedColumn { context, .. } => Some(context.clone()),
445 _ => None,
446 }
447 }
448
449 pub fn hint(&self) -> Option<String> {
450 match self {
451 Self::RequiresExternalReferences => {
452 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
453 }
454 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
455 "Remove the {} option, as no tables are being added.",
456 option
457 )),
458 Self::NoTables => Some(
459 "No tables were found to replicate. This could be because \
460 the user does not have privileges on the intended tables."
461 .into(),
462 ),
463 Self::UnsupportedColumn { .. } => {
464 Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
465 }
466 _ => None,
467 }
468 }
469}