Skip to main content

mz_adapter/
notice.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::fmt;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_controller::clusters::ClusterStatus;
15use mz_orchestrator::{OfflineReason, ServiceStatus};
16use mz_ore::str::{StrExt, separated};
17use mz_pgwire_common::{ErrorResponse, Severity};
18use mz_repr::adt::mz_acl_item::AclMode;
19use mz_repr::strconv;
20use mz_sql::ast::NoticeSeverity;
21use mz_sql::catalog::ErrorMessageObjectDescription;
22use mz_sql::plan::PlanNotice;
23use mz_sql::session::vars::IsolationLevel;
24use tokio_postgres::error::SqlState;
25
26use crate::TimestampExplanation;
27
28/// Notices that can occur in the adapter layer.
29///
30/// These are diagnostic warnings or informational messages that are not
31/// severe enough to warrant failing a query entirely.
32#[derive(Clone, Debug)]
33pub enum AdapterNotice {
34    DatabaseAlreadyExists {
35        name: String,
36    },
37    SchemaAlreadyExists {
38        name: String,
39    },
40    TableAlreadyExists {
41        name: String,
42    },
43    ObjectAlreadyExists {
44        name: String,
45        ty: &'static str,
46    },
47    DatabaseDoesNotExist {
48        name: String,
49    },
50    ClusterDoesNotExist {
51        name: String,
52    },
53    DefaultClusterDoesNotExist {
54        name: String,
55        kind: &'static str,
56        suggested_action: String,
57    },
58    NoResolvableSearchPathSchema {
59        search_path: Vec<String>,
60    },
61    ExistingTransactionInProgress,
62    ExplicitTransactionControlInImplicitTransaction,
63    UserRequested {
64        severity: NoticeSeverity,
65    },
66    ClusterReplicaStatusChanged {
67        cluster: String,
68        replica: String,
69        status: ClusterStatus,
70        time: DateTime<Utc>,
71    },
72    CascadeDroppedObject {
73        objects: Vec<String>,
74    },
75    DroppedActiveDatabase {
76        name: String,
77    },
78    DroppedActiveCluster {
79        name: String,
80    },
81    QueryTimestamp {
82        explanation: TimestampExplanation<mz_repr::Timestamp>,
83    },
84    EqualSubscribeBounds {
85        bound: mz_repr::Timestamp,
86    },
87    QueryTrace {
88        trace_id: opentelemetry::trace::TraceId,
89    },
90    UnimplementedIsolationLevel {
91        isolation_level: String,
92    },
93    StrongSessionSerializable,
94    BadStartupSetting {
95        name: String,
96        reason: String,
97    },
98    RbacUserDisabled,
99    RoleMembershipAlreadyExists {
100        role_name: String,
101        member_name: String,
102    },
103    RoleMembershipDoesNotExists {
104        role_name: String,
105        member_name: String,
106    },
107    AutoRunOnCatalogServerCluster,
108    AlterIndexOwner {
109        name: String,
110    },
111    CannotRevoke {
112        object_description: ErrorMessageObjectDescription,
113    },
114    NonApplicablePrivilegeTypes {
115        non_applicable_privileges: AclMode,
116        object_description: ErrorMessageObjectDescription,
117    },
118    PlanNotice(PlanNotice),
119    UnknownSessionDatabase(String),
120    OptimizerNotice {
121        notice: String,
122        hint: String,
123    },
124    WebhookSourceCreated {
125        url: url::Url,
126    },
127    DroppedInUseIndex(DroppedInUseIndex),
128    PerReplicaLogRead {
129        log_names: Vec<String>,
130    },
131    VarDefaultUpdated {
132        role: Option<String>,
133        var_name: Option<String>,
134    },
135    Welcome(String),
136    PlanInsights(String),
137    IntrospectionClusterUsage,
138    AutoRouteIntrospectionQueriesUsage,
139}
140
141impl AdapterNotice {
142    pub fn into_response(self) -> ErrorResponse {
143        ErrorResponse {
144            severity: self.severity(),
145            code: self.code(),
146            message: self.to_string(),
147            detail: self.detail(),
148            hint: self.hint(),
149            position: None,
150        }
151    }
152
153    /// Returns the severity for a notice.
154    pub fn severity(&self) -> Severity {
155        match self {
156            AdapterNotice::DatabaseAlreadyExists { .. } => Severity::Notice,
157            AdapterNotice::SchemaAlreadyExists { .. } => Severity::Notice,
158            AdapterNotice::TableAlreadyExists { .. } => Severity::Notice,
159            AdapterNotice::ObjectAlreadyExists { .. } => Severity::Notice,
160            AdapterNotice::DatabaseDoesNotExist { .. } => Severity::Notice,
161            AdapterNotice::ClusterDoesNotExist { .. } => Severity::Notice,
162            AdapterNotice::DefaultClusterDoesNotExist { .. } => Severity::Notice,
163            AdapterNotice::NoResolvableSearchPathSchema { .. } => Severity::Notice,
164            AdapterNotice::ExistingTransactionInProgress => Severity::Warning,
165            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => Severity::Warning,
166            AdapterNotice::UserRequested { severity } => match severity {
167                NoticeSeverity::Debug => Severity::Debug,
168                NoticeSeverity::Info => Severity::Info,
169                NoticeSeverity::Log => Severity::Log,
170                NoticeSeverity::Notice => Severity::Notice,
171                NoticeSeverity::Warning => Severity::Warning,
172            },
173            AdapterNotice::ClusterReplicaStatusChanged { .. } => Severity::Notice,
174            AdapterNotice::CascadeDroppedObject { .. } => Severity::Notice,
175            AdapterNotice::DroppedActiveDatabase { .. } => Severity::Notice,
176            AdapterNotice::DroppedActiveCluster { .. } => Severity::Notice,
177            AdapterNotice::QueryTimestamp { .. } => Severity::Notice,
178            AdapterNotice::EqualSubscribeBounds { .. } => Severity::Notice,
179            AdapterNotice::QueryTrace { .. } => Severity::Notice,
180            AdapterNotice::UnimplementedIsolationLevel { .. } => Severity::Notice,
181            AdapterNotice::StrongSessionSerializable => Severity::Notice,
182            AdapterNotice::BadStartupSetting { .. } => Severity::Notice,
183            AdapterNotice::RbacUserDisabled => Severity::Notice,
184            AdapterNotice::RoleMembershipAlreadyExists { .. } => Severity::Notice,
185            AdapterNotice::RoleMembershipDoesNotExists { .. } => Severity::Warning,
186            AdapterNotice::AutoRunOnCatalogServerCluster => Severity::Debug,
187            AdapterNotice::AlterIndexOwner { .. } => Severity::Warning,
188            AdapterNotice::CannotRevoke { .. } => Severity::Warning,
189            AdapterNotice::NonApplicablePrivilegeTypes { .. } => Severity::Notice,
190            AdapterNotice::PlanNotice(notice) => match notice {
191                PlanNotice::ObjectDoesNotExist { .. } => Severity::Notice,
192                PlanNotice::ColumnAlreadyExists { .. } => Severity::Notice,
193                PlanNotice::UpsertSinkKeyNotEnforced { .. } => Severity::Warning,
194                PlanNotice::ReplicaDiskOptionDeprecated { .. } => Severity::Notice,
195            },
196            AdapterNotice::UnknownSessionDatabase(_) => Severity::Notice,
197            AdapterNotice::OptimizerNotice { .. } => Severity::Notice,
198            AdapterNotice::WebhookSourceCreated { .. } => Severity::Notice,
199            AdapterNotice::DroppedInUseIndex { .. } => Severity::Notice,
200            AdapterNotice::PerReplicaLogRead { .. } => Severity::Notice,
201            AdapterNotice::VarDefaultUpdated { .. } => Severity::Notice,
202            AdapterNotice::Welcome(_) => Severity::Notice,
203            AdapterNotice::PlanInsights(_) => Severity::Notice,
204            AdapterNotice::IntrospectionClusterUsage => Severity::Warning,
205            AdapterNotice::AutoRouteIntrospectionQueriesUsage => Severity::Warning,
206        }
207    }
208
209    /// Reports additional details about the notice, if any are available.
210    pub fn detail(&self) -> Option<String> {
211        match self {
212            AdapterNotice::PlanNotice(notice) => notice.detail(),
213            AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
214            AdapterNotice::CascadeDroppedObject { objects } => Some(
215                objects
216                    .iter()
217                    .map(|obj_info| format!("drop cascades to {}", obj_info))
218                    .join("\n"),
219            ),
220            _ => None,
221        }
222    }
223
224    /// Reports a hint for the user about how the notice could be addressed.
225    pub fn hint(&self) -> Option<String> {
226        match self {
227            AdapterNotice::DatabaseDoesNotExist { name: _ } => Some("Create the database with CREATE DATABASE or pick an extant database with SET DATABASE = name. List available databases with SHOW DATABASES.".into()),
228            AdapterNotice::ClusterDoesNotExist { name: _ } => Some("Create the cluster with CREATE CLUSTER or pick an extant cluster with SET CLUSTER = name. List available clusters with SHOW CLUSTERS.".into()),
229            AdapterNotice::DefaultClusterDoesNotExist {
230                name: _,
231                kind: _,
232                suggested_action,
233            } => Some(suggested_action.clone()),
234            AdapterNotice::NoResolvableSearchPathSchema { search_path: _ } => Some("Create a schema with CREATE SCHEMA or pick an extant schema with SET SCHEMA = name. List available schemas with SHOW SCHEMAS.".into()),
235            AdapterNotice::DroppedActiveDatabase { name: _ } => Some("Choose a new active database by executing SET DATABASE = <name>.".into()),
236            AdapterNotice::DroppedActiveCluster { name: _ } => Some("Choose a new active cluster by executing SET CLUSTER = <name>.".into()),
237            AdapterNotice::ClusterReplicaStatusChanged { status, .. } => {
238                match status {
239                    ServiceStatus::Offline(None)
240                    | ServiceStatus::Offline(Some(OfflineReason::Initializing)) => Some("The cluster replica may be restarting or going offline.".into()),
241                    ServiceStatus::Offline(Some(OfflineReason::OomKilled)) => Some("The cluster replica may have run out of memory and been killed.".into()),
242                    ServiceStatus::Online => None,
243                }
244            },
245            AdapterNotice::RbacUserDisabled => Some("To enable RBAC globally run `ALTER SYSTEM SET enable_rbac_checks TO TRUE` as a superuser. TO enable RBAC for just this session run `SET enable_session_rbac_checks TO TRUE`.".into()),
246            AdapterNotice::AlterIndexOwner {name: _} => Some("Change the ownership of the index's relation, instead.".into()),
247            AdapterNotice::UnknownSessionDatabase(_) => Some(
248                "Create the database with CREATE DATABASE \
249                 or pick an extant database with SET DATABASE = name. \
250                 List available databases with SHOW DATABASES."
251                    .into(),
252            ),
253            AdapterNotice::OptimizerNotice { notice: _, hint } => Some(hint.clone()),
254            AdapterNotice::DroppedInUseIndex(..) => Some("To free up the resources used by the index, recreate all the above-mentioned objects.".into()),
255            AdapterNotice::IntrospectionClusterUsage => Some("Use the new name instead.".into()),
256            AdapterNotice::AutoRouteIntrospectionQueriesUsage => Some("Use the new name instead.".into()),
257            _ => None
258        }
259    }
260
261    /// Reports the error code.
262    pub fn code(&self) -> SqlState {
263        match self {
264            AdapterNotice::DatabaseAlreadyExists { .. } => SqlState::DUPLICATE_DATABASE,
265            AdapterNotice::SchemaAlreadyExists { .. } => SqlState::DUPLICATE_SCHEMA,
266            AdapterNotice::TableAlreadyExists { .. } => SqlState::DUPLICATE_TABLE,
267            AdapterNotice::ObjectAlreadyExists { .. } => SqlState::DUPLICATE_OBJECT,
268            AdapterNotice::DatabaseDoesNotExist { .. } => SqlState::from_code("MZ006"),
269            AdapterNotice::ClusterDoesNotExist { .. } => SqlState::from_code("MZ007"),
270            AdapterNotice::NoResolvableSearchPathSchema { .. } => SqlState::from_code("MZ008"),
271            AdapterNotice::ExistingTransactionInProgress => SqlState::ACTIVE_SQL_TRANSACTION,
272            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
273                SqlState::NO_ACTIVE_SQL_TRANSACTION
274            }
275            AdapterNotice::UserRequested { severity } => match severity {
276                NoticeSeverity::Warning => SqlState::WARNING,
277                _ => SqlState::SUCCESSFUL_COMPLETION,
278            },
279            AdapterNotice::ClusterReplicaStatusChanged { .. } => SqlState::SUCCESSFUL_COMPLETION,
280            AdapterNotice::CascadeDroppedObject { .. } => SqlState::SUCCESSFUL_COMPLETION,
281            AdapterNotice::DroppedActiveDatabase { .. } => SqlState::from_code("MZ002"),
282            AdapterNotice::DroppedActiveCluster { .. } => SqlState::from_code("MZ003"),
283            AdapterNotice::QueryTimestamp { .. } => SqlState::SUCCESSFUL_COMPLETION,
284            AdapterNotice::EqualSubscribeBounds { .. } => SqlState::SUCCESSFUL_COMPLETION,
285            AdapterNotice::QueryTrace { .. } => SqlState::SUCCESSFUL_COMPLETION,
286            AdapterNotice::UnimplementedIsolationLevel { .. } => SqlState::SUCCESSFUL_COMPLETION,
287            AdapterNotice::StrongSessionSerializable => SqlState::SUCCESSFUL_COMPLETION,
288            AdapterNotice::BadStartupSetting { .. } => SqlState::SUCCESSFUL_COMPLETION,
289            AdapterNotice::RbacUserDisabled => SqlState::SUCCESSFUL_COMPLETION,
290            AdapterNotice::RoleMembershipAlreadyExists { .. } => SqlState::SUCCESSFUL_COMPLETION,
291            AdapterNotice::RoleMembershipDoesNotExists { .. } => SqlState::WARNING,
292            AdapterNotice::AutoRunOnCatalogServerCluster => SqlState::SUCCESSFUL_COMPLETION,
293            AdapterNotice::AlterIndexOwner { .. } => SqlState::WARNING,
294            AdapterNotice::CannotRevoke { .. } => SqlState::WARNING_PRIVILEGE_NOT_REVOKED,
295            AdapterNotice::NonApplicablePrivilegeTypes { .. } => SqlState::SUCCESSFUL_COMPLETION,
296            AdapterNotice::PlanNotice(plan) => match plan {
297                PlanNotice::ObjectDoesNotExist { .. } => SqlState::UNDEFINED_OBJECT,
298                PlanNotice::ColumnAlreadyExists { .. } => SqlState::DUPLICATE_COLUMN,
299                PlanNotice::UpsertSinkKeyNotEnforced { .. } => SqlState::WARNING,
300                PlanNotice::ReplicaDiskOptionDeprecated { .. } => {
301                    SqlState::WARNING_DEPRECATED_FEATURE
302                }
303            },
304            AdapterNotice::UnknownSessionDatabase(_) => SqlState::from_code("MZ004"),
305            AdapterNotice::DefaultClusterDoesNotExist { .. } => SqlState::from_code("MZ005"),
306            AdapterNotice::OptimizerNotice { .. } => SqlState::SUCCESSFUL_COMPLETION,
307            AdapterNotice::DroppedInUseIndex { .. } => SqlState::SUCCESSFUL_COMPLETION,
308            AdapterNotice::WebhookSourceCreated { .. } => SqlState::SUCCESSFUL_COMPLETION,
309            AdapterNotice::PerReplicaLogRead { .. } => SqlState::SUCCESSFUL_COMPLETION,
310            AdapterNotice::VarDefaultUpdated { .. } => SqlState::SUCCESSFUL_COMPLETION,
311            AdapterNotice::Welcome(_) => SqlState::SUCCESSFUL_COMPLETION,
312            AdapterNotice::PlanInsights(_) => SqlState::from_code("MZ001"),
313            AdapterNotice::IntrospectionClusterUsage => SqlState::WARNING,
314            AdapterNotice::AutoRouteIntrospectionQueriesUsage => SqlState::WARNING,
315        }
316    }
317}
318
319impl fmt::Display for AdapterNotice {
320    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
321        match self {
322            AdapterNotice::DatabaseAlreadyExists { name } => {
323                write!(f, "database {} already exists, skipping", name.quoted())
324            }
325            AdapterNotice::SchemaAlreadyExists { name } => {
326                write!(f, "schema {} already exists, skipping", name.quoted())
327            }
328            AdapterNotice::TableAlreadyExists { name } => {
329                write!(f, "table {} already exists, skipping", name.quoted())
330            }
331            AdapterNotice::ObjectAlreadyExists { name, ty } => {
332                write!(f, "{} {} already exists, skipping", ty, name.quoted())
333            }
334            AdapterNotice::DatabaseDoesNotExist { name } => {
335                write!(f, "database {} does not exist", name.quoted())
336            }
337            AdapterNotice::CascadeDroppedObject { objects } => {
338                write!(f, "drop cascades to {} other objects", objects.len())
339            }
340            AdapterNotice::ClusterDoesNotExist { name } => {
341                write!(f, "cluster {} does not exist", name.quoted())
342            }
343            AdapterNotice::DefaultClusterDoesNotExist { kind, name, .. } => {
344                write!(f, "{kind} default cluster {} does not exist", name.quoted())
345            }
346            AdapterNotice::NoResolvableSearchPathSchema { search_path } => {
347                write!(
348                    f,
349                    "no schema on the search path exists: {}",
350                    search_path.join(", ")
351                )
352            }
353            AdapterNotice::ExistingTransactionInProgress => {
354                write!(f, "there is already a transaction in progress")
355            }
356            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
357                write!(f, "there is no transaction in progress")
358            }
359            AdapterNotice::UserRequested { severity } => {
360                write!(f, "raised a test {}", severity.to_string().to_lowercase())
361            }
362            AdapterNotice::ClusterReplicaStatusChanged {
363                cluster,
364                replica,
365                status,
366                time,
367            } => {
368                let mut time_buf = String::new();
369                strconv::format_timestamptz(&mut time_buf, time);
370                write!(
371                    f,
372                    "cluster replica {}.{} changed status to {} at {}",
373                    cluster,
374                    replica,
375                    status.as_kebab_case_str().quoted(),
376                    time_buf,
377                )?;
378                Ok(())
379            }
380            AdapterNotice::DroppedActiveDatabase { name } => {
381                write!(f, "active database {} has been dropped", name.quoted())
382            }
383            AdapterNotice::DroppedActiveCluster { name } => {
384                write!(f, "active cluster {} has been dropped", name.quoted())
385            }
386            AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
387            AdapterNotice::EqualSubscribeBounds { bound } => {
388                write!(
389                    f,
390                    "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty"
391                )
392            }
393            AdapterNotice::QueryTrace { trace_id } => {
394                write!(f, "trace id: {}", trace_id)
395            }
396            AdapterNotice::UnimplementedIsolationLevel { isolation_level } => {
397                write!(
398                    f,
399                    "transaction isolation level {isolation_level} is unimplemented, the session will be upgraded to {}",
400                    IsolationLevel::Serializable.as_str()
401                )
402            }
403            AdapterNotice::StrongSessionSerializable => {
404                write!(
405                    f,
406                    "The Strong Session Serializable isolation level may exhibit consistency violations when reading from catalog objects",
407                )
408            }
409            AdapterNotice::BadStartupSetting { name, reason } => {
410                write!(f, "startup setting {name} not set: {reason}")
411            }
412            AdapterNotice::RbacUserDisabled => {
413                write!(
414                    f,
415                    "RBAC is disabled so no role attributes or object ownership will be considered \
416                    when executing statements"
417                )
418            }
419            AdapterNotice::RoleMembershipAlreadyExists {
420                role_name,
421                member_name,
422            } => write!(
423                f,
424                "role \"{member_name}\" is already a member of role \"{role_name}\""
425            ),
426            AdapterNotice::RoleMembershipDoesNotExists {
427                role_name,
428                member_name,
429            } => write!(
430                f,
431                "role \"{member_name}\" is not a member of role \"{role_name}\""
432            ),
433            AdapterNotice::AutoRunOnCatalogServerCluster => write!(
434                f,
435                "query was automatically run on the \"mz_catalog_server\" cluster"
436            ),
437            AdapterNotice::AlterIndexOwner { name } => {
438                write!(f, "cannot change owner of {}", name.quoted())
439            }
440            AdapterNotice::CannotRevoke { object_description } => {
441                write!(f, "no privileges could be revoked for {object_description}")
442            }
443            AdapterNotice::NonApplicablePrivilegeTypes {
444                non_applicable_privileges,
445                object_description,
446            } => {
447                write!(
448                    f,
449                    "non-applicable privilege types {} for {}",
450                    non_applicable_privileges.to_error_string(),
451                    object_description,
452                )
453            }
454            AdapterNotice::PlanNotice(plan) => plan.fmt(f),
455            AdapterNotice::UnknownSessionDatabase(name) => {
456                write!(f, "session database {} does not exist", name.quoted())
457            }
458            AdapterNotice::OptimizerNotice { notice, hint: _ } => notice.fmt(f),
459            AdapterNotice::WebhookSourceCreated { url } => {
460                write!(f, "URL to POST data is '{url}'")
461            }
462            AdapterNotice::DroppedInUseIndex(DroppedInUseIndex {
463                index_name,
464                dependant_objects,
465            }) => {
466                write!(
467                    f,
468                    "The dropped index {index_name} is being used by the following objects: {}. The index is now dropped from the catalog, but it will continue to be maintained and take up resources until all dependent objects are dropped, altered, or Materialize is restarted!",
469                    separated(", ", dependant_objects)
470                )
471            }
472            AdapterNotice::PerReplicaLogRead { log_names } => {
473                write!(
474                    f,
475                    "Queried introspection relations: {}. Unlike other objects in Materialize, results from querying these objects depend on the current values of the `cluster` and `cluster_replica` session variables.",
476                    log_names.join(", ")
477                )
478            }
479            AdapterNotice::VarDefaultUpdated { role, var_name } => {
480                let vars = match var_name {
481                    Some(name) => format!("variable {} was", name.quoted()),
482                    None => "variables were".to_string(),
483                };
484                let target = match role {
485                    Some(role_name) => role_name.quoted().to_string(),
486                    None => "the system".to_string(),
487                };
488                write!(
489                    f,
490                    "{vars} updated for {target}, this will have no effect on the current session"
491                )
492            }
493            AdapterNotice::Welcome(message) => message.fmt(f),
494            AdapterNotice::PlanInsights(message) => message.fmt(f),
495            AdapterNotice::IntrospectionClusterUsage => write!(
496                f,
497                "The mz_introspection cluster has been renamed to mz_catalog_server."
498            ),
499            AdapterNotice::AutoRouteIntrospectionQueriesUsage => write!(
500                f,
501                "The auto_route_introspection_queries variable has been renamed to auto_route_catalog_queries."
502            ),
503        }
504    }
505}
506
507#[derive(Clone, Debug)]
508pub struct DroppedInUseIndex {
509    pub index_name: String,
510    pub dependant_objects: Vec<String>,
511}
512
513impl From<PlanNotice> for AdapterNotice {
514    fn from(notice: PlanNotice) -> AdapterNotice {
515        AdapterNotice::PlanNotice(notice)
516    }
517}