1use std::fmt;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_controller::clusters::ClusterStatus;
15use mz_orchestrator::{OfflineReason, ServiceStatus};
16use mz_ore::str::{StrExt, separated};
17use mz_pgwire_common::{ErrorResponse, Severity};
18use mz_repr::adt::mz_acl_item::AclMode;
19use mz_repr::strconv;
20use mz_sql::ast::NoticeSeverity;
21use mz_sql::catalog::ErrorMessageObjectDescription;
22use mz_sql::plan::PlanNotice;
23use mz_sql::session::vars::IsolationLevel;
24use tokio_postgres::error::SqlState;
25
26use crate::TimestampExplanation;
27
28#[derive(Clone, Debug)]
33pub enum AdapterNotice {
34 DatabaseAlreadyExists {
35 name: String,
36 },
37 SchemaAlreadyExists {
38 name: String,
39 },
40 TableAlreadyExists {
41 name: String,
42 },
43 ObjectAlreadyExists {
44 name: String,
45 ty: &'static str,
46 },
47 DatabaseDoesNotExist {
48 name: String,
49 },
50 ClusterDoesNotExist {
51 name: String,
52 },
53 DefaultClusterDoesNotExist {
54 name: String,
55 kind: &'static str,
56 suggested_action: String,
57 },
58 NoResolvableSearchPathSchema {
59 search_path: Vec<String>,
60 },
61 ExistingTransactionInProgress,
62 ExplicitTransactionControlInImplicitTransaction,
63 UserRequested {
64 severity: NoticeSeverity,
65 },
66 ClusterReplicaStatusChanged {
67 cluster: String,
68 replica: String,
69 status: ClusterStatus,
70 time: DateTime<Utc>,
71 },
72 CascadeDroppedObject {
73 objects: Vec<String>,
74 },
75 DroppedActiveDatabase {
76 name: String,
77 },
78 DroppedActiveCluster {
79 name: String,
80 },
81 QueryTimestamp {
82 explanation: TimestampExplanation<mz_repr::Timestamp>,
83 },
84 EqualSubscribeBounds {
85 bound: mz_repr::Timestamp,
86 },
87 QueryTrace {
88 trace_id: opentelemetry::trace::TraceId,
89 },
90 UnimplementedIsolationLevel {
91 isolation_level: String,
92 },
93 StrongSessionSerializable,
94 BadStartupSetting {
95 name: String,
96 reason: String,
97 },
98 RbacUserDisabled,
99 RoleMembershipAlreadyExists {
100 role_name: String,
101 member_name: String,
102 },
103 RoleMembershipDoesNotExists {
104 role_name: String,
105 member_name: String,
106 },
107 AutoRunOnCatalogServerCluster,
108 AlterIndexOwner {
109 name: String,
110 },
111 CannotRevoke {
112 object_description: ErrorMessageObjectDescription,
113 },
114 NonApplicablePrivilegeTypes {
115 non_applicable_privileges: AclMode,
116 object_description: ErrorMessageObjectDescription,
117 },
118 PlanNotice(PlanNotice),
119 UnknownSessionDatabase(String),
120 OptimizerNotice {
121 notice: String,
122 hint: String,
123 },
124 WebhookSourceCreated {
125 url: url::Url,
126 },
127 DroppedInUseIndex(DroppedInUseIndex),
128 PerReplicaLogRead {
129 log_names: Vec<String>,
130 },
131 VarDefaultUpdated {
132 role: Option<String>,
133 var_name: Option<String>,
134 },
135 Welcome(String),
136 PlanInsights(String),
137 IntrospectionClusterUsage,
138 AutoRouteIntrospectionQueriesUsage,
139}
140
141impl AdapterNotice {
142 pub fn into_response(self) -> ErrorResponse {
143 ErrorResponse {
144 severity: self.severity(),
145 code: self.code(),
146 message: self.to_string(),
147 detail: self.detail(),
148 hint: self.hint(),
149 position: None,
150 }
151 }
152
153 pub fn severity(&self) -> Severity {
155 match self {
156 AdapterNotice::DatabaseAlreadyExists { .. } => Severity::Notice,
157 AdapterNotice::SchemaAlreadyExists { .. } => Severity::Notice,
158 AdapterNotice::TableAlreadyExists { .. } => Severity::Notice,
159 AdapterNotice::ObjectAlreadyExists { .. } => Severity::Notice,
160 AdapterNotice::DatabaseDoesNotExist { .. } => Severity::Notice,
161 AdapterNotice::ClusterDoesNotExist { .. } => Severity::Notice,
162 AdapterNotice::DefaultClusterDoesNotExist { .. } => Severity::Notice,
163 AdapterNotice::NoResolvableSearchPathSchema { .. } => Severity::Notice,
164 AdapterNotice::ExistingTransactionInProgress => Severity::Warning,
165 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => Severity::Warning,
166 AdapterNotice::UserRequested { severity } => match severity {
167 NoticeSeverity::Debug => Severity::Debug,
168 NoticeSeverity::Info => Severity::Info,
169 NoticeSeverity::Log => Severity::Log,
170 NoticeSeverity::Notice => Severity::Notice,
171 NoticeSeverity::Warning => Severity::Warning,
172 },
173 AdapterNotice::ClusterReplicaStatusChanged { .. } => Severity::Notice,
174 AdapterNotice::CascadeDroppedObject { .. } => Severity::Notice,
175 AdapterNotice::DroppedActiveDatabase { .. } => Severity::Notice,
176 AdapterNotice::DroppedActiveCluster { .. } => Severity::Notice,
177 AdapterNotice::QueryTimestamp { .. } => Severity::Notice,
178 AdapterNotice::EqualSubscribeBounds { .. } => Severity::Notice,
179 AdapterNotice::QueryTrace { .. } => Severity::Notice,
180 AdapterNotice::UnimplementedIsolationLevel { .. } => Severity::Notice,
181 AdapterNotice::StrongSessionSerializable => Severity::Notice,
182 AdapterNotice::BadStartupSetting { .. } => Severity::Notice,
183 AdapterNotice::RbacUserDisabled => Severity::Notice,
184 AdapterNotice::RoleMembershipAlreadyExists { .. } => Severity::Notice,
185 AdapterNotice::RoleMembershipDoesNotExists { .. } => Severity::Warning,
186 AdapterNotice::AutoRunOnCatalogServerCluster => Severity::Debug,
187 AdapterNotice::AlterIndexOwner { .. } => Severity::Warning,
188 AdapterNotice::CannotRevoke { .. } => Severity::Warning,
189 AdapterNotice::NonApplicablePrivilegeTypes { .. } => Severity::Notice,
190 AdapterNotice::PlanNotice(notice) => match notice {
191 PlanNotice::ObjectDoesNotExist { .. } => Severity::Notice,
192 PlanNotice::ColumnAlreadyExists { .. } => Severity::Notice,
193 PlanNotice::UpsertSinkKeyNotEnforced { .. } => Severity::Warning,
194 PlanNotice::ReplicaDiskOptionDeprecated { .. } => Severity::Notice,
195 },
196 AdapterNotice::UnknownSessionDatabase(_) => Severity::Notice,
197 AdapterNotice::OptimizerNotice { .. } => Severity::Notice,
198 AdapterNotice::WebhookSourceCreated { .. } => Severity::Notice,
199 AdapterNotice::DroppedInUseIndex { .. } => Severity::Notice,
200 AdapterNotice::PerReplicaLogRead { .. } => Severity::Notice,
201 AdapterNotice::VarDefaultUpdated { .. } => Severity::Notice,
202 AdapterNotice::Welcome(_) => Severity::Notice,
203 AdapterNotice::PlanInsights(_) => Severity::Notice,
204 AdapterNotice::IntrospectionClusterUsage => Severity::Warning,
205 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Severity::Warning,
206 }
207 }
208
209 pub fn detail(&self) -> Option<String> {
211 match self {
212 AdapterNotice::PlanNotice(notice) => notice.detail(),
213 AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
214 AdapterNotice::CascadeDroppedObject { objects } => Some(
215 objects
216 .iter()
217 .map(|obj_info| format!("drop cascades to {}", obj_info))
218 .join("\n"),
219 ),
220 _ => None,
221 }
222 }
223
224 pub fn hint(&self) -> Option<String> {
226 match self {
227 AdapterNotice::DatabaseDoesNotExist { name: _ } => Some("Create the database with CREATE DATABASE or pick an extant database with SET DATABASE = name. List available databases with SHOW DATABASES.".into()),
228 AdapterNotice::ClusterDoesNotExist { name: _ } => Some("Create the cluster with CREATE CLUSTER or pick an extant cluster with SET CLUSTER = name. List available clusters with SHOW CLUSTERS.".into()),
229 AdapterNotice::DefaultClusterDoesNotExist { name: _, kind: _, suggested_action } => Some(suggested_action.clone()),
230 AdapterNotice::NoResolvableSearchPathSchema { search_path: _ } => Some("Create a schema with CREATE SCHEMA or pick an extant schema with SET SCHEMA = name. List available schemas with SHOW SCHEMAS.".into()),
231 AdapterNotice::DroppedActiveDatabase { name: _ } => Some("Choose a new active database by executing SET DATABASE = <name>.".into()),
232 AdapterNotice::DroppedActiveCluster { name: _ } => Some("Choose a new active cluster by executing SET CLUSTER = <name>.".into()),
233 AdapterNotice::ClusterReplicaStatusChanged { status, .. } => {
234 match status {
235 ServiceStatus::Offline(None)
236 | ServiceStatus::Offline(Some(OfflineReason::Initializing)) => Some("The cluster replica may be restarting or going offline.".into()),
237 ServiceStatus::Offline(Some(OfflineReason::OomKilled)) => Some("The cluster replica may have run out of memory and been killed.".into()),
238 ServiceStatus::Online => None,
239 }
240 },
241 AdapterNotice::RbacUserDisabled => Some("To enable RBAC globally run `ALTER SYSTEM SET enable_rbac_checks TO TRUE` as a superuser. TO enable RBAC for just this session run `SET enable_session_rbac_checks TO TRUE`.".into()),
242 AdapterNotice::AlterIndexOwner {name: _} => Some("Change the ownership of the index's relation, instead.".into()),
243 AdapterNotice::UnknownSessionDatabase(_) => Some(
244 "Create the database with CREATE DATABASE \
245 or pick an extant database with SET DATABASE = name. \
246 List available databases with SHOW DATABASES."
247 .into(),
248 ),
249 AdapterNotice::OptimizerNotice { notice: _, hint } => Some(hint.clone()),
250 AdapterNotice::DroppedInUseIndex(..) => Some("To free up the resources used by the index, recreate all the above-mentioned objects.".into()),
251 AdapterNotice::IntrospectionClusterUsage => Some("Use the new name instead.".into()),
252 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Some("Use the new name instead.".into()),
253 _ => None
254 }
255 }
256
257 pub fn code(&self) -> SqlState {
259 match self {
260 AdapterNotice::DatabaseAlreadyExists { .. } => SqlState::DUPLICATE_DATABASE,
261 AdapterNotice::SchemaAlreadyExists { .. } => SqlState::DUPLICATE_SCHEMA,
262 AdapterNotice::TableAlreadyExists { .. } => SqlState::DUPLICATE_TABLE,
263 AdapterNotice::ObjectAlreadyExists { .. } => SqlState::DUPLICATE_OBJECT,
264 AdapterNotice::DatabaseDoesNotExist { .. } => SqlState::from_code("MZ006"),
265 AdapterNotice::ClusterDoesNotExist { .. } => SqlState::from_code("MZ007"),
266 AdapterNotice::NoResolvableSearchPathSchema { .. } => SqlState::from_code("MZ008"),
267 AdapterNotice::ExistingTransactionInProgress => SqlState::ACTIVE_SQL_TRANSACTION,
268 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
269 SqlState::NO_ACTIVE_SQL_TRANSACTION
270 }
271 AdapterNotice::UserRequested { severity } => match severity {
272 NoticeSeverity::Warning => SqlState::WARNING,
273 _ => SqlState::SUCCESSFUL_COMPLETION,
274 },
275 AdapterNotice::ClusterReplicaStatusChanged { .. } => SqlState::SUCCESSFUL_COMPLETION,
276 AdapterNotice::CascadeDroppedObject { .. } => SqlState::SUCCESSFUL_COMPLETION,
277 AdapterNotice::DroppedActiveDatabase { .. } => SqlState::from_code("MZ002"),
278 AdapterNotice::DroppedActiveCluster { .. } => SqlState::from_code("MZ003"),
279 AdapterNotice::QueryTimestamp { .. } => SqlState::SUCCESSFUL_COMPLETION,
280 AdapterNotice::EqualSubscribeBounds { .. } => SqlState::SUCCESSFUL_COMPLETION,
281 AdapterNotice::QueryTrace { .. } => SqlState::SUCCESSFUL_COMPLETION,
282 AdapterNotice::UnimplementedIsolationLevel { .. } => SqlState::SUCCESSFUL_COMPLETION,
283 AdapterNotice::StrongSessionSerializable => SqlState::SUCCESSFUL_COMPLETION,
284 AdapterNotice::BadStartupSetting { .. } => SqlState::SUCCESSFUL_COMPLETION,
285 AdapterNotice::RbacUserDisabled => SqlState::SUCCESSFUL_COMPLETION,
286 AdapterNotice::RoleMembershipAlreadyExists { .. } => SqlState::SUCCESSFUL_COMPLETION,
287 AdapterNotice::RoleMembershipDoesNotExists { .. } => SqlState::WARNING,
288 AdapterNotice::AutoRunOnCatalogServerCluster => SqlState::SUCCESSFUL_COMPLETION,
289 AdapterNotice::AlterIndexOwner { .. } => SqlState::WARNING,
290 AdapterNotice::CannotRevoke { .. } => SqlState::WARNING_PRIVILEGE_NOT_REVOKED,
291 AdapterNotice::NonApplicablePrivilegeTypes { .. } => SqlState::SUCCESSFUL_COMPLETION,
292 AdapterNotice::PlanNotice(plan) => match plan {
293 PlanNotice::ObjectDoesNotExist { .. } => SqlState::UNDEFINED_OBJECT,
294 PlanNotice::ColumnAlreadyExists { .. } => SqlState::DUPLICATE_COLUMN,
295 PlanNotice::UpsertSinkKeyNotEnforced { .. } => SqlState::WARNING,
296 PlanNotice::ReplicaDiskOptionDeprecated { .. } => {
297 SqlState::WARNING_DEPRECATED_FEATURE
298 }
299 },
300 AdapterNotice::UnknownSessionDatabase(_) => SqlState::from_code("MZ004"),
301 AdapterNotice::DefaultClusterDoesNotExist { .. } => SqlState::from_code("MZ005"),
302 AdapterNotice::OptimizerNotice { .. } => SqlState::SUCCESSFUL_COMPLETION,
303 AdapterNotice::DroppedInUseIndex { .. } => SqlState::SUCCESSFUL_COMPLETION,
304 AdapterNotice::WebhookSourceCreated { .. } => SqlState::SUCCESSFUL_COMPLETION,
305 AdapterNotice::PerReplicaLogRead { .. } => SqlState::SUCCESSFUL_COMPLETION,
306 AdapterNotice::VarDefaultUpdated { .. } => SqlState::SUCCESSFUL_COMPLETION,
307 AdapterNotice::Welcome(_) => SqlState::SUCCESSFUL_COMPLETION,
308 AdapterNotice::PlanInsights(_) => SqlState::from_code("MZ001"),
309 AdapterNotice::IntrospectionClusterUsage => SqlState::WARNING,
310 AdapterNotice::AutoRouteIntrospectionQueriesUsage => SqlState::WARNING,
311 }
312 }
313}
314
315impl fmt::Display for AdapterNotice {
316 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
317 match self {
318 AdapterNotice::DatabaseAlreadyExists { name } => {
319 write!(f, "database {} already exists, skipping", name.quoted())
320 }
321 AdapterNotice::SchemaAlreadyExists { name } => {
322 write!(f, "schema {} already exists, skipping", name.quoted())
323 }
324 AdapterNotice::TableAlreadyExists { name } => {
325 write!(f, "table {} already exists, skipping", name.quoted())
326 }
327 AdapterNotice::ObjectAlreadyExists { name, ty } => {
328 write!(f, "{} {} already exists, skipping", ty, name.quoted())
329 }
330 AdapterNotice::DatabaseDoesNotExist { name } => {
331 write!(f, "database {} does not exist", name.quoted())
332 }
333 AdapterNotice::CascadeDroppedObject { objects } => {
334 write!(f, "drop cascades to {} other objects", objects.len())
335 }
336 AdapterNotice::ClusterDoesNotExist { name } => {
337 write!(f, "cluster {} does not exist", name.quoted())
338 }
339 AdapterNotice::DefaultClusterDoesNotExist { kind, name, .. } => {
340 write!(f, "{kind} default cluster {} does not exist", name.quoted())
341 }
342 AdapterNotice::NoResolvableSearchPathSchema { search_path } => {
343 write!(
344 f,
345 "no schema on the search path exists: {}",
346 search_path.join(", ")
347 )
348 }
349 AdapterNotice::ExistingTransactionInProgress => {
350 write!(f, "there is already a transaction in progress")
351 }
352 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
353 write!(f, "there is no transaction in progress")
354 }
355 AdapterNotice::UserRequested { severity } => {
356 write!(f, "raised a test {}", severity.to_string().to_lowercase())
357 }
358 AdapterNotice::ClusterReplicaStatusChanged {
359 cluster,
360 replica,
361 status,
362 time,
363 } => {
364 let mut time_buf = String::new();
365 strconv::format_timestamptz(&mut time_buf, time);
366 write!(
367 f,
368 "cluster replica {}.{} changed status to {} at {}",
369 cluster,
370 replica,
371 status.as_kebab_case_str().quoted(),
372 time_buf,
373 )?;
374 Ok(())
375 }
376 AdapterNotice::DroppedActiveDatabase { name } => {
377 write!(f, "active database {} has been dropped", name.quoted())
378 }
379 AdapterNotice::DroppedActiveCluster { name } => {
380 write!(f, "active cluster {} has been dropped", name.quoted())
381 }
382 AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
383 AdapterNotice::EqualSubscribeBounds { bound } => {
384 write!(
385 f,
386 "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty"
387 )
388 }
389 AdapterNotice::QueryTrace { trace_id } => {
390 write!(f, "trace id: {}", trace_id)
391 }
392 AdapterNotice::UnimplementedIsolationLevel { isolation_level } => {
393 write!(
394 f,
395 "transaction isolation level {isolation_level} is unimplemented, the session will be upgraded to {}",
396 IsolationLevel::Serializable.as_str()
397 )
398 }
399 AdapterNotice::StrongSessionSerializable => {
400 write!(
401 f,
402 "The Strong Session Serializable isolation level may exhibit consistency violations when reading from catalog objects",
403 )
404 }
405 AdapterNotice::BadStartupSetting { name, reason } => {
406 write!(f, "startup setting {name} not set: {reason}")
407 }
408 AdapterNotice::RbacUserDisabled => {
409 write!(
410 f,
411 "RBAC is disabled so no role attributes or object ownership will be considered \
412 when executing statements"
413 )
414 }
415 AdapterNotice::RoleMembershipAlreadyExists {
416 role_name,
417 member_name,
418 } => write!(
419 f,
420 "role \"{member_name}\" is already a member of role \"{role_name}\""
421 ),
422 AdapterNotice::RoleMembershipDoesNotExists {
423 role_name,
424 member_name,
425 } => write!(
426 f,
427 "role \"{member_name}\" is not a member of role \"{role_name}\""
428 ),
429 AdapterNotice::AutoRunOnCatalogServerCluster => write!(
430 f,
431 "query was automatically run on the \"mz_catalog_server\" cluster"
432 ),
433 AdapterNotice::AlterIndexOwner { name } => {
434 write!(f, "cannot change owner of {}", name.quoted())
435 }
436 AdapterNotice::CannotRevoke { object_description } => {
437 write!(f, "no privileges could be revoked for {object_description}")
438 }
439 AdapterNotice::NonApplicablePrivilegeTypes {
440 non_applicable_privileges,
441 object_description,
442 } => {
443 write!(
444 f,
445 "non-applicable privilege types {} for {}",
446 non_applicable_privileges.to_error_string(),
447 object_description,
448 )
449 }
450 AdapterNotice::PlanNotice(plan) => plan.fmt(f),
451 AdapterNotice::UnknownSessionDatabase(name) => {
452 write!(f, "session database {} does not exist", name.quoted())
453 }
454 AdapterNotice::OptimizerNotice { notice, hint: _ } => notice.fmt(f),
455 AdapterNotice::WebhookSourceCreated { url } => {
456 write!(f, "URL to POST data is '{url}'")
457 }
458 AdapterNotice::DroppedInUseIndex(DroppedInUseIndex {
459 index_name,
460 dependant_objects,
461 }) => {
462 write!(
463 f,
464 "The dropped index {index_name} is being used by the following objects: {}. The index is now dropped from the catalog, but it will continue to be maintained and take up resources until all dependent objects are dropped, altered, or Materialize is restarted!",
465 separated(", ", dependant_objects)
466 )
467 }
468 AdapterNotice::PerReplicaLogRead { log_names } => {
469 write!(
470 f,
471 "Queried introspection relations: {}. Unlike other objects in Materialize, results from querying these objects depend on the current values of the `cluster` and `cluster_replica` session variables.",
472 log_names.join(", ")
473 )
474 }
475 AdapterNotice::VarDefaultUpdated { role, var_name } => {
476 let vars = match var_name {
477 Some(name) => format!("variable {} was", name.quoted()),
478 None => "variables were".to_string(),
479 };
480 let target = match role {
481 Some(role_name) => role_name.quoted().to_string(),
482 None => "the system".to_string(),
483 };
484 write!(
485 f,
486 "{vars} updated for {target}, this will have no effect on the current session"
487 )
488 }
489 AdapterNotice::Welcome(message) => message.fmt(f),
490 AdapterNotice::PlanInsights(message) => message.fmt(f),
491 AdapterNotice::IntrospectionClusterUsage => write!(
492 f,
493 "The mz_introspection cluster has been renamed to mz_catalog_server."
494 ),
495 AdapterNotice::AutoRouteIntrospectionQueriesUsage => write!(
496 f,
497 "The auto_route_introspection_queries variable has been renamed to auto_route_catalog_queries."
498 ),
499 }
500 }
501}
502
503#[derive(Clone, Debug)]
504pub struct DroppedInUseIndex {
505 pub index_name: String,
506 pub dependant_objects: Vec<String>,
507}
508
509impl From<PlanNotice> for AdapterNotice {
510 fn from(notice: PlanNotice) -> AdapterNotice {
511 AdapterNotice::PlanNotice(notice)
512 }
513}