1use std::fmt;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_controller::clusters::ClusterStatus;
15use mz_orchestrator::{OfflineReason, ServiceStatus};
16use mz_ore::str::{StrExt, separated};
17use mz_pgwire_common::{ErrorResponse, Severity};
18use mz_repr::adt::mz_acl_item::AclMode;
19use mz_repr::strconv;
20use mz_sql::ast::NoticeSeverity;
21use mz_sql::catalog::ErrorMessageObjectDescription;
22use mz_sql::plan::PlanNotice;
23use mz_sql::session::vars::IsolationLevel;
24use tokio_postgres::error::SqlState;
25
26use crate::TimestampExplanation;
27
28#[derive(Clone, Debug)]
33pub enum AdapterNotice {
34 DatabaseAlreadyExists {
35 name: String,
36 },
37 SchemaAlreadyExists {
38 name: String,
39 },
40 TableAlreadyExists {
41 name: String,
42 },
43 ObjectAlreadyExists {
44 name: String,
45 ty: &'static str,
46 },
47 DatabaseDoesNotExist {
48 name: String,
49 },
50 ClusterDoesNotExist {
51 name: String,
52 },
53 DefaultClusterDoesNotExist {
54 name: String,
55 kind: &'static str,
56 suggested_action: String,
57 },
58 NoResolvableSearchPathSchema {
59 search_path: Vec<String>,
60 },
61 ExistingTransactionInProgress,
62 ExplicitTransactionControlInImplicitTransaction,
63 UserRequested {
64 severity: NoticeSeverity,
65 },
66 ClusterReplicaStatusChanged {
67 cluster: String,
68 replica: String,
69 status: ClusterStatus,
70 time: DateTime<Utc>,
71 },
72 CascadeDroppedObject {
73 objects: Vec<String>,
74 },
75 DroppedActiveDatabase {
76 name: String,
77 },
78 DroppedActiveCluster {
79 name: String,
80 },
81 QueryTimestamp {
82 explanation: TimestampExplanation,
83 },
84 EqualSubscribeBounds {
85 bound: mz_repr::Timestamp,
86 },
87 QueryTrace {
88 trace_id: opentelemetry::trace::TraceId,
89 },
90 UnimplementedIsolationLevel {
91 isolation_level: String,
92 },
93 StrongSessionSerializable,
94 BadStartupSetting {
95 name: String,
96 reason: String,
97 },
98 RbacUserDisabled,
99 RoleMembershipAlreadyExists {
100 role_name: String,
101 member_name: String,
102 },
103 RoleMembershipDoesNotExists {
104 role_name: String,
105 member_name: String,
106 },
107 AutoRunOnCatalogServerCluster,
108 AlterIndexOwner {
109 name: String,
110 },
111 CannotRevoke {
112 object_description: ErrorMessageObjectDescription,
113 },
114 NonApplicablePrivilegeTypes {
115 non_applicable_privileges: AclMode,
116 object_description: ErrorMessageObjectDescription,
117 },
118 PlanNotice(PlanNotice),
119 UnknownSessionDatabase(String),
120 OptimizerNotice {
121 notice: String,
122 hint: String,
123 },
124 WebhookSourceCreated {
125 url: url::Url,
126 },
127 DroppedInUseIndex(DroppedInUseIndex),
128 PerReplicaLogRead {
129 log_names: Vec<String>,
130 },
131 VarDefaultUpdated {
132 role: Option<String>,
133 var_name: Option<String>,
134 },
135 Welcome(String),
136 PlanInsights(String),
137 IntrospectionClusterUsage,
138 AutoRouteIntrospectionQueriesUsage,
139 OidcGroupSyncUnmatchedGroup {
141 group: String,
142 },
143 OidcGroupSyncReservedRole {
145 group: String,
146 },
147 OidcGroupSyncError {
149 message: String,
150 },
151}
152
153impl AdapterNotice {
154 pub fn into_response(self) -> ErrorResponse {
155 ErrorResponse {
156 severity: self.severity(),
157 code: self.code(),
158 message: self.to_string(),
159 detail: self.detail(),
160 hint: self.hint(),
161 position: None,
162 }
163 }
164
165 pub fn severity(&self) -> Severity {
167 match self {
168 AdapterNotice::DatabaseAlreadyExists { .. } => Severity::Notice,
169 AdapterNotice::SchemaAlreadyExists { .. } => Severity::Notice,
170 AdapterNotice::TableAlreadyExists { .. } => Severity::Notice,
171 AdapterNotice::ObjectAlreadyExists { .. } => Severity::Notice,
172 AdapterNotice::DatabaseDoesNotExist { .. } => Severity::Notice,
173 AdapterNotice::ClusterDoesNotExist { .. } => Severity::Notice,
174 AdapterNotice::DefaultClusterDoesNotExist { .. } => Severity::Notice,
175 AdapterNotice::NoResolvableSearchPathSchema { .. } => Severity::Notice,
176 AdapterNotice::ExistingTransactionInProgress => Severity::Warning,
177 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => Severity::Warning,
178 AdapterNotice::UserRequested { severity } => match severity {
179 NoticeSeverity::Debug => Severity::Debug,
180 NoticeSeverity::Info => Severity::Info,
181 NoticeSeverity::Log => Severity::Log,
182 NoticeSeverity::Notice => Severity::Notice,
183 NoticeSeverity::Warning => Severity::Warning,
184 },
185 AdapterNotice::ClusterReplicaStatusChanged { .. } => Severity::Notice,
186 AdapterNotice::CascadeDroppedObject { .. } => Severity::Notice,
187 AdapterNotice::DroppedActiveDatabase { .. } => Severity::Notice,
188 AdapterNotice::DroppedActiveCluster { .. } => Severity::Notice,
189 AdapterNotice::QueryTimestamp { .. } => Severity::Notice,
190 AdapterNotice::EqualSubscribeBounds { .. } => Severity::Notice,
191 AdapterNotice::QueryTrace { .. } => Severity::Notice,
192 AdapterNotice::UnimplementedIsolationLevel { .. } => Severity::Notice,
193 AdapterNotice::StrongSessionSerializable => Severity::Notice,
194 AdapterNotice::BadStartupSetting { .. } => Severity::Notice,
195 AdapterNotice::RbacUserDisabled => Severity::Notice,
196 AdapterNotice::RoleMembershipAlreadyExists { .. } => Severity::Notice,
197 AdapterNotice::RoleMembershipDoesNotExists { .. } => Severity::Warning,
198 AdapterNotice::AutoRunOnCatalogServerCluster => Severity::Debug,
199 AdapterNotice::AlterIndexOwner { .. } => Severity::Warning,
200 AdapterNotice::CannotRevoke { .. } => Severity::Warning,
201 AdapterNotice::NonApplicablePrivilegeTypes { .. } => Severity::Notice,
202 AdapterNotice::PlanNotice(notice) => match notice {
203 PlanNotice::ObjectDoesNotExist { .. } => Severity::Notice,
204 PlanNotice::ColumnAlreadyExists { .. } => Severity::Notice,
205 PlanNotice::UpsertSinkKeyNotEnforced { .. } => Severity::Warning,
206 PlanNotice::ReplicaDiskOptionDeprecated { .. } => Severity::Notice,
207 },
208 AdapterNotice::UnknownSessionDatabase(_) => Severity::Notice,
209 AdapterNotice::OptimizerNotice { .. } => Severity::Notice,
210 AdapterNotice::WebhookSourceCreated { .. } => Severity::Notice,
211 AdapterNotice::DroppedInUseIndex { .. } => Severity::Notice,
212 AdapterNotice::PerReplicaLogRead { .. } => Severity::Notice,
213 AdapterNotice::VarDefaultUpdated { .. } => Severity::Notice,
214 AdapterNotice::Welcome(_) => Severity::Notice,
215 AdapterNotice::PlanInsights(_) => Severity::Notice,
216 AdapterNotice::IntrospectionClusterUsage => Severity::Warning,
217 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Severity::Warning,
218 AdapterNotice::OidcGroupSyncUnmatchedGroup { .. } => Severity::Notice,
219 AdapterNotice::OidcGroupSyncReservedRole { .. } => Severity::Warning,
220 AdapterNotice::OidcGroupSyncError { .. } => Severity::Warning,
221 }
222 }
223
224 pub fn detail(&self) -> Option<String> {
226 match self {
227 AdapterNotice::PlanNotice(notice) => notice.detail(),
228 AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
229 AdapterNotice::CascadeDroppedObject { objects } => Some(
230 objects
231 .iter()
232 .map(|obj_info| format!("drop cascades to {}", obj_info))
233 .join("\n"),
234 ),
235 _ => None,
236 }
237 }
238
239 pub fn hint(&self) -> Option<String> {
241 match self {
242 AdapterNotice::DatabaseDoesNotExist { name: _ } => Some("Create the database with CREATE DATABASE or pick an extant database with SET DATABASE = name. List available databases with SHOW DATABASES.".into()),
243 AdapterNotice::ClusterDoesNotExist { name: _ } => Some("Create the cluster with CREATE CLUSTER or pick an extant cluster with SET CLUSTER = name. List available clusters with SHOW CLUSTERS.".into()),
244 AdapterNotice::DefaultClusterDoesNotExist {
245 name: _,
246 kind: _,
247 suggested_action,
248 } => Some(suggested_action.clone()),
249 AdapterNotice::NoResolvableSearchPathSchema { search_path: _ } => Some("Create a schema with CREATE SCHEMA or pick an extant schema with SET SCHEMA = name. List available schemas with SHOW SCHEMAS.".into()),
250 AdapterNotice::DroppedActiveDatabase { name: _ } => Some("Choose a new active database by executing SET DATABASE = <name>.".into()),
251 AdapterNotice::DroppedActiveCluster { name: _ } => Some("Choose a new active cluster by executing SET CLUSTER = <name>.".into()),
252 AdapterNotice::ClusterReplicaStatusChanged { status, .. } => {
253 match status {
254 ServiceStatus::Offline(None)
255 | ServiceStatus::Offline(Some(OfflineReason::Initializing)) => Some("The cluster replica may be restarting or going offline.".into()),
256 ServiceStatus::Offline(Some(OfflineReason::OomKilled)) => Some("The cluster replica may have run out of memory and been killed.".into()),
257 ServiceStatus::Online => None,
258 }
259 },
260 AdapterNotice::RbacUserDisabled => Some("To enable RBAC globally run `ALTER SYSTEM SET enable_rbac_checks TO TRUE` as a superuser. TO enable RBAC for just this session run `SET enable_session_rbac_checks TO TRUE`.".into()),
261 AdapterNotice::AlterIndexOwner {name: _} => Some("Change the ownership of the index's relation, instead.".into()),
262 AdapterNotice::UnknownSessionDatabase(_) => Some(
263 "Create the database with CREATE DATABASE \
264 or pick an extant database with SET DATABASE = name. \
265 List available databases with SHOW DATABASES."
266 .into(),
267 ),
268 AdapterNotice::OptimizerNotice { notice: _, hint } => Some(hint.clone()),
269 AdapterNotice::DroppedInUseIndex(..) => Some("To free up the resources used by the index, recreate all the above-mentioned objects.".into()),
270 AdapterNotice::IntrospectionClusterUsage => Some("Use the new name instead.".into()),
271 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Some("Use the new name instead.".into()),
272 _ => None
273 }
274 }
275
276 pub fn code(&self) -> SqlState {
278 match self {
279 AdapterNotice::DatabaseAlreadyExists { .. } => SqlState::DUPLICATE_DATABASE,
280 AdapterNotice::SchemaAlreadyExists { .. } => SqlState::DUPLICATE_SCHEMA,
281 AdapterNotice::TableAlreadyExists { .. } => SqlState::DUPLICATE_TABLE,
282 AdapterNotice::ObjectAlreadyExists { .. } => SqlState::DUPLICATE_OBJECT,
283 AdapterNotice::DatabaseDoesNotExist { .. } => SqlState::from_code("MZ006"),
284 AdapterNotice::ClusterDoesNotExist { .. } => SqlState::from_code("MZ007"),
285 AdapterNotice::NoResolvableSearchPathSchema { .. } => SqlState::from_code("MZ008"),
286 AdapterNotice::ExistingTransactionInProgress => SqlState::ACTIVE_SQL_TRANSACTION,
287 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
288 SqlState::NO_ACTIVE_SQL_TRANSACTION
289 }
290 AdapterNotice::UserRequested { severity } => match severity {
291 NoticeSeverity::Warning => SqlState::WARNING,
292 _ => SqlState::SUCCESSFUL_COMPLETION,
293 },
294 AdapterNotice::ClusterReplicaStatusChanged { .. } => SqlState::SUCCESSFUL_COMPLETION,
295 AdapterNotice::CascadeDroppedObject { .. } => SqlState::SUCCESSFUL_COMPLETION,
296 AdapterNotice::DroppedActiveDatabase { .. } => SqlState::from_code("MZ002"),
297 AdapterNotice::DroppedActiveCluster { .. } => SqlState::from_code("MZ003"),
298 AdapterNotice::QueryTimestamp { .. } => SqlState::SUCCESSFUL_COMPLETION,
299 AdapterNotice::EqualSubscribeBounds { .. } => SqlState::SUCCESSFUL_COMPLETION,
300 AdapterNotice::QueryTrace { .. } => SqlState::SUCCESSFUL_COMPLETION,
301 AdapterNotice::UnimplementedIsolationLevel { .. } => SqlState::SUCCESSFUL_COMPLETION,
302 AdapterNotice::StrongSessionSerializable => SqlState::SUCCESSFUL_COMPLETION,
303 AdapterNotice::BadStartupSetting { .. } => SqlState::SUCCESSFUL_COMPLETION,
304 AdapterNotice::RbacUserDisabled => SqlState::SUCCESSFUL_COMPLETION,
305 AdapterNotice::RoleMembershipAlreadyExists { .. } => SqlState::SUCCESSFUL_COMPLETION,
306 AdapterNotice::RoleMembershipDoesNotExists { .. } => SqlState::WARNING,
307 AdapterNotice::AutoRunOnCatalogServerCluster => SqlState::SUCCESSFUL_COMPLETION,
308 AdapterNotice::AlterIndexOwner { .. } => SqlState::WARNING,
309 AdapterNotice::CannotRevoke { .. } => SqlState::WARNING_PRIVILEGE_NOT_REVOKED,
310 AdapterNotice::NonApplicablePrivilegeTypes { .. } => SqlState::SUCCESSFUL_COMPLETION,
311 AdapterNotice::PlanNotice(plan) => match plan {
312 PlanNotice::ObjectDoesNotExist { .. } => SqlState::UNDEFINED_OBJECT,
313 PlanNotice::ColumnAlreadyExists { .. } => SqlState::DUPLICATE_COLUMN,
314 PlanNotice::UpsertSinkKeyNotEnforced { .. } => SqlState::WARNING,
315 PlanNotice::ReplicaDiskOptionDeprecated { .. } => {
316 SqlState::WARNING_DEPRECATED_FEATURE
317 }
318 },
319 AdapterNotice::UnknownSessionDatabase(_) => SqlState::from_code("MZ004"),
320 AdapterNotice::DefaultClusterDoesNotExist { .. } => SqlState::from_code("MZ005"),
321 AdapterNotice::OptimizerNotice { .. } => SqlState::SUCCESSFUL_COMPLETION,
322 AdapterNotice::DroppedInUseIndex { .. } => SqlState::SUCCESSFUL_COMPLETION,
323 AdapterNotice::WebhookSourceCreated { .. } => SqlState::SUCCESSFUL_COMPLETION,
324 AdapterNotice::PerReplicaLogRead { .. } => SqlState::SUCCESSFUL_COMPLETION,
325 AdapterNotice::VarDefaultUpdated { .. } => SqlState::SUCCESSFUL_COMPLETION,
326 AdapterNotice::Welcome(_) => SqlState::SUCCESSFUL_COMPLETION,
327 AdapterNotice::PlanInsights(_) => SqlState::from_code("MZ001"),
328 AdapterNotice::IntrospectionClusterUsage => SqlState::WARNING,
329 AdapterNotice::AutoRouteIntrospectionQueriesUsage => SqlState::WARNING,
330 AdapterNotice::OidcGroupSyncUnmatchedGroup { .. } => SqlState::SUCCESSFUL_COMPLETION,
331 AdapterNotice::OidcGroupSyncReservedRole { .. } => SqlState::WARNING,
332 AdapterNotice::OidcGroupSyncError { .. } => SqlState::WARNING,
333 }
334 }
335}
336
337impl fmt::Display for AdapterNotice {
338 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
339 match self {
340 AdapterNotice::DatabaseAlreadyExists { name } => {
341 write!(f, "database {} already exists, skipping", name.quoted())
342 }
343 AdapterNotice::SchemaAlreadyExists { name } => {
344 write!(f, "schema {} already exists, skipping", name.quoted())
345 }
346 AdapterNotice::TableAlreadyExists { name } => {
347 write!(f, "table {} already exists, skipping", name.quoted())
348 }
349 AdapterNotice::ObjectAlreadyExists { name, ty } => {
350 write!(f, "{} {} already exists, skipping", ty, name.quoted())
351 }
352 AdapterNotice::DatabaseDoesNotExist { name } => {
353 write!(f, "database {} does not exist", name.quoted())
354 }
355 AdapterNotice::CascadeDroppedObject { objects } => {
356 write!(f, "drop cascades to {} other objects", objects.len())
357 }
358 AdapterNotice::ClusterDoesNotExist { name } => {
359 write!(f, "cluster {} does not exist", name.quoted())
360 }
361 AdapterNotice::DefaultClusterDoesNotExist { kind, name, .. } => {
362 write!(f, "{kind} default cluster {} does not exist", name.quoted())
363 }
364 AdapterNotice::NoResolvableSearchPathSchema { search_path } => {
365 write!(
366 f,
367 "no schema on the search path exists: {}",
368 search_path.join(", ")
369 )
370 }
371 AdapterNotice::ExistingTransactionInProgress => {
372 write!(f, "there is already a transaction in progress")
373 }
374 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
375 write!(f, "there is no transaction in progress")
376 }
377 AdapterNotice::UserRequested { severity } => {
378 write!(f, "raised a test {}", severity.to_string().to_lowercase())
379 }
380 AdapterNotice::ClusterReplicaStatusChanged {
381 cluster,
382 replica,
383 status,
384 time,
385 } => {
386 let mut time_buf = String::new();
387 strconv::format_timestamptz(&mut time_buf, time);
388 write!(
389 f,
390 "cluster replica {}.{} changed status to {} at {}",
391 cluster,
392 replica,
393 status.as_kebab_case_str().quoted(),
394 time_buf,
395 )?;
396 Ok(())
397 }
398 AdapterNotice::DroppedActiveDatabase { name } => {
399 write!(f, "active database {} has been dropped", name.quoted())
400 }
401 AdapterNotice::DroppedActiveCluster { name } => {
402 write!(f, "active cluster {} has been dropped", name.quoted())
403 }
404 AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
405 AdapterNotice::EqualSubscribeBounds { bound } => {
406 write!(
407 f,
408 "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty"
409 )
410 }
411 AdapterNotice::QueryTrace { trace_id } => {
412 write!(f, "trace id: {}", trace_id)
413 }
414 AdapterNotice::UnimplementedIsolationLevel { isolation_level } => {
415 write!(
416 f,
417 "transaction isolation level {isolation_level} is unimplemented, the session will be upgraded to {}",
418 IsolationLevel::Serializable.as_str()
419 )
420 }
421 AdapterNotice::StrongSessionSerializable => {
422 write!(
423 f,
424 "The Strong Session Serializable isolation level may exhibit consistency violations when reading from catalog objects",
425 )
426 }
427 AdapterNotice::BadStartupSetting { name, reason } => {
428 write!(f, "startup setting {name} not set: {reason}")
429 }
430 AdapterNotice::RbacUserDisabled => {
431 write!(
432 f,
433 "RBAC is disabled so no role attributes or object ownership will be considered \
434 when executing statements"
435 )
436 }
437 AdapterNotice::RoleMembershipAlreadyExists {
438 role_name,
439 member_name,
440 } => write!(
441 f,
442 "role \"{member_name}\" is already a member of role \"{role_name}\""
443 ),
444 AdapterNotice::RoleMembershipDoesNotExists {
445 role_name,
446 member_name,
447 } => write!(
448 f,
449 "role \"{member_name}\" is not a member of role \"{role_name}\""
450 ),
451 AdapterNotice::AutoRunOnCatalogServerCluster => write!(
452 f,
453 "query was automatically run on the \"mz_catalog_server\" cluster"
454 ),
455 AdapterNotice::AlterIndexOwner { name } => {
456 write!(f, "cannot change owner of {}", name.quoted())
457 }
458 AdapterNotice::CannotRevoke { object_description } => {
459 write!(f, "no privileges could be revoked for {object_description}")
460 }
461 AdapterNotice::NonApplicablePrivilegeTypes {
462 non_applicable_privileges,
463 object_description,
464 } => {
465 write!(
466 f,
467 "non-applicable privilege types {} for {}",
468 non_applicable_privileges.to_error_string(),
469 object_description,
470 )
471 }
472 AdapterNotice::PlanNotice(plan) => plan.fmt(f),
473 AdapterNotice::UnknownSessionDatabase(name) => {
474 write!(f, "session database {} does not exist", name.quoted())
475 }
476 AdapterNotice::OptimizerNotice { notice, hint: _ } => notice.fmt(f),
477 AdapterNotice::WebhookSourceCreated { url } => {
478 write!(f, "URL to POST data is '{url}'")
479 }
480 AdapterNotice::DroppedInUseIndex(DroppedInUseIndex {
481 index_name,
482 dependant_objects,
483 }) => {
484 write!(
485 f,
486 "The dropped index {index_name} is being used by the following objects: {}. The index is now dropped from the catalog, but it will continue to be maintained and take up resources until all dependent objects are dropped, altered, or Materialize is restarted!",
487 separated(", ", dependant_objects)
488 )
489 }
490 AdapterNotice::PerReplicaLogRead { log_names } => {
491 write!(
492 f,
493 "Queried introspection relations: {}. Unlike other objects in Materialize, results from querying these objects depend on the current values of the `cluster` and `cluster_replica` session variables.",
494 log_names.join(", ")
495 )
496 }
497 AdapterNotice::VarDefaultUpdated { role, var_name } => {
498 let vars = match var_name {
499 Some(name) => format!("variable {} was", name.quoted()),
500 None => "variables were".to_string(),
501 };
502 let target = match role {
503 Some(role_name) => role_name.quoted().to_string(),
504 None => "the system".to_string(),
505 };
506 write!(
507 f,
508 "{vars} updated for {target}, this will have no effect on the current session"
509 )
510 }
511 AdapterNotice::Welcome(message) => message.fmt(f),
512 AdapterNotice::PlanInsights(message) => message.fmt(f),
513 AdapterNotice::IntrospectionClusterUsage => write!(
514 f,
515 "The mz_introspection cluster has been renamed to mz_catalog_server."
516 ),
517 AdapterNotice::AutoRouteIntrospectionQueriesUsage => write!(
518 f,
519 "The auto_route_introspection_queries variable has been renamed to auto_route_catalog_queries."
520 ),
521 AdapterNotice::OidcGroupSyncUnmatchedGroup { group } => {
522 write!(
523 f,
524 "OIDC group \"{}\" has no matching Materialize role, skipping",
525 group
526 )
527 }
528 AdapterNotice::OidcGroupSyncReservedRole { group } => {
529 write!(
530 f,
531 "OIDC group \"{}\" maps to a reserved role name, skipping",
532 group
533 )
534 }
535 AdapterNotice::OidcGroupSyncError { message } => {
536 write!(f, "OIDC group-to-role sync failed: {}", message)
537 }
538 }
539 }
540}
541
542#[derive(Clone, Debug)]
543pub struct DroppedInUseIndex {
544 pub index_name: String,
545 pub dependant_objects: Vec<String>,
546}
547
548impl From<PlanNotice> for AdapterNotice {
549 fn from(notice: PlanNotice) -> AdapterNotice {
550 AdapterNotice::PlanNotice(notice)
551 }
552}