1use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::connections::{
17 MySqlConnectionValidationError, PostgresConnectionValidationError,
18};
19use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
20
21use crate::names::{FullItemName, PartialItemName};
22
23#[derive(Debug, Clone, thiserror::Error)]
25pub enum PgSourcePurificationError {
26 #[error("CREATE SOURCE specifies DETAILS option")]
27 UserSpecifiedDetails,
28 #[error("{0} option is unnecessary when no tables are added")]
29 UnnecessaryOptionsWithoutReferences(String),
30 #[error("PUBLICATION {0} is empty")]
31 EmptyPublication(String),
32 #[error("database {database} missing referenced schemas")]
33 DatabaseMissingFilteredSchemas {
34 database: String,
35 schemas: Vec<String>,
36 },
37 #[error("missing TABLES specification")]
38 RequiresExternalReferences,
39 #[error("insufficient privileges")]
40 UserLacksUsageOnSchemas { schemas: Vec<String> },
41 #[error("insufficient privileges")]
42 UserLacksSelectOnTables { tables: Vec<String> },
43 #[error("one or more tables requires BYPASSRLS")]
44 BypassRLSRequired { tables: Vec<String> },
45 #[error("referenced items not tables with REPLICA IDENTITY FULL")]
46 NotTablesWReplicaIdentityFull { items: Vec<String> },
47 #[error("TEXT COLUMNS refers to table not currently being added")]
48 DanglingTextColumns { items: Vec<PartialItemName> },
49 #[error("EXCLUDE COLUMNS refers to table not currently being added")]
50 DanglingExcludeColumns { items: Vec<PartialItemName> },
51 #[error("duplicated column name references: {0:?}")]
52 DuplicatedColumnNames(Vec<String>),
53 #[error("referenced tables use unsupported types")]
54 UnrecognizedTypes { cols: Vec<(String, Oid)> },
55 #[error("{0} is not a POSTGRES CONNECTION")]
56 NotPgConnection(FullItemName),
57 #[error("CONNECTION must specify PUBLICATION")]
58 ConnectionMissingPublication,
59 #[error(transparent)]
60 InvalidConnection(PostgresConnectionValidationError),
61}
62
63impl PgSourcePurificationError {
64 pub fn detail(&self) -> Option<String> {
65 match self {
66 Self::DanglingTextColumns { items } => Some(format!(
67 "the following tables are referenced but not added: {}",
68 itertools::join(items, ", ")
69 )),
70 Self::DatabaseMissingFilteredSchemas {
71 database: _,
72 schemas,
73 } => Some(format!(
74 "missing schemas: {}",
75 itertools::join(schemas.iter(), ", ")
76 )),
77 Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
78 "user lacks USAGE privileges for schemas {}",
79 schemas.join(", ")
80 )),
81 Self::UserLacksSelectOnTables { tables } => Some(format!(
82 "user lacks SELECT privileges for tables {}",
83 tables.join(", ")
84 )),
85 Self::BypassRLSRequired { tables } => Some(format!(
86 "user must have BYPASSRLS attribute to read tables {}",
87 tables.join(", "),
88 )),
89 Self::NotTablesWReplicaIdentityFull { items } => {
90 Some(format!("referenced items: {}", items.join(", ")))
91 }
92 Self::UnrecognizedTypes { cols } => Some(format!(
93 "the following columns contain unsupported types:\n{}",
94 itertools::join(
95 cols.into_iter()
96 .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
97 "\n"
98 )
99 )),
100 Self::InvalidConnection(e) => e.detail(),
101 _ => None,
102 }
103 }
104
105 pub fn hint(&self) -> Option<String> {
106 match self {
107 Self::UserSpecifiedDetails => Some(
108 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
109 .into(),
110 ),
111 Self::RequiresExternalReferences => {
112 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
113 }
114 Self::UnrecognizedTypes {
115 cols: _,
116 } => Some(
117 "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
118 as text."
119 .into(),
120 ),
121 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
122 "Remove the {} option, as no tables are being added.",
123 option
124 )),
125 Self::BypassRLSRequired { .. } => Some("Add the BYPASSRLS attribute to the Materialize user".into()),
126 Self::InvalidConnection(e) => e.hint(),
127 _ => None,
128 }
129 }
130}
131
132#[derive(Debug, Clone, thiserror::Error)]
134pub enum KafkaSourcePurificationError {
135 #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
136 ReferencedSubsources(ExternalReferences),
137 #[error("KAFKA CONNECTION without TOPIC")]
138 ConnectionMissingTopic,
139 #[error("{0} is not a KAFKA CONNECTION")]
140 NotKafkaConnection(FullItemName),
141 #[error("failed to create and connect Kafka consumer")]
142 KafkaConsumerError(String),
143 #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
144 WrongKafkaTopic(String, UnresolvedItemName),
145}
146
147impl KafkaSourcePurificationError {
148 pub fn detail(&self) -> Option<String> {
149 match self {
150 Self::KafkaConsumerError(e) => Some(e.clone()),
151 _ => None,
152 }
153 }
154
155 pub fn hint(&self) -> Option<String> {
156 None
157 }
158}
159
160#[derive(Debug, Clone, thiserror::Error)]
162pub enum LoadGeneratorSourcePurificationError {
163 #[error("FOR ALL TABLES is only valid for multi-output sources")]
164 ForAllTables,
165 #[error("FOR SCHEMAS (..) unsupported")]
166 ForSchemas,
167 #[error("FOR TABLES (..) unsupported")]
168 ForTables,
169 #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
170 MultiOutputRequiresForAllTables,
171 #[error("multi-output sources require an external reference")]
172 MultiOutputRequiresExternalReference,
173 #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
174 WrongLoadGenerator(String, UnresolvedItemName),
175}
176
177impl LoadGeneratorSourcePurificationError {
178 pub fn detail(&self) -> Option<String> {
179 match self {
180 _ => None,
181 }
182 }
183
184 pub fn hint(&self) -> Option<String> {
185 match self {
186 _ => None,
187 }
188 }
189}
190
191#[derive(Debug, Clone, thiserror::Error)]
193pub enum KafkaSinkPurificationError {
194 #[error("{0} is not a KAFKA CONNECTION")]
195 NotKafkaConnection(FullItemName),
196 #[error("admin client errored")]
197 AdminClientError(Arc<ContextCreationError>),
198 #[error("zero brokers discovered in metadata request")]
199 ZeroBrokers,
200}
201
202impl KafkaSinkPurificationError {
203 pub fn detail(&self) -> Option<String> {
204 match self {
205 Self::AdminClientError(e) => Some(e.to_string_with_causes()),
206 _ => None,
207 }
208 }
209
210 pub fn hint(&self) -> Option<String> {
211 None
212 }
213}
214
215#[derive(Debug, Clone, thiserror::Error)]
216pub enum IcebergSinkPurificationError {
217 #[error("catalog connection errored")]
218 CatalogError(Arc<anyhow::Error>),
219 #[error("error loading aws sdk context")]
220 AwsSdkContextError(Arc<anyhow::Error>),
221}
222
223impl IcebergSinkPurificationError {
224 pub fn detail(&self) -> Option<String> {
225 match self {
226 Self::CatalogError(e) => Some(e.to_string_with_causes()),
227 Self::AwsSdkContextError(e) => Some(e.to_string_with_causes()),
228 }
229 }
230
231 pub fn hint(&self) -> Option<String> {
232 None
233 }
234}
235
236use mz_ore::error::ErrorExt;
237
238#[derive(Debug, Clone, thiserror::Error)]
240pub enum CsrPurificationError {
241 #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
242 NotCsrConnection(FullItemName),
243 #[error("client errored")]
244 ClientError(Arc<CsrConnectError>),
245 #[error("list subjects failed")]
246 ListSubjectsError(Arc<ListError>),
247}
248
249impl CsrPurificationError {
250 pub fn detail(&self) -> Option<String> {
251 match self {
252 Self::ClientError(e) => Some(e.to_string_with_causes()),
253 Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
254 _ => None,
255 }
256 }
257
258 pub fn hint(&self) -> Option<String> {
259 None
260 }
261}
262
263#[derive(Debug, thiserror::Error)]
265pub enum MySqlSourcePurificationError {
266 #[error("User lacks required MySQL privileges")]
267 UserLacksPrivileges(Vec<(String, String)>),
268 #[error("CREATE SOURCE specifies DETAILS option")]
269 UserSpecifiedDetails,
270 #[error("{0} option is unnecessary when no tables are added")]
271 UnnecessaryOptionsWithoutReferences(String),
272 #[error("{0} is not a MYSQL CONNECTION")]
273 NotMySqlConnection(FullItemName),
274 #[error("referenced tables use unsupported types")]
275 UnrecognizedTypes { cols: Vec<(String, String, String)> },
276 #[error("duplicated column name references in table {0}: {1:?}")]
277 DuplicatedColumnNames(String, Vec<String>),
278 #[error("{option_name} refers to table not currently being added")]
279 DanglingColumns {
280 option_name: String,
281 items: Vec<UnresolvedItemName>,
282 },
283 #[error("Invalid MySQL table reference: {0}")]
284 InvalidTableReference(String),
285 #[error("No tables found for provided reference")]
286 EmptyDatabase,
287 #[error("missing TABLES specification")]
288 RequiresExternalReferences,
289 #[error("No tables found in referenced schemas")]
290 NoTablesFoundForSchemas(Vec<String>),
291 #[error(transparent)]
292 InvalidConnection(#[from] MySqlConnectionValidationError),
293}
294
295impl MySqlSourcePurificationError {
296 pub fn detail(&self) -> Option<String> {
297 match self {
298 Self::UserLacksPrivileges(missing) => Some(format!(
299 "Missing MySQL privileges: {}",
300 itertools::join(
301 missing
302 .iter()
303 .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
304 ", "
305 )
306 )),
307 Self::DanglingColumns {
308 option_name: _,
309 items,
310 } => Some(format!(
311 "the following columns are referenced but not added: {}",
312 itertools::join(items, ", ")
313 )),
314 Self::UnrecognizedTypes { cols } => Some(format!(
315 "the following columns contain unsupported types:\n{}",
316 itertools::join(
317 cols.into_iter().map(|(table, column, data_type)| format!(
318 "'{}' for {}.{}",
319 data_type, column, table
320 )),
321 "\n"
322 )
323 )),
324 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
325 "missing schemas: {}",
326 itertools::join(schemas.iter(), ", ")
327 )),
328 Self::InvalidConnection(e) => e.detail(),
329 _ => None,
330 }
331 }
332
333 pub fn hint(&self) -> Option<String> {
334 match self {
335 Self::UserSpecifiedDetails => Some(
336 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
337 .into(),
338 ),
339 Self::RequiresExternalReferences => {
340 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
341 }
342 Self::InvalidTableReference(_) => Some(
343 "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
344 ),
345 Self::UnrecognizedTypes { cols: _ } => Some(
346 "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
347 ingest their values as text, or ignored using EXCLUDE COLUMNS."
348 .into(),
349 ),
350 Self::EmptyDatabase => Some(
351 "No tables were found to replicate. This could be because \
352 the user does not have privileges on the intended tables."
353 .into(),
354 ),
355 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
356 "Remove the {} option, as no tables are being added.",
357 option
358 )),
359 Self::InvalidConnection(e) => e.hint(),
360 _ => None,
361 }
362 }
363}
364
365#[derive(Debug, Clone, thiserror::Error)]
367pub enum SqlServerSourcePurificationError {
368 #[error("{0} is not a SQL SERVER CONNECTION")]
369 NotSqlServerConnection(FullItemName),
370 #[error("CREATE SOURCE specifies DETAILS option")]
371 UserSpecifiedDetails,
372 #[error("{0} option is unnecessary when no tables are added")]
373 UnnecessaryOptionsWithoutReferences(String),
374 #[error("missing TABLES specification")]
375 RequiresExternalReferences,
376 #[error("{option_name} refers to table not currently being added")]
377 DanglingColumns {
378 option_name: String,
379 items: Vec<UnresolvedItemName>,
380 },
381 #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
382 MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
383 #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
384 UnsupportedColumn {
385 schema_name: Arc<str>,
386 tbl_name: Arc<str>,
387 col_name: Arc<str>,
388 col_type: Arc<str>,
389 context: String,
390 },
391 #[error("Table {tbl_name} had all columns excluded")]
392 AllColumnsExcluded { tbl_name: Arc<str> },
393 #[error("No tables found for provided reference")]
394 NoTables,
395 #[error("programming error: {0}")]
396 ProgrammingError(String),
397 #[error("No start_lsn found for capture instance {0}")]
398 NoStartLsn(String),
399 #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
400 CdcMissingColumns {
401 capture_instance: Arc<str>,
402 col_names: Vec<Arc<str>>,
403 },
404}
405
406impl SqlServerSourcePurificationError {
407 pub fn detail(&self) -> Option<String> {
408 match self {
409 Self::DanglingColumns {
410 option_name: _,
411 items,
412 } => Some(format!(
413 "the following columns are referenced but not added: {}",
414 itertools::join(items, ", ")
415 )),
416 Self::UnsupportedColumn { context, .. } => Some(context.clone()),
417 _ => None,
418 }
419 }
420
421 pub fn hint(&self) -> Option<String> {
422 match self {
423 Self::RequiresExternalReferences => {
424 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
425 }
426 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
427 "Remove the {} option, as no tables are being added.",
428 option
429 )),
430 Self::NoTables => Some(
431 "No tables were found to replicate. This could be because \
432 the user does not have privileges on the intended tables."
433 .into(),
434 ),
435 Self::UnsupportedColumn { .. } => {
436 Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
437 }
438 _ => None,
439 }
440 }
441}