mz_adapter/
notice.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::fmt;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_controller::clusters::ClusterStatus;
15use mz_orchestrator::{OfflineReason, ServiceStatus};
16use mz_ore::str::{StrExt, separated};
17use mz_pgwire_common::{ErrorResponse, Severity};
18use mz_repr::adt::mz_acl_item::AclMode;
19use mz_repr::strconv;
20use mz_sql::ast::NoticeSeverity;
21use mz_sql::catalog::ErrorMessageObjectDescription;
22use mz_sql::plan::PlanNotice;
23use mz_sql::session::vars::IsolationLevel;
24use tokio_postgres::error::SqlState;
25
26use crate::TimestampExplanation;
27
28/// Notices that can occur in the adapter layer.
29///
30/// These are diagnostic warnings or informational messages that are not
31/// severe enough to warrant failing a query entirely.
32#[derive(Clone, Debug)]
33pub enum AdapterNotice {
34    DatabaseAlreadyExists {
35        name: String,
36    },
37    SchemaAlreadyExists {
38        name: String,
39    },
40    TableAlreadyExists {
41        name: String,
42    },
43    ObjectAlreadyExists {
44        name: String,
45        ty: &'static str,
46    },
47    DatabaseDoesNotExist {
48        name: String,
49    },
50    ClusterDoesNotExist {
51        name: String,
52    },
53    DefaultClusterDoesNotExist {
54        name: String,
55        kind: &'static str,
56        suggested_action: String,
57    },
58    NoResolvableSearchPathSchema {
59        search_path: Vec<String>,
60    },
61    ExistingTransactionInProgress,
62    ExplicitTransactionControlInImplicitTransaction,
63    UserRequested {
64        severity: NoticeSeverity,
65    },
66    ClusterReplicaStatusChanged {
67        cluster: String,
68        replica: String,
69        status: ClusterStatus,
70        time: DateTime<Utc>,
71    },
72    CascadeDroppedObject {
73        objects: Vec<String>,
74    },
75    DroppedActiveDatabase {
76        name: String,
77    },
78    DroppedActiveCluster {
79        name: String,
80    },
81    QueryTimestamp {
82        explanation: TimestampExplanation<mz_repr::Timestamp>,
83    },
84    EqualSubscribeBounds {
85        bound: mz_repr::Timestamp,
86    },
87    QueryTrace {
88        trace_id: opentelemetry::trace::TraceId,
89    },
90    UnimplementedIsolationLevel {
91        isolation_level: String,
92    },
93    StrongSessionSerializable,
94    BadStartupSetting {
95        name: String,
96        reason: String,
97    },
98    RbacUserDisabled,
99    RoleMembershipAlreadyExists {
100        role_name: String,
101        member_name: String,
102    },
103    RoleMembershipDoesNotExists {
104        role_name: String,
105        member_name: String,
106    },
107    AutoRunOnCatalogServerCluster,
108    AlterIndexOwner {
109        name: String,
110    },
111    CannotRevoke {
112        object_description: ErrorMessageObjectDescription,
113    },
114    NonApplicablePrivilegeTypes {
115        non_applicable_privileges: AclMode,
116        object_description: ErrorMessageObjectDescription,
117    },
118    PlanNotice(PlanNotice),
119    UnknownSessionDatabase(String),
120    OptimizerNotice {
121        notice: String,
122        hint: String,
123    },
124    WebhookSourceCreated {
125        url: url::Url,
126    },
127    DroppedInUseIndex(DroppedInUseIndex),
128    PerReplicaLogRead {
129        log_names: Vec<String>,
130    },
131    VarDefaultUpdated {
132        role: Option<String>,
133        var_name: Option<String>,
134    },
135    Welcome(String),
136    PlanInsights(String),
137    IntrospectionClusterUsage,
138    AutoRouteIntrospectionQueriesUsage,
139}
140
141impl AdapterNotice {
142    pub fn into_response(self) -> ErrorResponse {
143        ErrorResponse {
144            severity: self.severity(),
145            code: self.code(),
146            message: self.to_string(),
147            detail: self.detail(),
148            hint: self.hint(),
149            position: None,
150        }
151    }
152
153    /// Returns the severity for a notice.
154    pub fn severity(&self) -> Severity {
155        match self {
156            AdapterNotice::DatabaseAlreadyExists { .. } => Severity::Notice,
157            AdapterNotice::SchemaAlreadyExists { .. } => Severity::Notice,
158            AdapterNotice::TableAlreadyExists { .. } => Severity::Notice,
159            AdapterNotice::ObjectAlreadyExists { .. } => Severity::Notice,
160            AdapterNotice::DatabaseDoesNotExist { .. } => Severity::Notice,
161            AdapterNotice::ClusterDoesNotExist { .. } => Severity::Notice,
162            AdapterNotice::DefaultClusterDoesNotExist { .. } => Severity::Notice,
163            AdapterNotice::NoResolvableSearchPathSchema { .. } => Severity::Notice,
164            AdapterNotice::ExistingTransactionInProgress => Severity::Warning,
165            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => Severity::Warning,
166            AdapterNotice::UserRequested { severity } => match severity {
167                NoticeSeverity::Debug => Severity::Debug,
168                NoticeSeverity::Info => Severity::Info,
169                NoticeSeverity::Log => Severity::Log,
170                NoticeSeverity::Notice => Severity::Notice,
171                NoticeSeverity::Warning => Severity::Warning,
172            },
173            AdapterNotice::ClusterReplicaStatusChanged { .. } => Severity::Notice,
174            AdapterNotice::CascadeDroppedObject { .. } => Severity::Notice,
175            AdapterNotice::DroppedActiveDatabase { .. } => Severity::Notice,
176            AdapterNotice::DroppedActiveCluster { .. } => Severity::Notice,
177            AdapterNotice::QueryTimestamp { .. } => Severity::Notice,
178            AdapterNotice::EqualSubscribeBounds { .. } => Severity::Notice,
179            AdapterNotice::QueryTrace { .. } => Severity::Notice,
180            AdapterNotice::UnimplementedIsolationLevel { .. } => Severity::Notice,
181            AdapterNotice::StrongSessionSerializable => Severity::Notice,
182            AdapterNotice::BadStartupSetting { .. } => Severity::Notice,
183            AdapterNotice::RbacUserDisabled => Severity::Notice,
184            AdapterNotice::RoleMembershipAlreadyExists { .. } => Severity::Notice,
185            AdapterNotice::RoleMembershipDoesNotExists { .. } => Severity::Warning,
186            AdapterNotice::AutoRunOnCatalogServerCluster => Severity::Debug,
187            AdapterNotice::AlterIndexOwner { .. } => Severity::Warning,
188            AdapterNotice::CannotRevoke { .. } => Severity::Warning,
189            AdapterNotice::NonApplicablePrivilegeTypes { .. } => Severity::Notice,
190            AdapterNotice::PlanNotice(notice) => match notice {
191                PlanNotice::ObjectDoesNotExist { .. } => Severity::Notice,
192                PlanNotice::ColumnAlreadyExists { .. } => Severity::Notice,
193                PlanNotice::UpsertSinkKeyNotEnforced { .. } => Severity::Warning,
194            },
195            AdapterNotice::UnknownSessionDatabase(_) => Severity::Notice,
196            AdapterNotice::OptimizerNotice { .. } => Severity::Notice,
197            AdapterNotice::WebhookSourceCreated { .. } => Severity::Notice,
198            AdapterNotice::DroppedInUseIndex { .. } => Severity::Notice,
199            AdapterNotice::PerReplicaLogRead { .. } => Severity::Notice,
200            AdapterNotice::VarDefaultUpdated { .. } => Severity::Notice,
201            AdapterNotice::Welcome(_) => Severity::Notice,
202            AdapterNotice::PlanInsights(_) => Severity::Notice,
203            AdapterNotice::IntrospectionClusterUsage => Severity::Warning,
204            AdapterNotice::AutoRouteIntrospectionQueriesUsage => Severity::Warning,
205        }
206    }
207
208    /// Reports additional details about the notice, if any are available.
209    pub fn detail(&self) -> Option<String> {
210        match self {
211            AdapterNotice::PlanNotice(notice) => notice.detail(),
212            AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
213            AdapterNotice::CascadeDroppedObject { objects } => Some(
214                objects
215                    .iter()
216                    .map(|obj_info| format!("drop cascades to {}", obj_info))
217                    .join("\n"),
218            ),
219            _ => None,
220        }
221    }
222
223    /// Reports a hint for the user about how the notice could be addressed.
224    pub fn hint(&self) -> Option<String> {
225        match self {
226            AdapterNotice::DatabaseDoesNotExist { name: _ } => Some("Create the database with CREATE DATABASE or pick an extant database with SET DATABASE = name. List available databases with SHOW DATABASES.".into()),
227            AdapterNotice::ClusterDoesNotExist { name: _ } => Some("Create the cluster with CREATE CLUSTER or pick an extant cluster with SET CLUSTER = name. List available clusters with SHOW CLUSTERS.".into()),
228            AdapterNotice::DefaultClusterDoesNotExist { name: _, kind: _, suggested_action } => Some(suggested_action.clone()),
229            AdapterNotice::NoResolvableSearchPathSchema { search_path: _ } => Some("Create a schema with CREATE SCHEMA or pick an extant schema with SET SCHEMA = name. List available schemas with SHOW SCHEMAS.".into()),
230            AdapterNotice::DroppedActiveDatabase { name: _ } => Some("Choose a new active database by executing SET DATABASE = <name>.".into()),
231            AdapterNotice::DroppedActiveCluster { name: _ } => Some("Choose a new active cluster by executing SET CLUSTER = <name>.".into()),
232            AdapterNotice::ClusterReplicaStatusChanged { status, .. } => {
233                match status {
234                    ServiceStatus::Offline(None)
235                    | ServiceStatus::Offline(Some(OfflineReason::Initializing)) => Some("The cluster replica may be restarting or going offline.".into()),
236                    ServiceStatus::Offline(Some(OfflineReason::OomKilled)) => Some("The cluster replica may have run out of memory and been killed.".into()),
237                    ServiceStatus::Online => None,
238                }
239            },
240            AdapterNotice::RbacUserDisabled => Some("To enable RBAC globally run `ALTER SYSTEM SET enable_rbac_checks TO TRUE` as a superuser. TO enable RBAC for just this session run `SET enable_session_rbac_checks TO TRUE`.".into()),
241            AdapterNotice::AlterIndexOwner {name: _} => Some("Change the ownership of the index's relation, instead.".into()),
242            AdapterNotice::UnknownSessionDatabase(_) => Some(
243                "Create the database with CREATE DATABASE \
244                 or pick an extant database with SET DATABASE = name. \
245                 List available databases with SHOW DATABASES."
246                    .into(),
247            ),
248            AdapterNotice::OptimizerNotice { notice: _, hint } => Some(hint.clone()),
249            AdapterNotice::DroppedInUseIndex(..) => Some("To free up the resources used by the index, recreate all the above-mentioned objects.".into()),
250            AdapterNotice::IntrospectionClusterUsage => Some("Use the new name instead.".into()),
251            AdapterNotice::AutoRouteIntrospectionQueriesUsage => Some("Use the new name instead.".into()),
252            _ => None
253        }
254    }
255
256    /// Reports the error code.
257    pub fn code(&self) -> SqlState {
258        match self {
259            AdapterNotice::DatabaseAlreadyExists { .. } => SqlState::DUPLICATE_DATABASE,
260            AdapterNotice::SchemaAlreadyExists { .. } => SqlState::DUPLICATE_SCHEMA,
261            AdapterNotice::TableAlreadyExists { .. } => SqlState::DUPLICATE_TABLE,
262            AdapterNotice::ObjectAlreadyExists { .. } => SqlState::DUPLICATE_OBJECT,
263            AdapterNotice::DatabaseDoesNotExist { .. } => SqlState::from_code("MZ006"),
264            AdapterNotice::ClusterDoesNotExist { .. } => SqlState::from_code("MZ007"),
265            AdapterNotice::NoResolvableSearchPathSchema { .. } => SqlState::from_code("MZ008"),
266            AdapterNotice::ExistingTransactionInProgress => SqlState::ACTIVE_SQL_TRANSACTION,
267            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
268                SqlState::NO_ACTIVE_SQL_TRANSACTION
269            }
270            AdapterNotice::UserRequested { severity } => match severity {
271                NoticeSeverity::Warning => SqlState::WARNING,
272                _ => SqlState::SUCCESSFUL_COMPLETION,
273            },
274            AdapterNotice::ClusterReplicaStatusChanged { .. } => SqlState::SUCCESSFUL_COMPLETION,
275            AdapterNotice::CascadeDroppedObject { .. } => SqlState::SUCCESSFUL_COMPLETION,
276            AdapterNotice::DroppedActiveDatabase { .. } => SqlState::from_code("MZ002"),
277            AdapterNotice::DroppedActiveCluster { .. } => SqlState::from_code("MZ003"),
278            AdapterNotice::QueryTimestamp { .. } => SqlState::SUCCESSFUL_COMPLETION,
279            AdapterNotice::EqualSubscribeBounds { .. } => SqlState::SUCCESSFUL_COMPLETION,
280            AdapterNotice::QueryTrace { .. } => SqlState::SUCCESSFUL_COMPLETION,
281            AdapterNotice::UnimplementedIsolationLevel { .. } => SqlState::SUCCESSFUL_COMPLETION,
282            AdapterNotice::StrongSessionSerializable => SqlState::SUCCESSFUL_COMPLETION,
283            AdapterNotice::BadStartupSetting { .. } => SqlState::SUCCESSFUL_COMPLETION,
284            AdapterNotice::RbacUserDisabled => SqlState::SUCCESSFUL_COMPLETION,
285            AdapterNotice::RoleMembershipAlreadyExists { .. } => SqlState::SUCCESSFUL_COMPLETION,
286            AdapterNotice::RoleMembershipDoesNotExists { .. } => SqlState::WARNING,
287            AdapterNotice::AutoRunOnCatalogServerCluster => SqlState::SUCCESSFUL_COMPLETION,
288            AdapterNotice::AlterIndexOwner { .. } => SqlState::WARNING,
289            AdapterNotice::CannotRevoke { .. } => SqlState::WARNING_PRIVILEGE_NOT_REVOKED,
290            AdapterNotice::NonApplicablePrivilegeTypes { .. } => SqlState::SUCCESSFUL_COMPLETION,
291            AdapterNotice::PlanNotice(plan) => match plan {
292                PlanNotice::ObjectDoesNotExist { .. } => SqlState::UNDEFINED_OBJECT,
293                PlanNotice::ColumnAlreadyExists { .. } => SqlState::DUPLICATE_COLUMN,
294                PlanNotice::UpsertSinkKeyNotEnforced { .. } => SqlState::WARNING,
295            },
296            AdapterNotice::UnknownSessionDatabase(_) => SqlState::from_code("MZ004"),
297            AdapterNotice::DefaultClusterDoesNotExist { .. } => SqlState::from_code("MZ005"),
298            AdapterNotice::OptimizerNotice { .. } => SqlState::SUCCESSFUL_COMPLETION,
299            AdapterNotice::DroppedInUseIndex { .. } => SqlState::SUCCESSFUL_COMPLETION,
300            AdapterNotice::WebhookSourceCreated { .. } => SqlState::SUCCESSFUL_COMPLETION,
301            AdapterNotice::PerReplicaLogRead { .. } => SqlState::SUCCESSFUL_COMPLETION,
302            AdapterNotice::VarDefaultUpdated { .. } => SqlState::SUCCESSFUL_COMPLETION,
303            AdapterNotice::Welcome(_) => SqlState::SUCCESSFUL_COMPLETION,
304            AdapterNotice::PlanInsights(_) => SqlState::from_code("MZ001"),
305            AdapterNotice::IntrospectionClusterUsage => SqlState::WARNING,
306            AdapterNotice::AutoRouteIntrospectionQueriesUsage => SqlState::WARNING,
307        }
308    }
309}
310
311impl fmt::Display for AdapterNotice {
312    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
313        match self {
314            AdapterNotice::DatabaseAlreadyExists { name } => {
315                write!(f, "database {} already exists, skipping", name.quoted())
316            }
317            AdapterNotice::SchemaAlreadyExists { name } => {
318                write!(f, "schema {} already exists, skipping", name.quoted())
319            }
320            AdapterNotice::TableAlreadyExists { name } => {
321                write!(f, "table {} already exists, skipping", name.quoted())
322            }
323            AdapterNotice::ObjectAlreadyExists { name, ty } => {
324                write!(f, "{} {} already exists, skipping", ty, name.quoted())
325            }
326            AdapterNotice::DatabaseDoesNotExist { name } => {
327                write!(f, "database {} does not exist", name.quoted())
328            }
329            AdapterNotice::CascadeDroppedObject { objects } => {
330                write!(f, "drop cascades to {} other objects", objects.len())
331            }
332            AdapterNotice::ClusterDoesNotExist { name } => {
333                write!(f, "cluster {} does not exist", name.quoted())
334            }
335            AdapterNotice::DefaultClusterDoesNotExist { kind, name, .. } => {
336                write!(f, "{kind} default cluster {} does not exist", name.quoted())
337            }
338            AdapterNotice::NoResolvableSearchPathSchema { search_path } => {
339                write!(
340                    f,
341                    "no schema on the search path exists: {}",
342                    search_path.join(", ")
343                )
344            }
345            AdapterNotice::ExistingTransactionInProgress => {
346                write!(f, "there is already a transaction in progress")
347            }
348            AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
349                write!(f, "there is no transaction in progress")
350            }
351            AdapterNotice::UserRequested { severity } => {
352                write!(f, "raised a test {}", severity.to_string().to_lowercase())
353            }
354            AdapterNotice::ClusterReplicaStatusChanged {
355                cluster,
356                replica,
357                status,
358                time,
359            } => {
360                let mut time_buf = String::new();
361                strconv::format_timestamptz(&mut time_buf, time);
362                write!(
363                    f,
364                    "cluster replica {}.{} changed status to {} at {}",
365                    cluster,
366                    replica,
367                    status.as_kebab_case_str().quoted(),
368                    time_buf,
369                )?;
370                Ok(())
371            }
372            AdapterNotice::DroppedActiveDatabase { name } => {
373                write!(f, "active database {} has been dropped", name.quoted())
374            }
375            AdapterNotice::DroppedActiveCluster { name } => {
376                write!(f, "active cluster {} has been dropped", name.quoted())
377            }
378            AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
379            AdapterNotice::EqualSubscribeBounds { bound } => {
380                write!(
381                    f,
382                    "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty"
383                )
384            }
385            AdapterNotice::QueryTrace { trace_id } => {
386                write!(f, "trace id: {}", trace_id)
387            }
388            AdapterNotice::UnimplementedIsolationLevel { isolation_level } => {
389                write!(
390                    f,
391                    "transaction isolation level {isolation_level} is unimplemented, the session will be upgraded to {}",
392                    IsolationLevel::Serializable.as_str()
393                )
394            }
395            AdapterNotice::StrongSessionSerializable => {
396                write!(
397                    f,
398                    "The Strong Session Serializable isolation level may exhibit consistency violations when reading from catalog objects",
399                )
400            }
401            AdapterNotice::BadStartupSetting { name, reason } => {
402                write!(f, "startup setting {name} not set: {reason}")
403            }
404            AdapterNotice::RbacUserDisabled => {
405                write!(
406                    f,
407                    "RBAC is disabled so no role attributes or object ownership will be considered \
408                    when executing statements"
409                )
410            }
411            AdapterNotice::RoleMembershipAlreadyExists {
412                role_name,
413                member_name,
414            } => write!(
415                f,
416                "role \"{member_name}\" is already a member of role \"{role_name}\""
417            ),
418            AdapterNotice::RoleMembershipDoesNotExists {
419                role_name,
420                member_name,
421            } => write!(
422                f,
423                "role \"{member_name}\" is not a member of role \"{role_name}\""
424            ),
425            AdapterNotice::AutoRunOnCatalogServerCluster => write!(
426                f,
427                "query was automatically run on the \"mz_catalog_server\" cluster"
428            ),
429            AdapterNotice::AlterIndexOwner { name } => {
430                write!(f, "cannot change owner of {}", name.quoted())
431            }
432            AdapterNotice::CannotRevoke { object_description } => {
433                write!(f, "no privileges could be revoked for {object_description}")
434            }
435            AdapterNotice::NonApplicablePrivilegeTypes {
436                non_applicable_privileges,
437                object_description,
438            } => {
439                write!(
440                    f,
441                    "non-applicable privilege types {} for {}",
442                    non_applicable_privileges.to_error_string(),
443                    object_description,
444                )
445            }
446            AdapterNotice::PlanNotice(plan) => plan.fmt(f),
447            AdapterNotice::UnknownSessionDatabase(name) => {
448                write!(f, "session database {} does not exist", name.quoted())
449            }
450            AdapterNotice::OptimizerNotice { notice, hint: _ } => notice.fmt(f),
451            AdapterNotice::WebhookSourceCreated { url } => {
452                write!(f, "URL to POST data is '{url}'")
453            }
454            AdapterNotice::DroppedInUseIndex(DroppedInUseIndex {
455                index_name,
456                dependant_objects,
457            }) => {
458                write!(
459                    f,
460                    "The dropped index {index_name} is being used by the following objects: {}. The index is now dropped from the catalog, but it will continue to be maintained and take up resources until all dependent objects are dropped, altered, or Materialize is restarted!",
461                    separated(", ", dependant_objects)
462                )
463            }
464            AdapterNotice::PerReplicaLogRead { log_names } => {
465                write!(
466                    f,
467                    "Queried introspection relations: {}. Unlike other objects in Materialize, results from querying these objects depend on the current values of the `cluster` and `cluster_replica` session variables.",
468                    log_names.join(", ")
469                )
470            }
471            AdapterNotice::VarDefaultUpdated { role, var_name } => {
472                let vars = match var_name {
473                    Some(name) => format!("variable {} was", name.quoted()),
474                    None => "variables were".to_string(),
475                };
476                let target = match role {
477                    Some(role_name) => role_name.quoted().to_string(),
478                    None => "the system".to_string(),
479                };
480                write!(
481                    f,
482                    "{vars} updated for {target}, this will have no effect on the current session"
483                )
484            }
485            AdapterNotice::Welcome(message) => message.fmt(f),
486            AdapterNotice::PlanInsights(message) => message.fmt(f),
487            AdapterNotice::IntrospectionClusterUsage => write!(
488                f,
489                "The mz_introspection cluster has been renamed to mz_catalog_server."
490            ),
491            AdapterNotice::AutoRouteIntrospectionQueriesUsage => write!(
492                f,
493                "The auto_route_introspection_queries variable has been renamed to auto_route_catalog_queries."
494            ),
495        }
496    }
497}
498
/// Payload of [`AdapterNotice::DroppedInUseIndex`]: an index that was dropped
/// while other objects are still using it.
#[derive(Clone, Debug)]
pub struct DroppedInUseIndex {
    /// Name of the dropped index, as shown in the notice message.
    pub index_name: String,
    /// Objects reported as using the dropped index; the notice message lists
    /// them comma-separated.
    pub dependant_objects: Vec<String>,
}
504
505impl From<PlanNotice> for AdapterNotice {
506    fn from(notice: PlanNotice) -> AdapterNotice {
507        AdapterNotice::PlanNotice(notice)
508    }
509}