mz_adapter/
notice.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_controller::clusters::ClusterStatus;
15use mz_orchestrator::{OfflineReason, ServiceStatus};
16use mz_ore::str::{StrExt, separated};
17use mz_pgwire_common::{ErrorResponse, Severity};
18use mz_repr::adt::mz_acl_item::AclMode;
19use mz_repr::strconv;
20use mz_sql::ast::NoticeSeverity;
21use mz_sql::catalog::ErrorMessageObjectDescription;
22use mz_sql::plan::PlanNotice;
23use mz_sql::session::vars::IsolationLevel;
24use tokio_postgres::error::SqlState;
25
26use crate::TimestampExplanation;
27
28/// Notices that can occur in the adapter layer.
29///
30/// These are diagnostic warnings or informational messages that are not
31/// severe enough to warrant failing a query entirely.
32#[derive(Clone, Debug)]
33pub enum AdapterNotice {
34    DatabaseAlreadyExists {
35        name: String,
36    },
37    SchemaAlreadyExists {
38        name: String,
39    },
40    TableAlreadyExists {
41        name: String,
42    },
43    ObjectAlreadyExists {
44        name: String,
45        ty: &'static str,
46    },
47    DatabaseDoesNotExist {
48        name: String,
49    },
50    ClusterDoesNotExist {
51        name: String,
52    },
53    DefaultClusterDoesNotExist {
54        name: String,
55        kind: &'static str,
56        suggested_action: String,
57    },
58    NoResolvableSearchPathSchema {
59        search_path: Vec<String>,
60    },
61    ExistingTransactionInProgress,
62    ExplicitTransactionControlInImplicitTransaction,
63    UserRequested {
64        severity: NoticeSeverity,
65    },
66    ClusterReplicaStatusChanged {
67        cluster: String,
68        replica: String,
69        status: ClusterStatus,
70        time: DateTime<Utc>,
71    },
72    CascadeDroppedObject {
73        objects: Vec<String>,
74    },
75    DroppedActiveDatabase {
76        name: String,
77    },
78    DroppedActiveCluster {
79        name: String,
80    },
81    QueryTimestamp {
82        explanation: TimestampExplanation<mz_repr::Timestamp>,
83    },
84    EqualSubscribeBounds {
85        bound: mz_repr::Timestamp,
86    },
87    QueryTrace {
88        trace_id: opentelemetry::trace::TraceId,
89    },
90    UnimplementedIsolationLevel {
91        isolation_level: String,
92    },
93    StrongSessionSerializable,
94    BadStartupSetting {
95        name: String,
96        reason: String,
97    },
98    RbacUserDisabled,
99    RoleMembershipAlreadyExists {
100        role_name: String,
101        member_name: String,
102    },
103    RoleMembershipDoesNotExists {
104        role_name: String,
105        member_name: String,
106    },
107    AutoRunOnCatalogServerCluster,
108    AlterIndexOwner {
109        name: String,
110    },
111    CannotRevoke {
112        object_description: ErrorMessageObjectDescription,
113    },
114    NonApplicablePrivilegeTypes {
115        non_applicable_privileges: AclMode,
116        object_description: ErrorMessageObjectDescription,
117    },
118    PlanNotice(PlanNotice),
119    UnknownSessionDatabase(String),
120    OptimizerNotice {
121        notice: String,
122        hint: String,
123    },
124    WebhookSourceCreated {
125        url: url::Url,
126    },
127    DroppedInUseIndex(DroppedInUseIndex),
128    PerReplicaLogRead {
129        log_names: Vec<String>,
130    },
131    VarDefaultUpdated {
132        role: Option<String>,
133        var_name: Option<String>,
134    },
135    Welcome(String),
136    PlanInsights(String),
137    IntrospectionClusterUsage,
138    AutoRouteIntrospectionQueriesUsage,
139}
140
141impl AdapterNotice {
142    pub fn into_response(self) -> ErrorResponse {
143        ErrorResponse {
144            severity: self.severity(),
145            code: self.code(),
146            message: self.to_string(),
147            detail: self.detail(),
148            hint: self.hint(),
149            position: None,
150        }
151    }
152
153    /// Returns the severity for a notice.
154    pub fn severity(&self) -> Severity {
155        match self {
156            AdapterNotice::DatabaseAlreadyExists { .. } => Severity::Notice,
157            AdapterNotice::SchemaAlreadyExists { .. } => Severity::Notice,
158            AdapterNotice::TableAlreadyExists { .. } => Severity::Notice,
159            AdapterNotice::ObjectAlreadyExists { .. } => Severity::Notice,
160            AdapterNotice::DatabaseDoesNotExist { .. } => Severity::Notice,
161            AdapterNotice::ClusterDoesNotExist { .. } => Severity::Notice,
162            AdapterNotice::DefaultClusterDoesNotExist { .. } => Severity::Notice,
163            AdapterNotice::NoResolvableSearchPathSchema { .. } => Severity::Notice,
164            AdapterNotice::ExistingTransactionInProgress => Severity::Warning,
165            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => Severity::Warning,
166            AdapterNotice::UserRequested { severity } => match severity {
167                NoticeSeverity::Debug => Severity::Debug,
168                NoticeSeverity::Info => Severity::Info,
169                NoticeSeverity::Log => Severity::Log,
170                NoticeSeverity::Notice => Severity::Notice,
171                NoticeSeverity::Warning => Severity::Warning,
172            },
173            AdapterNotice::ClusterReplicaStatusChanged { .. } => Severity::Notice,
174            AdapterNotice::CascadeDroppedObject { .. } => Severity::Notice,
175            AdapterNotice::DroppedActiveDatabase { .. } => Severity::Notice,
176            AdapterNotice::DroppedActiveCluster { .. } => Severity::Notice,
177            AdapterNotice::QueryTimestamp { .. } => Severity::Notice,
178            AdapterNotice::EqualSubscribeBounds { .. } => Severity::Notice,
179            AdapterNotice::QueryTrace { .. } => Severity::Notice,
180            AdapterNotice::UnimplementedIsolationLevel { .. } => Severity::Notice,
181            AdapterNotice::StrongSessionSerializable => Severity::Notice,
182            AdapterNotice::BadStartupSetting { .. } => Severity::Notice,
183            AdapterNotice::RbacUserDisabled => Severity::Notice,
184            AdapterNotice::RoleMembershipAlreadyExists { .. } => Severity::Notice,
185            AdapterNotice::RoleMembershipDoesNotExists { .. } => Severity::Warning,
186            AdapterNotice::AutoRunOnCatalogServerCluster => Severity::Debug,
187            AdapterNotice::AlterIndexOwner { .. } => Severity::Warning,
188            AdapterNotice::CannotRevoke { .. } => Severity::Warning,
189            AdapterNotice::NonApplicablePrivilegeTypes { .. } => Severity::Notice,
190            AdapterNotice::PlanNotice(notice) => match notice {
191                PlanNotice::ObjectDoesNotExist { .. } => Severity::Notice,
192                PlanNotice::ColumnAlreadyExists { .. } => Severity::Notice,
193                PlanNotice::UpsertSinkKeyNotEnforced { .. } => Severity::Warning,
194                PlanNotice::ReplicaDiskOptionDeprecated { .. } => Severity::Notice,
195            },
196            AdapterNotice::UnknownSessionDatabase(_) => Severity::Notice,
197            AdapterNotice::OptimizerNotice { .. } => Severity::Notice,
198            AdapterNotice::WebhookSourceCreated { .. } => Severity::Notice,
199            AdapterNotice::DroppedInUseIndex { .. } => Severity::Notice,
200            AdapterNotice::PerReplicaLogRead { .. } => Severity::Notice,
201            AdapterNotice::VarDefaultUpdated { .. } => Severity::Notice,
202            AdapterNotice::Welcome(_) => Severity::Notice,
203            AdapterNotice::PlanInsights(_) => Severity::Notice,
204            AdapterNotice::IntrospectionClusterUsage => Severity::Warning,
205            AdapterNotice::AutoRouteIntrospectionQueriesUsage => Severity::Warning,
206        }
207    }
208
209    /// Reports additional details about the notice, if any are available.
210    pub fn detail(&self) -> Option<String> {
211        match self {
212            AdapterNotice::PlanNotice(notice) => notice.detail(),
213            AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
214            AdapterNotice::CascadeDroppedObject { objects } => Some(
215                objects
216                    .iter()
217                    .map(|obj_info| format!("drop cascades to {}", obj_info))
218                    .join("\n"),
219            ),
220            _ => None,
221        }
222    }
223
224    /// Reports a hint for the user about how the notice could be addressed.
225    pub fn hint(&self) -> Option<String> {
226        match self {
227            AdapterNotice::DatabaseDoesNotExist { name: _ } => Some("Create the database with CREATE DATABASE or pick an extant database with SET DATABASE = name. List available databases with SHOW DATABASES.".into()),
228            AdapterNotice::ClusterDoesNotExist { name: _ } => Some("Create the cluster with CREATE CLUSTER or pick an extant cluster with SET CLUSTER = name. List available clusters with SHOW CLUSTERS.".into()),
229            AdapterNotice::DefaultClusterDoesNotExist { name: _, kind: _, suggested_action } => Some(suggested_action.clone()),
230            AdapterNotice::NoResolvableSearchPathSchema { search_path: _ } => Some("Create a schema with CREATE SCHEMA or pick an extant schema with SET SCHEMA = name. List available schemas with SHOW SCHEMAS.".into()),
231            AdapterNotice::DroppedActiveDatabase { name: _ } => Some("Choose a new active database by executing SET DATABASE = <name>.".into()),
232            AdapterNotice::DroppedActiveCluster { name: _ } => Some("Choose a new active cluster by executing SET CLUSTER = <name>.".into()),
233            AdapterNotice::ClusterReplicaStatusChanged { status, .. } => {
234                match status {
235                    ServiceStatus::Offline(None)
236                    | ServiceStatus::Offline(Some(OfflineReason::Initializing)) => Some("The cluster replica may be restarting or going offline.".into()),
237                    ServiceStatus::Offline(Some(OfflineReason::OomKilled)) => Some("The cluster replica may have run out of memory and been killed.".into()),
238                    ServiceStatus::Online => None,
239                }
240            },
241            AdapterNotice::RbacUserDisabled => Some("To enable RBAC globally run `ALTER SYSTEM SET enable_rbac_checks TO TRUE` as a superuser. TO enable RBAC for just this session run `SET enable_session_rbac_checks TO TRUE`.".into()),
242            AdapterNotice::AlterIndexOwner {name: _} => Some("Change the ownership of the index's relation, instead.".into()),
243            AdapterNotice::UnknownSessionDatabase(_) => Some(
244                "Create the database with CREATE DATABASE \
245                 or pick an extant database with SET DATABASE = name. \
246                 List available databases with SHOW DATABASES."
247                    .into(),
248            ),
249            AdapterNotice::OptimizerNotice { notice: _, hint } => Some(hint.clone()),
250            AdapterNotice::DroppedInUseIndex(..) => Some("To free up the resources used by the index, recreate all the above-mentioned objects.".into()),
251            AdapterNotice::IntrospectionClusterUsage => Some("Use the new name instead.".into()),
252            AdapterNotice::AutoRouteIntrospectionQueriesUsage => Some("Use the new name instead.".into()),
253            _ => None
254        }
255    }
256
257    /// Reports the error code.
258    pub fn code(&self) -> SqlState {
259        match self {
260            AdapterNotice::DatabaseAlreadyExists { .. } => SqlState::DUPLICATE_DATABASE,
261            AdapterNotice::SchemaAlreadyExists { .. } => SqlState::DUPLICATE_SCHEMA,
262            AdapterNotice::TableAlreadyExists { .. } => SqlState::DUPLICATE_TABLE,
263            AdapterNotice::ObjectAlreadyExists { .. } => SqlState::DUPLICATE_OBJECT,
264            AdapterNotice::DatabaseDoesNotExist { .. } => SqlState::from_code("MZ006"),
265            AdapterNotice::ClusterDoesNotExist { .. } => SqlState::from_code("MZ007"),
266            AdapterNotice::NoResolvableSearchPathSchema { .. } => SqlState::from_code("MZ008"),
267            AdapterNotice::ExistingTransactionInProgress => SqlState::ACTIVE_SQL_TRANSACTION,
268            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
269                SqlState::NO_ACTIVE_SQL_TRANSACTION
270            }
271            AdapterNotice::UserRequested { severity } => match severity {
272                NoticeSeverity::Warning => SqlState::WARNING,
273                _ => SqlState::SUCCESSFUL_COMPLETION,
274            },
275            AdapterNotice::ClusterReplicaStatusChanged { .. } => SqlState::SUCCESSFUL_COMPLETION,
276            AdapterNotice::CascadeDroppedObject { .. } => SqlState::SUCCESSFUL_COMPLETION,
277            AdapterNotice::DroppedActiveDatabase { .. } => SqlState::from_code("MZ002"),
278            AdapterNotice::DroppedActiveCluster { .. } => SqlState::from_code("MZ003"),
279            AdapterNotice::QueryTimestamp { .. } => SqlState::SUCCESSFUL_COMPLETION,
280            AdapterNotice::EqualSubscribeBounds { .. } => SqlState::SUCCESSFUL_COMPLETION,
281            AdapterNotice::QueryTrace { .. } => SqlState::SUCCESSFUL_COMPLETION,
282            AdapterNotice::UnimplementedIsolationLevel { .. } => SqlState::SUCCESSFUL_COMPLETION,
283            AdapterNotice::StrongSessionSerializable => SqlState::SUCCESSFUL_COMPLETION,
284            AdapterNotice::BadStartupSetting { .. } => SqlState::SUCCESSFUL_COMPLETION,
285            AdapterNotice::RbacUserDisabled => SqlState::SUCCESSFUL_COMPLETION,
286            AdapterNotice::RoleMembershipAlreadyExists { .. } => SqlState::SUCCESSFUL_COMPLETION,
287            AdapterNotice::RoleMembershipDoesNotExists { .. } => SqlState::WARNING,
288            AdapterNotice::AutoRunOnCatalogServerCluster => SqlState::SUCCESSFUL_COMPLETION,
289            AdapterNotice::AlterIndexOwner { .. } => SqlState::WARNING,
290            AdapterNotice::CannotRevoke { .. } => SqlState::WARNING_PRIVILEGE_NOT_REVOKED,
291            AdapterNotice::NonApplicablePrivilegeTypes { .. } => SqlState::SUCCESSFUL_COMPLETION,
292            AdapterNotice::PlanNotice(plan) => match plan {
293                PlanNotice::ObjectDoesNotExist { .. } => SqlState::UNDEFINED_OBJECT,
294                PlanNotice::ColumnAlreadyExists { .. } => SqlState::DUPLICATE_COLUMN,
295                PlanNotice::UpsertSinkKeyNotEnforced { .. } => SqlState::WARNING,
296                PlanNotice::ReplicaDiskOptionDeprecated { .. } => {
297                    SqlState::WARNING_DEPRECATED_FEATURE
298                }
299            },
300            AdapterNotice::UnknownSessionDatabase(_) => SqlState::from_code("MZ004"),
301            AdapterNotice::DefaultClusterDoesNotExist { .. } => SqlState::from_code("MZ005"),
302            AdapterNotice::OptimizerNotice { .. } => SqlState::SUCCESSFUL_COMPLETION,
303            AdapterNotice::DroppedInUseIndex { .. } => SqlState::SUCCESSFUL_COMPLETION,
304            AdapterNotice::WebhookSourceCreated { .. } => SqlState::SUCCESSFUL_COMPLETION,
305            AdapterNotice::PerReplicaLogRead { .. } => SqlState::SUCCESSFUL_COMPLETION,
306            AdapterNotice::VarDefaultUpdated { .. } => SqlState::SUCCESSFUL_COMPLETION,
307            AdapterNotice::Welcome(_) => SqlState::SUCCESSFUL_COMPLETION,
308            AdapterNotice::PlanInsights(_) => SqlState::from_code("MZ001"),
309            AdapterNotice::IntrospectionClusterUsage => SqlState::WARNING,
310            AdapterNotice::AutoRouteIntrospectionQueriesUsage => SqlState::WARNING,
311        }
312    }
313}
314
315impl fmt::Display for AdapterNotice {
316    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
317        match self {
318            AdapterNotice::DatabaseAlreadyExists { name } => {
319                write!(f, "database {} already exists, skipping", name.quoted())
320            }
321            AdapterNotice::SchemaAlreadyExists { name } => {
322                write!(f, "schema {} already exists, skipping", name.quoted())
323            }
324            AdapterNotice::TableAlreadyExists { name } => {
325                write!(f, "table {} already exists, skipping", name.quoted())
326            }
327            AdapterNotice::ObjectAlreadyExists { name, ty } => {
328                write!(f, "{} {} already exists, skipping", ty, name.quoted())
329            }
330            AdapterNotice::DatabaseDoesNotExist { name } => {
331                write!(f, "database {} does not exist", name.quoted())
332            }
333            AdapterNotice::CascadeDroppedObject { objects } => {
334                write!(f, "drop cascades to {} other objects", objects.len())
335            }
336            AdapterNotice::ClusterDoesNotExist { name } => {
337                write!(f, "cluster {} does not exist", name.quoted())
338            }
339            AdapterNotice::DefaultClusterDoesNotExist { kind, name, .. } => {
340                write!(f, "{kind} default cluster {} does not exist", name.quoted())
341            }
342            AdapterNotice::NoResolvableSearchPathSchema { search_path } => {
343                write!(
344                    f,
345                    "no schema on the search path exists: {}",
346                    search_path.join(", ")
347                )
348            }
349            AdapterNotice::ExistingTransactionInProgress => {
350                write!(f, "there is already a transaction in progress")
351            }
352            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
353                write!(f, "there is no transaction in progress")
354            }
355            AdapterNotice::UserRequested { severity } => {
356                write!(f, "raised a test {}", severity.to_string().to_lowercase())
357            }
358            AdapterNotice::ClusterReplicaStatusChanged {
359                cluster,
360                replica,
361                status,
362                time,
363            } => {
364                let mut time_buf = String::new();
365                strconv::format_timestamptz(&mut time_buf, time);
366                write!(
367                    f,
368                    "cluster replica {}.{} changed status to {} at {}",
369                    cluster,
370                    replica,
371                    status.as_kebab_case_str().quoted(),
372                    time_buf,
373                )?;
374                Ok(())
375            }
376            AdapterNotice::DroppedActiveDatabase { name } => {
377                write!(f, "active database {} has been dropped", name.quoted())
378            }
379            AdapterNotice::DroppedActiveCluster { name } => {
380                write!(f, "active cluster {} has been dropped", name.quoted())
381            }
382            AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
383            AdapterNotice::EqualSubscribeBounds { bound } => {
384                write!(
385                    f,
386                    "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty"
387                )
388            }
389            AdapterNotice::QueryTrace { trace_id } => {
390                write!(f, "trace id: {}", trace_id)
391            }
392            AdapterNotice::UnimplementedIsolationLevel { isolation_level } => {
393                write!(
394                    f,
395                    "transaction isolation level {isolation_level} is unimplemented, the session will be upgraded to {}",
396                    IsolationLevel::Serializable.as_str()
397                )
398            }
399            AdapterNotice::StrongSessionSerializable => {
400                write!(
401                    f,
402                    "The Strong Session Serializable isolation level may exhibit consistency violations when reading from catalog objects",
403                )
404            }
405            AdapterNotice::BadStartupSetting { name, reason } => {
406                write!(f, "startup setting {name} not set: {reason}")
407            }
408            AdapterNotice::RbacUserDisabled => {
409                write!(
410                    f,
411                    "RBAC is disabled so no role attributes or object ownership will be considered \
412                    when executing statements"
413                )
414            }
415            AdapterNotice::RoleMembershipAlreadyExists {
416                role_name,
417                member_name,
418            } => write!(
419                f,
420                "role \"{member_name}\" is already a member of role \"{role_name}\""
421            ),
422            AdapterNotice::RoleMembershipDoesNotExists {
423                role_name,
424                member_name,
425            } => write!(
426                f,
427                "role \"{member_name}\" is not a member of role \"{role_name}\""
428            ),
429            AdapterNotice::AutoRunOnCatalogServerCluster => write!(
430                f,
431                "query was automatically run on the \"mz_catalog_server\" cluster"
432            ),
433            AdapterNotice::AlterIndexOwner { name } => {
434                write!(f, "cannot change owner of {}", name.quoted())
435            }
436            AdapterNotice::CannotRevoke { object_description } => {
437                write!(f, "no privileges could be revoked for {object_description}")
438            }
439            AdapterNotice::NonApplicablePrivilegeTypes {
440                non_applicable_privileges,
441                object_description,
442            } => {
443                write!(
444                    f,
445                    "non-applicable privilege types {} for {}",
446                    non_applicable_privileges.to_error_string(),
447                    object_description,
448                )
449            }
450            AdapterNotice::PlanNotice(plan) => plan.fmt(f),
451            AdapterNotice::UnknownSessionDatabase(name) => {
452                write!(f, "session database {} does not exist", name.quoted())
453            }
454            AdapterNotice::OptimizerNotice { notice, hint: _ } => notice.fmt(f),
455            AdapterNotice::WebhookSourceCreated { url } => {
456                write!(f, "URL to POST data is '{url}'")
457            }
458            AdapterNotice::DroppedInUseIndex(DroppedInUseIndex {
459                index_name,
460                dependant_objects,
461            }) => {
462                write!(
463                    f,
464                    "The dropped index {index_name} is being used by the following objects: {}. The index is now dropped from the catalog, but it will continue to be maintained and take up resources until all dependent objects are dropped, altered, or Materialize is restarted!",
465                    separated(", ", dependant_objects)
466                )
467            }
468            AdapterNotice::PerReplicaLogRead { log_names } => {
469                write!(
470                    f,
471                    "Queried introspection relations: {}. Unlike other objects in Materialize, results from querying these objects depend on the current values of the `cluster` and `cluster_replica` session variables.",
472                    log_names.join(", ")
473                )
474            }
475            AdapterNotice::VarDefaultUpdated { role, var_name } => {
476                let vars = match var_name {
477                    Some(name) => format!("variable {} was", name.quoted()),
478                    None => "variables were".to_string(),
479                };
480                let target = match role {
481                    Some(role_name) => role_name.quoted().to_string(),
482                    None => "the system".to_string(),
483                };
484                write!(
485                    f,
486                    "{vars} updated for {target}, this will have no effect on the current session"
487                )
488            }
489            AdapterNotice::Welcome(message) => message.fmt(f),
490            AdapterNotice::PlanInsights(message) => message.fmt(f),
491            AdapterNotice::IntrospectionClusterUsage => write!(
492                f,
493                "The mz_introspection cluster has been renamed to mz_catalog_server."
494            ),
495            AdapterNotice::AutoRouteIntrospectionQueriesUsage => write!(
496                f,
497                "The auto_route_introspection_queries variable has been renamed to auto_route_catalog_queries."
498            ),
499        }
500    }
501}
502
503#[derive(Clone, Debug)]
504pub struct DroppedInUseIndex {
505    pub index_name: String,
506    pub dependant_objects: Vec<String>,
507}
508
509impl From<PlanNotice> for AdapterNotice {
510    fn from(notice: PlanNotice) -> AdapterNotice {
511        AdapterNotice::PlanNotice(notice)
512    }
513}