1use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::connections::{
17 MySqlConnectionValidationError, PostgresConnectionValidationError,
18};
19use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
20
21use crate::names::{FullItemName, PartialItemName};
22
23#[derive(Debug, Clone, thiserror::Error)]
25pub enum PgSourcePurificationError {
26 #[error("CREATE SOURCE specifies DETAILS option")]
27 UserSpecifiedDetails,
28 #[error("{0} option is unnecessary when no tables are added")]
29 UnnecessaryOptionsWithoutReferences(String),
30 #[error("PUBLICATION {0} is empty")]
31 EmptyPublication(String),
32 #[error("database {database} missing referenced schemas")]
33 DatabaseMissingFilteredSchemas {
34 database: String,
35 schemas: Vec<String>,
36 },
37 #[error("missing TABLES specification")]
38 RequiresExternalReferences,
39 #[error("insufficient privileges")]
40 UserLacksUsageOnSchemas { schemas: Vec<String> },
41 #[error("insufficient privileges")]
42 UserLacksSelectOnTables { tables: Vec<String> },
43 #[error("one or more tables requires BYPASSRLS")]
44 BypassRLSRequired { tables: Vec<String> },
45 #[error("referenced items not tables with REPLICA IDENTITY FULL")]
46 NotTablesWReplicaIdentityFull { items: Vec<String> },
47 #[error("TEXT COLUMNS refers to table not currently being added")]
48 DanglingTextColumns { items: Vec<PartialItemName> },
49 #[error("EXCLUDE COLUMNS refers to table not currently being added")]
50 DanglingExcludeColumns { items: Vec<PartialItemName> },
51 #[error("duplicated column name references: {0:?}")]
52 DuplicatedColumnNames(Vec<String>),
53 #[error("referenced tables use unsupported types")]
54 UnrecognizedTypes { cols: Vec<(String, Oid)> },
55 #[error("{0} is not a POSTGRES CONNECTION")]
56 NotPgConnection(FullItemName),
57 #[error("CONNECTION must specify PUBLICATION")]
58 ConnectionMissingPublication,
59 #[error(transparent)]
60 InvalidConnection(PostgresConnectionValidationError),
61}
62
63impl PgSourcePurificationError {
64 pub fn detail(&self) -> Option<String> {
65 match self {
66 Self::DanglingTextColumns { items } => Some(format!(
67 "the following tables are referenced but not added: {}",
68 itertools::join(items, ", ")
69 )),
70 Self::DatabaseMissingFilteredSchemas {
71 database: _,
72 schemas,
73 } => Some(format!(
74 "missing schemas: {}",
75 itertools::join(schemas.iter(), ", ")
76 )),
77 Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
78 "user lacks USAGE privileges for schemas {}",
79 schemas.join(", ")
80 )),
81 Self::UserLacksSelectOnTables { tables } => Some(format!(
82 "user lacks SELECT privileges for tables {}",
83 tables.join(", ")
84 )),
85 Self::BypassRLSRequired { tables } => Some(format!(
86 "user must have BYPASSRLS attribute to read tables {}",
87 tables.join(", "),
88 )),
89 Self::NotTablesWReplicaIdentityFull { items } => {
90 Some(format!("referenced items: {}", items.join(", ")))
91 }
92 Self::UnrecognizedTypes { cols } => Some(format!(
93 "the following columns contain unsupported types:\n{}",
94 itertools::join(
95 cols.into_iter()
96 .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
97 "\n"
98 )
99 )),
100 Self::InvalidConnection(e) => e.detail(),
101 _ => None,
102 }
103 }
104
105 pub fn hint(&self) -> Option<String> {
106 match self {
107 Self::UserSpecifiedDetails => Some(
108 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
109 .into(),
110 ),
111 Self::RequiresExternalReferences => {
112 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
113 }
114 Self::UnrecognizedTypes {
115 cols: _,
116 } => Some(
117 "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
118 as text."
119 .into(),
120 ),
121 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
122 "Remove the {} option, as no tables are being added.",
123 option
124 )),
125 Self::BypassRLSRequired { .. } => Some("Add the BYPASSRLS attribute to the Materialize user".into()),
126 Self::InvalidConnection(e) => e.hint(),
127 _ => None,
128 }
129 }
130}
131
132#[derive(Debug, Clone, thiserror::Error)]
134pub enum KafkaSourcePurificationError {
135 #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
136 ReferencedSubsources(ExternalReferences),
137 #[error("KAFKA CONNECTION without TOPIC")]
138 ConnectionMissingTopic,
139 #[error("{0} is not a KAFKA CONNECTION")]
140 NotKafkaConnection(FullItemName),
141 #[error("failed to create and connect Kafka consumer")]
142 KafkaConsumerError(String),
143 #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
144 WrongKafkaTopic(String, UnresolvedItemName),
145}
146
147impl KafkaSourcePurificationError {
148 pub fn detail(&self) -> Option<String> {
149 match self {
150 Self::KafkaConsumerError(e) => Some(e.clone()),
151 _ => None,
152 }
153 }
154
155 pub fn hint(&self) -> Option<String> {
156 None
157 }
158}
159
160#[derive(Debug, Clone, thiserror::Error)]
162pub enum LoadGeneratorSourcePurificationError {
163 #[error("FOR ALL TABLES is only valid for multi-output sources")]
164 ForAllTables,
165 #[error("FOR SCHEMAS (..) unsupported")]
166 ForSchemas,
167 #[error("FOR TABLES (..) unsupported")]
168 ForTables,
169 #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
170 MultiOutputRequiresForAllTables,
171 #[error("multi-output sources require an external reference")]
172 MultiOutputRequiresExternalReference,
173 #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
174 WrongLoadGenerator(String, UnresolvedItemName),
175}
176
177impl LoadGeneratorSourcePurificationError {
178 pub fn detail(&self) -> Option<String> {
179 match self {
180 _ => None,
181 }
182 }
183
184 pub fn hint(&self) -> Option<String> {
185 match self {
186 _ => None,
187 }
188 }
189}
190
191#[derive(Debug, Clone, thiserror::Error)]
193pub enum KafkaSinkPurificationError {
194 #[error("{0} is not a KAFKA CONNECTION")]
195 NotKafkaConnection(FullItemName),
196 #[error("admin client errored")]
197 AdminClientError(Arc<ContextCreationError>),
198 #[error("zero brokers discovered in metadata request")]
199 ZeroBrokers,
200}
201
202impl KafkaSinkPurificationError {
203 pub fn detail(&self) -> Option<String> {
204 match self {
205 Self::AdminClientError(e) => Some(e.to_string_with_causes()),
206 _ => None,
207 }
208 }
209
210 pub fn hint(&self) -> Option<String> {
211 None
212 }
213}
214
215#[derive(Debug, Clone, thiserror::Error)]
216pub enum IcebergSinkPurificationError {
217 #[error("catalog connection errored")]
218 CatalogError(Arc<anyhow::Error>),
219 #[error("error loading aws sdk context")]
220 AwsSdkContextError(Arc<anyhow::Error>),
221 #[error("S3 Tables connection region mismatch")]
222 S3TablesRegionMismatch {
223 s3_tables_region: String,
224 environment_region: String,
225 },
226}
227
228impl IcebergSinkPurificationError {
229 pub fn detail(&self) -> Option<String> {
230 match self {
231 Self::CatalogError(e) => Some(e.to_string_with_causes()),
232 Self::AwsSdkContextError(e) => Some(e.to_string_with_causes()),
233 Self::S3TablesRegionMismatch {
234 s3_tables_region,
235 environment_region,
236 } => Some(format!(
237 "S3 Tables connection is configured for region '{}' but this Materialize environment is running in region '{}'",
238 s3_tables_region, environment_region
239 )),
240 }
241 }
242
243 pub fn hint(&self) -> Option<String> {
244 match self {
245 Self::S3TablesRegionMismatch {
246 environment_region, ..
247 } => Some(format!(
248 "Create a new AWS connection with REGION = '{}' to use with S3 Tables in this environment.",
249 environment_region
250 )),
251 _ => None,
252 }
253 }
254}
255
256use mz_ore::error::ErrorExt;
257
258#[derive(Debug, Clone, thiserror::Error)]
260pub enum CsrPurificationError {
261 #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
262 NotCsrConnection(FullItemName),
263 #[error("client errored")]
264 ClientError(Arc<CsrConnectError>),
265 #[error("list subjects failed")]
266 ListSubjectsError(Arc<ListError>),
267}
268
269impl CsrPurificationError {
270 pub fn detail(&self) -> Option<String> {
271 match self {
272 Self::ClientError(e) => Some(e.to_string_with_causes()),
273 Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
274 _ => None,
275 }
276 }
277
278 pub fn hint(&self) -> Option<String> {
279 None
280 }
281}
282
283#[derive(Debug, thiserror::Error)]
285pub enum MySqlSourcePurificationError {
286 #[error("User lacks required MySQL privileges")]
287 UserLacksPrivileges(Vec<(String, String)>),
288 #[error("CREATE SOURCE specifies DETAILS option")]
289 UserSpecifiedDetails,
290 #[error("{0} option is unnecessary when no tables are added")]
291 UnnecessaryOptionsWithoutReferences(String),
292 #[error("{0} is not a MYSQL CONNECTION")]
293 NotMySqlConnection(FullItemName),
294 #[error("referenced tables use unsupported types")]
295 UnrecognizedTypes { cols: Vec<(String, String, String)> },
296 #[error("duplicated column name references in table {0}: {1:?}")]
297 DuplicatedColumnNames(String, Vec<String>),
298 #[error("{option_name} refers to table not currently being added")]
299 DanglingColumns {
300 option_name: String,
301 items: Vec<UnresolvedItemName>,
302 },
303 #[error("Invalid MySQL table reference: {0}")]
304 InvalidTableReference(String),
305 #[error("No tables found for provided reference")]
306 EmptyDatabase,
307 #[error("missing TABLES specification")]
308 RequiresExternalReferences,
309 #[error("No tables found in referenced schemas")]
310 NoTablesFoundForSchemas(Vec<String>),
311 #[error(transparent)]
312 InvalidConnection(#[from] MySqlConnectionValidationError),
313}
314
315impl MySqlSourcePurificationError {
316 pub fn detail(&self) -> Option<String> {
317 match self {
318 Self::UserLacksPrivileges(missing) => Some(format!(
319 "Missing MySQL privileges: {}",
320 itertools::join(
321 missing
322 .iter()
323 .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
324 ", "
325 )
326 )),
327 Self::DanglingColumns {
328 option_name: _,
329 items,
330 } => Some(format!(
331 "the following columns are referenced but not added: {}",
332 itertools::join(items, ", ")
333 )),
334 Self::UnrecognizedTypes { cols } => Some(format!(
335 "the following columns contain unsupported types:\n{}",
336 itertools::join(
337 cols.into_iter().map(|(table, column, data_type)| format!(
338 "'{}' for {}.{}",
339 data_type, column, table
340 )),
341 "\n"
342 )
343 )),
344 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
345 "missing schemas: {}",
346 itertools::join(schemas.iter(), ", ")
347 )),
348 Self::InvalidConnection(e) => e.detail(),
349 _ => None,
350 }
351 }
352
353 pub fn hint(&self) -> Option<String> {
354 match self {
355 Self::UserSpecifiedDetails => Some(
356 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
357 .into(),
358 ),
359 Self::RequiresExternalReferences => {
360 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
361 }
362 Self::InvalidTableReference(_) => Some(
363 "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
364 ),
365 Self::UnrecognizedTypes { cols: _ } => Some(
366 "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
367 ingest their values as text, or ignored using EXCLUDE COLUMNS."
368 .into(),
369 ),
370 Self::EmptyDatabase => Some(
371 "No tables were found to replicate. This could be because \
372 the user does not have privileges on the intended tables."
373 .into(),
374 ),
375 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
376 "Remove the {} option, as no tables are being added.",
377 option
378 )),
379 Self::InvalidConnection(e) => e.hint(),
380 _ => None,
381 }
382 }
383}
384
385#[derive(Debug, Clone, thiserror::Error)]
387pub enum SqlServerSourcePurificationError {
388 #[error("{0} is not a SQL SERVER CONNECTION")]
389 NotSqlServerConnection(FullItemName),
390 #[error("CREATE SOURCE specifies DETAILS option")]
391 UserSpecifiedDetails,
392 #[error("{0} option is unnecessary when no tables are added")]
393 UnnecessaryOptionsWithoutReferences(String),
394 #[error("missing TABLES specification")]
395 RequiresExternalReferences,
396 #[error("{option_name} refers to table not currently being added")]
397 DanglingColumns {
398 option_name: String,
399 items: Vec<UnresolvedItemName>,
400 },
401 #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
402 MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
403 #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
404 UnsupportedColumn {
405 schema_name: Arc<str>,
406 tbl_name: Arc<str>,
407 col_name: Arc<str>,
408 col_type: Arc<str>,
409 context: String,
410 },
411 #[error("Table {tbl_name} had all columns excluded")]
412 AllColumnsExcluded { tbl_name: Arc<str> },
413 #[error("No tables found for provided reference")]
414 NoTables,
415 #[error("programming error: {0}")]
416 ProgrammingError(String),
417 #[error("No start_lsn found for capture instance {0}")]
418 NoStartLsn(String),
419 #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
420 CdcMissingColumns {
421 capture_instance: Arc<str>,
422 col_names: Vec<Arc<str>>,
423 },
424}
425
426impl SqlServerSourcePurificationError {
427 pub fn detail(&self) -> Option<String> {
428 match self {
429 Self::DanglingColumns {
430 option_name: _,
431 items,
432 } => Some(format!(
433 "the following columns are referenced but not added: {}",
434 itertools::join(items, ", ")
435 )),
436 Self::UnsupportedColumn { context, .. } => Some(context.clone()),
437 _ => None,
438 }
439 }
440
441 pub fn hint(&self) -> Option<String> {
442 match self {
443 Self::RequiresExternalReferences => {
444 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
445 }
446 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
447 "Remove the {} option, as no tables are being added.",
448 option
449 )),
450 Self::NoTables => Some(
451 "No tables were found to replicate. This could be because \
452 the user does not have privileges on the intended tables."
453 .into(),
454 ),
455 Self::UnsupportedColumn { .. } => {
456 Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
457 }
458 _ => None,
459 }
460 }
461}