1use std::sync::Arc;
11
12use aws_sdk_sts::operation::get_caller_identity::GetCallerIdentityError;
13use mz_ccsr::ListError;
14use mz_repr::adt::system::Oid;
15use mz_sql_parser::ast::display::AstDisplay;
16use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
17use mz_storage_types::connections::{
18 MySqlConnectionValidationError, PostgresConnectionValidationError,
19};
20use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
21
22use crate::names::{FullItemName, PartialItemName};
23
24#[derive(Debug, Clone, thiserror::Error)]
26pub enum PgSourcePurificationError {
27 #[error("CREATE SOURCE specifies DETAILS option")]
28 UserSpecifiedDetails,
29 #[error("{0} option is unnecessary when no tables are added")]
30 UnnecessaryOptionsWithoutReferences(String),
31 #[error("PUBLICATION {0} is empty")]
32 EmptyPublication(String),
33 #[error("database {database} missing referenced schemas")]
34 DatabaseMissingFilteredSchemas {
35 database: String,
36 schemas: Vec<String>,
37 },
38 #[error("missing TABLES specification")]
39 RequiresExternalReferences,
40 #[error("insufficient privileges")]
41 UserLacksUsageOnSchemas { schemas: Vec<String> },
42 #[error("insufficient privileges")]
43 UserLacksSelectOnTables { tables: Vec<String> },
44 #[error("one or more tables requires BYPASSRLS")]
45 BypassRLSRequired { tables: Vec<String> },
46 #[error("referenced items not tables with REPLICA IDENTITY FULL")]
47 NotTablesWReplicaIdentityFull { items: Vec<String> },
48 #[error("TEXT COLUMNS refers to table not currently being added")]
49 DanglingTextColumns { items: Vec<PartialItemName> },
50 #[error("EXCLUDE COLUMNS refers to table not currently being added")]
51 DanglingExcludeColumns { items: Vec<PartialItemName> },
52 #[error("duplicated column name references: {0:?}")]
53 DuplicatedColumnNames(Vec<String>),
54 #[error("referenced tables use unsupported types")]
55 UnrecognizedTypes { cols: Vec<(String, Oid)> },
56 #[error("{0} is not a POSTGRES CONNECTION")]
57 NotPgConnection(FullItemName),
58 #[error("CONNECTION must specify PUBLICATION")]
59 ConnectionMissingPublication,
60 #[error(transparent)]
61 InvalidConnection(PostgresConnectionValidationError),
62}
63
64impl PgSourcePurificationError {
65 pub fn detail(&self) -> Option<String> {
66 match self {
67 Self::DanglingTextColumns { items } => Some(format!(
68 "the following tables are referenced but not added: {}",
69 itertools::join(items, ", ")
70 )),
71 Self::DatabaseMissingFilteredSchemas {
72 database: _,
73 schemas,
74 } => Some(format!(
75 "missing schemas: {}",
76 itertools::join(schemas.iter(), ", ")
77 )),
78 Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
79 "user lacks USAGE privileges for schemas {}",
80 schemas.join(", ")
81 )),
82 Self::UserLacksSelectOnTables { tables } => Some(format!(
83 "user lacks SELECT privileges for tables {}",
84 tables.join(", ")
85 )),
86 Self::BypassRLSRequired { tables } => Some(format!(
87 "user must have BYPASSRLS attribute to read tables {}",
88 tables.join(", "),
89 )),
90 Self::NotTablesWReplicaIdentityFull { items } => {
91 Some(format!("referenced items: {}", items.join(", ")))
92 }
93 Self::UnrecognizedTypes { cols } => Some(format!(
94 "the following columns contain unsupported types:\n{}",
95 itertools::join(
96 cols.into_iter()
97 .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
98 "\n"
99 )
100 )),
101 Self::InvalidConnection(e) => e.detail(),
102 _ => None,
103 }
104 }
105
106 pub fn hint(&self) -> Option<String> {
107 match self {
108 Self::UserSpecifiedDetails => Some(
109 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
110 .into(),
111 ),
112 Self::RequiresExternalReferences => {
113 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
114 }
115 Self::UnrecognizedTypes {
116 cols: _,
117 } => Some(
118 "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
119 as text."
120 .into(),
121 ),
122 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
123 "Remove the {} option, as no tables are being added.",
124 option
125 )),
126 Self::BypassRLSRequired { .. } => Some("Add the BYPASSRLS attribute to the Materialize user".into()),
127 Self::InvalidConnection(e) => e.hint(),
128 _ => None,
129 }
130 }
131}
132
133#[derive(Debug, Clone, thiserror::Error)]
135pub enum KafkaSourcePurificationError {
136 #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
137 ReferencedSubsources(ExternalReferences),
138 #[error("KAFKA CONNECTION without TOPIC")]
139 ConnectionMissingTopic,
140 #[error("{0} is not a KAFKA CONNECTION")]
141 NotKafkaConnection(FullItemName),
142 #[error("failed to create and connect Kafka consumer")]
143 KafkaConsumerError(String),
144 #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
145 WrongKafkaTopic(String, UnresolvedItemName),
146}
147
148impl KafkaSourcePurificationError {
149 pub fn detail(&self) -> Option<String> {
150 match self {
151 Self::KafkaConsumerError(e) => Some(e.clone()),
152 _ => None,
153 }
154 }
155
156 pub fn hint(&self) -> Option<String> {
157 None
158 }
159}
160
161#[derive(Debug, Clone, thiserror::Error)]
163pub enum LoadGeneratorSourcePurificationError {
164 #[error("FOR ALL TABLES is only valid for multi-output sources")]
165 ForAllTables,
166 #[error("FOR SCHEMAS (..) unsupported")]
167 ForSchemas,
168 #[error("FOR TABLES (..) unsupported")]
169 ForTables,
170 #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
171 MultiOutputRequiresForAllTables,
172 #[error("multi-output sources require an external reference")]
173 MultiOutputRequiresExternalReference,
174 #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
175 WrongLoadGenerator(String, UnresolvedItemName),
176}
177
178impl LoadGeneratorSourcePurificationError {
179 pub fn detail(&self) -> Option<String> {
180 match self {
181 _ => None,
182 }
183 }
184
185 pub fn hint(&self) -> Option<String> {
186 match self {
187 _ => None,
188 }
189 }
190}
191
192#[derive(Debug, Clone, thiserror::Error)]
194pub enum KafkaSinkPurificationError {
195 #[error("{0} is not a KAFKA CONNECTION")]
196 NotKafkaConnection(FullItemName),
197 #[error("admin client errored")]
198 AdminClientError(Arc<ContextCreationError>),
199 #[error("zero brokers discovered in metadata request")]
200 ZeroBrokers,
201}
202
203impl KafkaSinkPurificationError {
204 pub fn detail(&self) -> Option<String> {
205 match self {
206 Self::AdminClientError(e) => Some(e.to_string_with_causes()),
207 _ => None,
208 }
209 }
210
211 pub fn hint(&self) -> Option<String> {
212 None
213 }
214}
215
216#[derive(Debug, Clone, thiserror::Error)]
217pub enum IcebergSinkPurificationError {
218 #[error("catalog connection errored")]
219 CatalogError(Arc<anyhow::Error>),
220 #[error("error loading aws sdk context")]
221 AwsSdkContextError(Arc<anyhow::Error>),
222 #[error("error listing sts identity")]
223 StsIdentityError(Arc<GetCallerIdentityError>),
224}
225
226impl IcebergSinkPurificationError {
227 pub fn detail(&self) -> Option<String> {
228 match self {
229 Self::CatalogError(e) => Some(e.to_string_with_causes()),
230 Self::AwsSdkContextError(e) => Some(e.to_string_with_causes()),
231 Self::StsIdentityError(e) => Some(e.to_string_with_causes()),
232 }
233 }
234
235 pub fn hint(&self) -> Option<String> {
236 None
237 }
238}
239
240use mz_ore::error::ErrorExt;
241
242#[derive(Debug, Clone, thiserror::Error)]
244pub enum CsrPurificationError {
245 #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
246 NotCsrConnection(FullItemName),
247 #[error("client errored")]
248 ClientError(Arc<CsrConnectError>),
249 #[error("list subjects failed")]
250 ListSubjectsError(Arc<ListError>),
251}
252
253impl CsrPurificationError {
254 pub fn detail(&self) -> Option<String> {
255 match self {
256 Self::ClientError(e) => Some(e.to_string_with_causes()),
257 Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
258 _ => None,
259 }
260 }
261
262 pub fn hint(&self) -> Option<String> {
263 None
264 }
265}
266
267#[derive(Debug, thiserror::Error)]
269pub enum MySqlSourcePurificationError {
270 #[error("User lacks required MySQL privileges")]
271 UserLacksPrivileges(Vec<(String, String)>),
272 #[error("CREATE SOURCE specifies DETAILS option")]
273 UserSpecifiedDetails,
274 #[error("{0} option is unnecessary when no tables are added")]
275 UnnecessaryOptionsWithoutReferences(String),
276 #[error("{0} is not a MYSQL CONNECTION")]
277 NotMySqlConnection(FullItemName),
278 #[error("referenced tables use unsupported types")]
279 UnrecognizedTypes { cols: Vec<(String, String, String)> },
280 #[error("duplicated column name references in table {0}: {1:?}")]
281 DuplicatedColumnNames(String, Vec<String>),
282 #[error("{option_name} refers to table not currently being added")]
283 DanglingColumns {
284 option_name: String,
285 items: Vec<UnresolvedItemName>,
286 },
287 #[error("Invalid MySQL table reference: {0}")]
288 InvalidTableReference(String),
289 #[error("No tables found for provided reference")]
290 EmptyDatabase,
291 #[error("missing TABLES specification")]
292 RequiresExternalReferences,
293 #[error("No tables found in referenced schemas")]
294 NoTablesFoundForSchemas(Vec<String>),
295 #[error(transparent)]
296 InvalidConnection(#[from] MySqlConnectionValidationError),
297}
298
299impl MySqlSourcePurificationError {
300 pub fn detail(&self) -> Option<String> {
301 match self {
302 Self::UserLacksPrivileges(missing) => Some(format!(
303 "Missing MySQL privileges: {}",
304 itertools::join(
305 missing
306 .iter()
307 .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
308 ", "
309 )
310 )),
311 Self::DanglingColumns {
312 option_name: _,
313 items,
314 } => Some(format!(
315 "the following columns are referenced but not added: {}",
316 itertools::join(items, ", ")
317 )),
318 Self::UnrecognizedTypes { cols } => Some(format!(
319 "the following columns contain unsupported types:\n{}",
320 itertools::join(
321 cols.into_iter().map(|(table, column, data_type)| format!(
322 "'{}' for {}.{}",
323 data_type, column, table
324 )),
325 "\n"
326 )
327 )),
328 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
329 "missing schemas: {}",
330 itertools::join(schemas.iter(), ", ")
331 )),
332 Self::InvalidConnection(e) => e.detail(),
333 _ => None,
334 }
335 }
336
337 pub fn hint(&self) -> Option<String> {
338 match self {
339 Self::UserSpecifiedDetails => Some(
340 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
341 .into(),
342 ),
343 Self::RequiresExternalReferences => {
344 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
345 }
346 Self::InvalidTableReference(_) => Some(
347 "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
348 ),
349 Self::UnrecognizedTypes { cols: _ } => Some(
350 "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
351 ingest their values as text, or ignored using EXCLUDE COLUMNS."
352 .into(),
353 ),
354 Self::EmptyDatabase => Some(
355 "No tables were found to replicate. This could be because \
356 the user does not have privileges on the intended tables."
357 .into(),
358 ),
359 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
360 "Remove the {} option, as no tables are being added.",
361 option
362 )),
363 Self::InvalidConnection(e) => e.hint(),
364 _ => None,
365 }
366 }
367}
368
369#[derive(Debug, Clone, thiserror::Error)]
371pub enum SqlServerSourcePurificationError {
372 #[error("{0} is not a SQL SERVER CONNECTION")]
373 NotSqlServerConnection(FullItemName),
374 #[error("CREATE SOURCE specifies DETAILS option")]
375 UserSpecifiedDetails,
376 #[error("{0} option is unnecessary when no tables are added")]
377 UnnecessaryOptionsWithoutReferences(String),
378 #[error("missing TABLES specification")]
379 RequiresExternalReferences,
380 #[error("{option_name} refers to table not currently being added")]
381 DanglingColumns {
382 option_name: String,
383 items: Vec<UnresolvedItemName>,
384 },
385 #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
386 MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
387 #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
388 UnsupportedColumn {
389 schema_name: Arc<str>,
390 tbl_name: Arc<str>,
391 col_name: Arc<str>,
392 col_type: Arc<str>,
393 context: String,
394 },
395 #[error("Table {tbl_name} had all columns excluded")]
396 AllColumnsExcluded { tbl_name: Arc<str> },
397 #[error("No tables found for provided reference")]
398 NoTables,
399 #[error("programming error: {0}")]
400 ProgrammingError(String),
401 #[error("No start_lsn found for capture instance {0}")]
402 NoStartLsn(String),
403 #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
404 CdcMissingColumns {
405 capture_instance: Arc<str>,
406 col_names: Vec<Arc<str>>,
407 },
408}
409
410impl SqlServerSourcePurificationError {
411 pub fn detail(&self) -> Option<String> {
412 match self {
413 Self::DanglingColumns {
414 option_name: _,
415 items,
416 } => Some(format!(
417 "the following columns are referenced but not added: {}",
418 itertools::join(items, ", ")
419 )),
420 Self::UnsupportedColumn { context, .. } => Some(context.clone()),
421 _ => None,
422 }
423 }
424
425 pub fn hint(&self) -> Option<String> {
426 match self {
427 Self::RequiresExternalReferences => {
428 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
429 }
430 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
431 "Remove the {} option, as no tables are being added.",
432 option
433 )),
434 Self::NoTables => Some(
435 "No tables were found to replicate. This could be because \
436 the user does not have privileges on the intended tables."
437 .into(),
438 ),
439 Self::UnsupportedColumn { .. } => {
440 Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
441 }
442 _ => None,
443 }
444 }
445}