1use std::sync::Arc;
11
12use mz_ccsr::ListError;
13use mz_repr::adt::system::Oid;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{ExternalReferences, UnresolvedItemName};
16use mz_storage_types::errors::{ContextCreationError, CsrConnectError};
17
18use crate::names::{FullItemName, PartialItemName};
19
20#[derive(Debug, Clone, thiserror::Error)]
22pub enum PgSourcePurificationError {
23 #[error("CREATE SOURCE specifies DETAILS option")]
24 UserSpecifiedDetails,
25 #[error("{0} option is unnecessary when no tables are added")]
26 UnnecessaryOptionsWithoutReferences(String),
27 #[error("PUBLICATION {0} is empty")]
28 EmptyPublication(String),
29 #[error("database {database} missing referenced schemas")]
30 DatabaseMissingFilteredSchemas {
31 database: String,
32 schemas: Vec<String>,
33 },
34 #[error("missing TABLES specification")]
35 RequiresExternalReferences,
36 #[error("insufficient privileges")]
37 UserLacksUsageOnSchemas { user: String, schemas: Vec<String> },
38 #[error("insufficient privileges")]
39 UserLacksSelectOnTables { user: String, tables: Vec<String> },
40 #[error("referenced items not tables with REPLICA IDENTITY FULL")]
41 NotTablesWReplicaIdentityFull { items: Vec<String> },
42 #[error("TEXT COLUMNS refers to table not currently being added")]
43 DanglingTextColumns { items: Vec<PartialItemName> },
44 #[error("referenced tables use unsupported types")]
45 UnrecognizedTypes { cols: Vec<(String, Oid)> },
46 #[error("{0} is not a POSTGRES CONNECTION")]
47 NotPgConnection(FullItemName),
48 #[error("{0} is not a YUGABYTE CONNECTION")]
49 NotYugabyteConnection(FullItemName),
50 #[error("CONNECTION must specify PUBLICATION")]
51 ConnectionMissingPublication,
52 #[error("PostgreSQL server has insufficient number of replication slots available")]
53 InsufficientReplicationSlotsAvailable { count: usize },
54 #[error("server must have wal_level >= logical, but has {wal_level}")]
55 InsufficientWalLevel {
56 wal_level: mz_postgres_util::replication::WalLevel,
57 },
58 #[error("replication disabled on server")]
59 ReplicationDisabled,
60}
61
62impl PgSourcePurificationError {
63 pub fn detail(&self) -> Option<String> {
64 match self {
65 Self::DanglingTextColumns { items } => Some(format!(
66 "the following tables are referenced but not added: {}",
67 itertools::join(items, ", ")
68 )),
69 Self::DatabaseMissingFilteredSchemas {
70 database: _,
71 schemas,
72 } => Some(format!(
73 "missing schemas: {}",
74 itertools::join(schemas.iter(), ", ")
75 )),
76 Self::UserLacksUsageOnSchemas { user, schemas } => Some(format!(
77 "user {} lacks USAGE privileges for schemas {}",
78 user,
79 schemas.join(", ")
80 )),
81 Self::UserLacksSelectOnTables { user, tables } => Some(format!(
82 "user {} lacks SELECT privileges for tables {}",
83 user,
84 tables.join(", ")
85 )),
86 Self::NotTablesWReplicaIdentityFull { items } => {
87 Some(format!("referenced items: {}", items.join(", ")))
88 }
89 Self::UnrecognizedTypes { cols } => Some(format!(
90 "the following columns contain unsupported types:\n{}",
91 itertools::join(
92 cols.into_iter()
93 .map(|(col, Oid(oid))| format!("{} (OID {})", col, oid)),
94 "\n"
95 )
96 )),
97 Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
98 "executing this statement requires {} replication slot{}",
99 count,
100 if *count == 1 { "" } else { "s" }
101 )),
102 _ => None,
103 }
104 }
105
106 pub fn hint(&self) -> Option<String> {
107 match self {
108 Self::UserSpecifiedDetails => Some(
109 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
110 .into(),
111 ),
112 Self::RequiresExternalReferences => {
113 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
114 }
115 Self::UnrecognizedTypes {
116 cols: _,
117 } => Some(
118 "Use the TEXT COLUMNS option naming the listed columns, and Materialize can ingest their values \
119 as text."
120 .into(),
121 ),
122 Self::InsufficientReplicationSlotsAvailable { .. } => Some(
123 "you might be able to wait for other sources to finish snapshotting and try again".into()
124 ),
125 Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
126 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
127 "Remove the {} option, as no tables are being added.",
128 option
129 )),
130 _ => None,
131 }
132 }
133}
134
135#[derive(Debug, Clone, thiserror::Error)]
137pub enum KafkaSourcePurificationError {
138 #[error("{} is only valid for multi-output sources", .0.to_ast_string_simple())]
139 ReferencedSubsources(ExternalReferences),
140 #[error("KAFKA CONNECTION without TOPIC")]
141 ConnectionMissingTopic,
142 #[error("{0} is not a KAFKA CONNECTION")]
143 NotKafkaConnection(FullItemName),
144 #[error("failed to create and connect Kafka consumer")]
145 KafkaConsumerError(String),
146 #[error("Referenced kafka connection uses a different topic '{0}' than specified: '{1}'")]
147 WrongKafkaTopic(String, UnresolvedItemName),
148}
149
150impl KafkaSourcePurificationError {
151 pub fn detail(&self) -> Option<String> {
152 match self {
153 Self::KafkaConsumerError(e) => Some(e.clone()),
154 _ => None,
155 }
156 }
157
158 pub fn hint(&self) -> Option<String> {
159 None
160 }
161}
162
163#[derive(Debug, Clone, thiserror::Error)]
165pub enum LoadGeneratorSourcePurificationError {
166 #[error("FOR ALL TABLES is only valid for multi-output sources")]
167 ForAllTables,
168 #[error("FOR SCHEMAS (..) unsupported")]
169 ForSchemas,
170 #[error("FOR TABLES (..) unsupported")]
171 ForTables,
172 #[error("multi-output sources require a FOR TABLES (..) or FOR ALL TABLES statement")]
173 MultiOutputRequiresForAllTables,
174 #[error("multi-output sources require an external reference")]
175 MultiOutputRequiresExternalReference,
176 #[error("Referenced load generator is different '{0}' than specified: '{1}'")]
177 WrongLoadGenerator(String, UnresolvedItemName),
178}
179
180impl LoadGeneratorSourcePurificationError {
181 pub fn detail(&self) -> Option<String> {
182 match self {
183 _ => None,
184 }
185 }
186
187 pub fn hint(&self) -> Option<String> {
188 match self {
189 _ => None,
190 }
191 }
192}
193
194#[derive(Debug, Clone, thiserror::Error)]
196pub enum KafkaSinkPurificationError {
197 #[error("{0} is not a KAFKA CONNECTION")]
198 NotKafkaConnection(FullItemName),
199 #[error("admin client errored")]
200 AdminClientError(Arc<ContextCreationError>),
201 #[error("zero brokers discovered in metadata request")]
202 ZeroBrokers,
203}
204
205impl KafkaSinkPurificationError {
206 pub fn detail(&self) -> Option<String> {
207 match self {
208 Self::AdminClientError(e) => Some(e.to_string_with_causes()),
209 _ => None,
210 }
211 }
212
213 pub fn hint(&self) -> Option<String> {
214 None
215 }
216}
217
218use mz_ore::error::ErrorExt;
219
220#[derive(Debug, Clone, thiserror::Error)]
222pub enum CsrPurificationError {
223 #[error("{0} is not a CONFLUENT SCHEMA REGISTRY CONNECTION")]
224 NotCsrConnection(FullItemName),
225 #[error("client errored")]
226 ClientError(Arc<CsrConnectError>),
227 #[error("list subjects failed")]
228 ListSubjectsError(Arc<ListError>),
229}
230
231impl CsrPurificationError {
232 pub fn detail(&self) -> Option<String> {
233 match self {
234 Self::ClientError(e) => Some(e.to_string_with_causes()),
235 Self::ListSubjectsError(e) => Some(e.to_string_with_causes()),
236 _ => None,
237 }
238 }
239
240 pub fn hint(&self) -> Option<String> {
241 None
242 }
243}
244
245#[derive(Debug, Clone, thiserror::Error)]
247pub enum MySqlSourcePurificationError {
248 #[error("User lacks required MySQL privileges")]
249 UserLacksPrivileges(Vec<(String, String)>),
250 #[error("CREATE SOURCE specifies DETAILS option")]
251 UserSpecifiedDetails,
252 #[error("{0} option is unnecessary when no tables are added")]
253 UnnecessaryOptionsWithoutReferences(String),
254 #[error("{0} is not a MYSQL CONNECTION")]
255 NotMySqlConnection(FullItemName),
256 #[error("Invalid MySQL system replication settings")]
257 ReplicationSettingsError(Vec<(String, String, String)>),
258 #[error("referenced tables use unsupported types")]
259 UnrecognizedTypes { cols: Vec<(String, String, String)> },
260 #[error("duplicated column name references in table {0}: {1:?}")]
261 DuplicatedColumnNames(String, Vec<String>),
262 #[error("{option_name} refers to table not currently being added")]
263 DanglingColumns {
264 option_name: String,
265 items: Vec<UnresolvedItemName>,
266 },
267 #[error("Invalid MySQL table reference: {0}")]
268 InvalidTableReference(String),
269 #[error("No tables found for provided reference")]
270 EmptyDatabase,
271 #[error("missing TABLES specification")]
272 RequiresExternalReferences,
273 #[error("No tables found in referenced schemas")]
274 NoTablesFoundForSchemas(Vec<String>),
275}
276
277impl MySqlSourcePurificationError {
278 pub fn detail(&self) -> Option<String> {
279 match self {
280 Self::UserLacksPrivileges(missing) => Some(format!(
281 "Missing MySQL privileges: {}",
282 itertools::join(
283 missing
284 .iter()
285 .map(|(privilege, table)| format!("'{}' on '{}'", privilege, table)),
286 ", "
287 )
288 )),
289 Self::DanglingColumns {
290 option_name: _,
291 items,
292 } => Some(format!(
293 "the following columns are referenced but not added: {}",
294 itertools::join(items, ", ")
295 )),
296 Self::ReplicationSettingsError(settings) => Some(format!(
297 "Invalid MySQL system replication settings: {}",
298 itertools::join(
299 settings.iter().map(|(setting, expected, actual)| format!(
300 "{}: expected {}, got {}",
301 setting, expected, actual
302 )),
303 "; "
304 )
305 )),
306 Self::UnrecognizedTypes { cols } => Some(format!(
307 "the following columns contain unsupported types:\n{}",
308 itertools::join(
309 cols.into_iter().map(|(table, column, data_type)| format!(
310 "'{}' for {}.{}",
311 data_type, column, table
312 )),
313 "\n"
314 )
315 )),
316 Self::NoTablesFoundForSchemas(schemas) => Some(format!(
317 "missing schemas: {}",
318 itertools::join(schemas.iter(), ", ")
319 )),
320 _ => None,
321 }
322 }
323
324 pub fn hint(&self) -> Option<String> {
325 match self {
326 Self::UserSpecifiedDetails => Some(
327 "If trying to use the output of SHOW CREATE SOURCE, remove the DETAILS option."
328 .into(),
329 ),
330 Self::ReplicationSettingsError(_) => {
331 Some("Set the necessary MySQL database system settings.".into())
332 }
333 Self::RequiresExternalReferences => {
334 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
335 }
336 Self::InvalidTableReference(_) => Some(
337 "Specify tables names as SCHEMA_NAME.TABLE_NAME in a FOR TABLES (..) clause".into(),
338 ),
339 Self::UnrecognizedTypes { cols: _ } => Some(
340 "Check the docs -- some types can be supported using the TEXT COLUMNS option to \
341 ingest their values as text, or ignored using EXCLUDE COLUMNS."
342 .into(),
343 ),
344 Self::EmptyDatabase => Some(
345 "No tables were found to replicate. This could be because \
346 the user does not have privileges on the intended tables."
347 .into(),
348 ),
349 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
350 "Remove the {} option, as no tables are being added.",
351 option
352 )),
353 _ => None,
354 }
355 }
356}
357
358#[derive(Debug, Clone, thiserror::Error)]
360pub enum SqlServerSourcePurificationError {
361 #[error("{0} is not a SQL SERVER CONNECTION")]
362 NotSqlServerConnection(FullItemName),
363 #[error("CREATE SOURCE specifies DETAILS option")]
364 UserSpecifiedDetails,
365 #[error("{0} option is unnecessary when no tables are added")]
366 UnnecessaryOptionsWithoutReferences(String),
367 #[error("Invalid SQL Server system replication settings")]
368 ReplicationSettingsError(Vec<(String, String, String)>),
369 #[error("missing TABLES specification")]
370 RequiresExternalReferences,
371 #[error("{option_name} refers to table not currently being added")]
372 DanglingColumns {
373 option_name: String,
374 items: Vec<UnresolvedItemName>,
375 },
376 #[error("column {schema_name}.{tbl_name}.{col_name} of type {col_type} is not supported")]
377 UnsupportedColumn {
378 schema_name: Arc<str>,
379 tbl_name: Arc<str>,
380 col_name: Arc<str>,
381 col_type: Arc<str>,
382 },
383 #[error("No tables found for provided reference")]
384 NoTables,
385 #[error("programming error: {0}")]
386 ProgrammingError(String),
387}
388
389impl SqlServerSourcePurificationError {
390 pub fn detail(&self) -> Option<String> {
391 match self {
392 Self::ReplicationSettingsError(settings) => Some(format!(
393 "Invalid SQL Server system replication settings: {}",
394 itertools::join(
395 settings.iter().map(|(setting, expected, actual)| format!(
396 "{}: expected {}, got {}",
397 setting, expected, actual
398 )),
399 "; "
400 )
401 )),
402 Self::DanglingColumns {
403 option_name: _,
404 items,
405 } => Some(format!(
406 "the following columns are referenced but not added: {}",
407 itertools::join(items, ", ")
408 )),
409 _ => None,
410 }
411 }
412
413 pub fn hint(&self) -> Option<String> {
414 match self {
415 Self::RequiresExternalReferences => {
416 Some("provide a FOR TABLES (..), FOR SCHEMAS (..), or FOR ALL TABLES clause".into())
417 }
418 Self::UnnecessaryOptionsWithoutReferences(option) => Some(format!(
419 "Remove the {} option, as no tables are being added.",
420 option
421 )),
422 Self::NoTables => Some(
423 "No tables were found to replicate. This could be because \
424 the user does not have privileges on the intended tables."
425 .into(),
426 ),
427 Self::UnsupportedColumn { .. } => {
428 Some("Use EXCLUDE COLUMNS (...) to exclude a column from this source".into())
429 }
430 _ => None,
431 }
432 }
433}