1use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::connections::{
17 MySqlConnectionValidationError, PostgresConnectionValidationError,
18};
19use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
20
21use crate::names::{FullItemName, PartialItemName};
22
23#[derive(Debug, Clone, thiserror::Error)]
25pub enum PgSourcePurificationError {
26 #[error("CREATE SOURCE specifies DETAILS option")]
27 UserSpecifiedDetails,
28 #[error("{0} option is unnecessary when no tables are added")]
29 UnnecessaryOptionsWithoutReferences(String),
30 #[error("PUBLICATION {0} is empty")]
31 EmptyPublication(String),
32 #[error("database {database} missing referenced schemas")]
33 DatabaseMissingFilteredSchemas {
34 database: String,
35 schemas: Vec<String>,
36 },
37 #[error("missing TABLES specification")]
38 RequiresExternalReferences,
39 #[error("insufficient privileges")]
40 UserLacksUsageOnSchemas { schemas: Vec<String> },
41 #[error("insufficient privileges")]
42 UserLacksSelectOnTables { tables: Vec<String> },
43 #[error("referenced items not tables with REPLICA IDENTITY FULL")]
44 NotTablesWReplicaIdentityFull { items: Vec<String> },
45 #[error("TEXT COLUMNS refers to table not currently being added")]
46 DanglingTextColumns { items: Vec<PartialItemName> },
47 #[error("referenced tables use unsupported types")]
48 UnrecognizedTypes { cols: Vec<(String, Oid)> },
49 #[error("{0} is not a POSTGRES CONNECTION")]
50 NotPgConnection(FullItemName),
51 #[error("CONNECTION must specify PUBLICATION")]
52 ConnectionMissingPublication,
53 #[error(transparent)]
54 InvalidConnection(PostgresConnectionValidationError),
55}
56
57impl PgSourcePurificationError {
58 pub fn detail(&self) -> Option<String> {
59 match self {
60 Self::DanglingTextColumns { items } => Some(format!(
61 "the following tables are referenced but not added: {}",
62 itertools::join(items, ", ")
63 )),
64 Self::DatabaseMissingFilteredSchemas {
65 database: _,
66 schemas,
67 } => Some(format!(
68 "missing schemas: {}",
69 itertools::join(schemas.iter(), ", ")
70 )),
71 Self::UserLacksUsageOnSchemas { schemas } => Some(format!(
72 "user lacks USAGE privileges for schemas {}",
73 schemas.join(", ")
74 )),
75 Self::UserLacksSelectOnTables { tables } => Some(format!(
76 "user lacks SELECT privileges for tables {}",
77 tables.join(", ")
78 )),
79 Self::NotTablesWReplicaIdentityFull { items } => {
80 Some(format!("referenced items: {}", items.join(", ")))
81 }
82 Self::UnrecognizedTypes { cols } => Some(format!(
83 "the following columns contain unsupported types:\n{}",
84 itertools::join(
85 cols.into_iter()
86 .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
87 "\n"
88 )
89 )),
90 Self::InvalidConnection(e) => e.detail(),
91 _ => None,
92 }
93 }
94
95 pub fn hint(&self) -> Option<String> {
96 match self {
97 Self::UserSpecifiedDetails => Some(
98 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
99 .into(),
100 ),
101 Self::RequiresExternalReferences => {
102 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
103 }
104 Self::UnrecognizedTypes {
105 cols: _,
106 } => Some(
107 "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
108 as text."
109 .into(),
110 ),
111 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
112 "Remove the {} option, as no tables are being added.",
113 option
114 )),
115 Self::InvalidConnection(e) => e.hint(),
116 _ => None,
117 }
118 }
119}
120
121#[derive(Debug, Clone, thiserror::Error)]
123pub enum KafkaSourcePurificationError {
124 #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
125 ReferencedSubsources(ExternalReferences),
126 #[error("KAFKA CONNECTION without TOPIC")]
127 ConnectionMissingTopic,
128 #[error("{0} is not a KAFKA CONNECTION")]
129 NotKafkaConnection(FullItemName),
130 #[error("failed to create and connect Kafka consumer")]
131 KafkaConsumerError(String),
132 #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
133 WrongKafkaTopic(String, UnresolvedItemName),
134}
135
136impl KafkaSourcePurificationError {
137 pub fn detail(&self) -> Option<String> {
138 match self {
139 Self::KafkaConsumerError(e) => Some(e.clone()),
140 _ => None,
141 }
142 }
143
144 pub fn hint(&self) -> Option<String> {
145 None
146 }
147}
148
149#[derive(Debug, Clone, thiserror::Error)]
151pub enum LoadGeneratorSourcePurificationError {
152 #[error("FOR ALL TABLES is only valid for multi-output sources")]
153 ForAllTables,
154 #[error("FOR SCHEMAS (..) unsupported")]
155 ForSchemas,
156 #[error("FOR TABLES (..) unsupported")]
157 ForTables,
158 #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
159 MultiOutputRequiresForAllTables,
160 #[error("multi-output sources require an external reference")]
161 MultiOutputRequiresExternalReference,
162 #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
163 WrongLoadGenerator(String, UnresolvedItemName),
164}
165
166impl LoadGeneratorSourcePurificationError {
167 pub fn detail(&self) -> Option<String> {
168 match self {
169 _ => None,
170 }
171 }
172
173 pub fn hint(&self) -> Option<String> {
174 match self {
175 _ => None,
176 }
177 }
178}
179
180#[derive(Debug, Clone, thiserror::Error)]
182pub enum KafkaSinkPurificationError {
183 #[error("{0} is not a KAFKA CONNECTION")]
184 NotKafkaConnection(FullItemName),
185 #[error("admin client errored")]
186 AdminClientError(Arc<ContextCreationError>),
187 #[error("zero brokers discovered in metadata request")]
188 ZeroBrokers,
189}
190
191impl KafkaSinkPurificationError {
192 pub fn detail(&self) -> Option<String> {
193 match self {
194 Self::AdminClientError(e) => Some(e.to_string_with_causes()),
195 _ => None,
196 }
197 }
198
199 pub fn hint(&self) -> Option<String> {
200 None
201 }
202}
203
204use mz_ore::error::ErrorExt;
205
206#[derive(Debug, Clone, thiserror::Error)]
208pub enum CsrPurificationError {
209 #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
210 NotCsrConnection(FullItemName),
211 #[error("client errored")]
212 ClientError(Arc<CsrConnectError>),
213 #[error("list subjects failed")]
214 ListSubjectsError(Arc<ListError>),
215}
216
217impl CsrPurificationError {
218 pub fn detail(&self) -> Option<String> {
219 match self {
220 Self::ClientError(e) => Some(e.to_string_with_causes()),
221 Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
222 _ => None,
223 }
224 }
225
226 pub fn hint(&self) -> Option<String> {
227 None
228 }
229}
230
231#[derive(Debug, thiserror::Error)]
233pub enum MySqlSourcePurificationError {
234 #[error("User lacks required MySQL privileges")]
235 UserLacksPrivileges(Vec<(String, String)>),
236 #[error("CREATE SOURCE specifies DETAILS option")]
237 UserSpecifiedDetails,
238 #[error("{0} option is unnecessary when no tables are added")]
239 UnnecessaryOptionsWithoutReferences(String),
240 #[error("{0} is not a MYSQL CONNECTION")]
241 NotMySqlConnection(FullItemName),
242 #[error("referenced tables use unsupported types")]
243 UnrecognizedTypes { cols: Vec<(String, String, String)> },
244 #[error("duplicated column name references in table {0}: {1:?}")]
245 DuplicatedColumnNames(String, Vec<String>),
246 #[error("{option_name} refers to table not currently being added")]
247 DanglingColumns {
248 option_name: String,
249 items: Vec<UnresolvedItemName>,
250 },
251 #[error("Invalid MySQL table reference: {0}")]
252 InvalidTableReference(String),
253 #[error("No tables found for provided reference")]
254 EmptyDatabase,
255 #[error("missing TABLES specification")]
256 RequiresExternalReferences,
257 #[error("No tables found in referenced schemas")]
258 NoTablesFoundForSchemas(Vec<String>),
259 #[error(transparent)]
260 InvalidConnection(#[from] MySqlConnectionValidationError),
261}
262
263impl MySqlSourcePurificationError {
264 pub fn detail(&self) -> Option<String> {
265 match self {
266 Self::UserLacksPrivileges(missing) => Some(format!(
267 "Missing MySQL privileges: {}",
268 itertools::join(
269 missing
270 .iter()
271 .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
272 ", "
273 )
274 )),
275 Self::DanglingColumns {
276 option_name: _,
277 items,
278 } => Some(format!(
279 "the following columns are referenced but not added: {}",
280 itertools::join(items, ", ")
281 )),
282 Self::UnrecognizedTypes { cols } => Some(format!(
283 "the following columns contain unsupported types:\n{}",
284 itertools::join(
285 cols.into_iter().map(|(table, column, data_type)| format!(
286 "'{}' for {}.{}",
287 data_type, column, table
288 )),
289 "\n"
290 )
291 )),
292 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
293 "missing schemas: {}",
294 itertools::join(schemas.iter(), ", ")
295 )),
296 Self::InvalidConnection(e) => e.detail(),
297 _ => None,
298 }
299 }
300
301 pub fn hint(&self) -> Option<String> {
302 match self {
303 Self::UserSpecifiedDetails => Some(
304 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
305 .into(),
306 ),
307 Self::RequiresExternalReferences => {
308 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
309 }
310 Self::InvalidTableReference(_) => Some(
311 "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
312 ),
313 Self::UnrecognizedTypes { cols: _ } => Some(
314 "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
315 ingest their values as text, or ignored using EXCLUDE COLUMNS."
316 .into(),
317 ),
318 Self::EmptyDatabase => Some(
319 "No tables were found to replicate. This could be because \
320 the user does not have privileges on the intended tables."
321 .into(),
322 ),
323 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
324 "Remove the {} option, as no tables are being added.",
325 option
326 )),
327 Self::InvalidConnection(e) => e.hint(),
328 _ => None,
329 }
330 }
331}
332
333#[derive(Debug, Clone, thiserror::Error)]
335pub enum SqlServerSourcePurificationError {
336 #[error("{0} is not a SQL SERVER CONNECTION")]
337 NotSqlServerConnection(FullItemName),
338 #[error("CREATE SOURCE specifies DETAILS option")]
339 UserSpecifiedDetails,
340 #[error("{0} option is unnecessary when no tables are added")]
341 UnnecessaryOptionsWithoutReferences(String),
342 #[error("missing TABLES specification")]
343 RequiresExternalReferences,
344 #[error("{option_name} refers to table not currently being added")]
345 DanglingColumns {
346 option_name: String,
347 items: Vec<UnresolvedItemName>,
348 },
349 #[error("found multiple primary keys for a table. constraints {constraint_names:?}")]
350 MultiplePrimaryKeys { constraint_names: Vec<Arc<str>> },
351 #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
352 UnsupportedColumn {
353 schema_name: Arc<str>,
354 tbl_name: Arc<str>,
355 col_name: Arc<str>,
356 col_type: Arc<str>,
357 },
358 #[error("Table {tbl_name} had all columns excluded")]
359 AllColumnsExcluded { tbl_name: Arc<str> },
360 #[error("No tables found for provided reference")]
361 NoTables,
362 #[error("programming error: {0}")]
363 ProgrammingError(String),
364 #[error("No start_lsn found for capture instance {0}")]
365 NoStartLsn(String),
366 #[error("Capture instance {capture_instance} has missing columns: {col_names:?}")]
367 CdcMissingColumns {
368 capture_instance: Arc<str>,
369 col_names: Vec<Arc<str>>,
370 },
371}
372
373impl SqlServerSourcePurificationError {
374 pub fn detail(&self) -> Option<String> {
375 match self {
376 Self::DanglingColumns {
377 option_name: _,
378 items,
379 } => Some(format!(
380 "the following columns are referenced but not added: {}",
381 itertools::join(items, ", ")
382 )),
383 _ => None,
384 }
385 }
386
387 pub fn hint(&self) -> Option<String> {
388 match self {
389 Self::RequiresExternalReferences => {
390 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
391 }
392 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
393 "Remove the {} option, as no tables are being added.",
394 option
395 )),
396 Self::NoTables => Some(
397 "No tables were found to replicate. This could be because \
398 the user does not have privileges on the intended tables."
399 .into(),
400 ),
401 Self::UnsupportedColumn { .. } => {
402 Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
403 }
404 _ => None,
405 }
406 }
407}