1use std::fmt;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_controller::clusters::ClusterStatus;
15use mz_orchestrator::{OfflineReason, ServiceStatus};
16use mz_ore::str::{StrExt, separated};
17use mz_pgwire_common::{ErrorResponse, Severity};
18use mz_repr::adt::mz_acl_item::AclMode;
19use mz_repr::strconv;
20use mz_sql::ast::NoticeSeverity;
21use mz_sql::catalog::ErrorMessageObjectDescription;
22use mz_sql::plan::PlanNotice;
23use mz_sql::session::vars::IsolationLevel;
24use tokio_postgres::error::SqlState;
25
26use crate::TimestampExplanation;
27
28#[derive(Clone, Debug)]
33pub enum AdapterNotice {
34 DatabaseAlreadyExists {
35 name: String,
36 },
37 SchemaAlreadyExists {
38 name: String,
39 },
40 TableAlreadyExists {
41 name: String,
42 },
43 ObjectAlreadyExists {
44 name: String,
45 ty: &'static str,
46 },
47 DatabaseDoesNotExist {
48 name: String,
49 },
50 ClusterDoesNotExist {
51 name: String,
52 },
53 DefaultClusterDoesNotExist {
54 name: String,
55 kind: &'static str,
56 suggested_action: String,
57 },
58 NoResolvableSearchPathSchema {
59 search_path: Vec<String>,
60 },
61 ExistingTransactionInProgress,
62 ExplicitTransactionControlInImplicitTransaction,
63 UserRequested {
64 severity: NoticeSeverity,
65 },
66 ClusterReplicaStatusChanged {
67 cluster: String,
68 replica: String,
69 status: ClusterStatus,
70 time: DateTime<Utc>,
71 },
72 CascadeDroppedObject {
73 objects: Vec<String>,
74 },
75 DroppedActiveDatabase {
76 name: String,
77 },
78 DroppedActiveCluster {
79 name: String,
80 },
81 QueryTimestamp {
82 explanation: TimestampExplanation<mz_repr::Timestamp>,
83 },
84 EqualSubscribeBounds {
85 bound: mz_repr::Timestamp,
86 },
87 QueryTrace {
88 trace_id: opentelemetry::trace::TraceId,
89 },
90 UnimplementedIsolationLevel {
91 isolation_level: String,
92 },
93 StrongSessionSerializable,
94 BadStartupSetting {
95 name: String,
96 reason: String,
97 },
98 RbacUserDisabled,
99 RoleMembershipAlreadyExists {
100 role_name: String,
101 member_name: String,
102 },
103 RoleMembershipDoesNotExists {
104 role_name: String,
105 member_name: String,
106 },
107 AutoRunOnCatalogServerCluster,
108 AlterIndexOwner {
109 name: String,
110 },
111 CannotRevoke {
112 object_description: ErrorMessageObjectDescription,
113 },
114 NonApplicablePrivilegeTypes {
115 non_applicable_privileges: AclMode,
116 object_description: ErrorMessageObjectDescription,
117 },
118 PlanNotice(PlanNotice),
119 UnknownSessionDatabase(String),
120 OptimizerNotice {
121 notice: String,
122 hint: String,
123 },
124 WebhookSourceCreated {
125 url: url::Url,
126 },
127 DroppedInUseIndex(DroppedInUseIndex),
128 PerReplicaLogRead {
129 log_names: Vec<String>,
130 },
131 VarDefaultUpdated {
132 role: Option<String>,
133 var_name: Option<String>,
134 },
135 Welcome(String),
136 PlanInsights(String),
137 IntrospectionClusterUsage,
138 AutoRouteIntrospectionQueriesUsage,
139}
140
141impl AdapterNotice {
142 pub fn into_response(self) -> ErrorResponse {
143 ErrorResponse {
144 severity: self.severity(),
145 code: self.code(),
146 message: self.to_string(),
147 detail: self.detail(),
148 hint: self.hint(),
149 position: None,
150 }
151 }
152
153 pub fn severity(&self) -> Severity {
155 match self {
156 AdapterNotice::DatabaseAlreadyExists { .. } => Severity::Notice,
157 AdapterNotice::SchemaAlreadyExists { .. } => Severity::Notice,
158 AdapterNotice::TableAlreadyExists { .. } => Severity::Notice,
159 AdapterNotice::ObjectAlreadyExists { .. } => Severity::Notice,
160 AdapterNotice::DatabaseDoesNotExist { .. } => Severity::Notice,
161 AdapterNotice::ClusterDoesNotExist { .. } => Severity::Notice,
162 AdapterNotice::DefaultClusterDoesNotExist { .. } => Severity::Notice,
163 AdapterNotice::NoResolvableSearchPathSchema { .. } => Severity::Notice,
164 AdapterNotice::ExistingTransactionInProgress => Severity::Warning,
165 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => Severity::Warning,
166 AdapterNotice::UserRequested { severity } => match severity {
167 NoticeSeverity::Debug => Severity::Debug,
168 NoticeSeverity::Info => Severity::Info,
169 NoticeSeverity::Log => Severity::Log,
170 NoticeSeverity::Notice => Severity::Notice,
171 NoticeSeverity::Warning => Severity::Warning,
172 },
173 AdapterNotice::ClusterReplicaStatusChanged { .. } => Severity::Notice,
174 AdapterNotice::CascadeDroppedObject { .. } => Severity::Notice,
175 AdapterNotice::DroppedActiveDatabase { .. } => Severity::Notice,
176 AdapterNotice::DroppedActiveCluster { .. } => Severity::Notice,
177 AdapterNotice::QueryTimestamp { .. } => Severity::Notice,
178 AdapterNotice::EqualSubscribeBounds { .. } => Severity::Notice,
179 AdapterNotice::QueryTrace { .. } => Severity::Notice,
180 AdapterNotice::UnimplementedIsolationLevel { .. } => Severity::Notice,
181 AdapterNotice::StrongSessionSerializable => Severity::Notice,
182 AdapterNotice::BadStartupSetting { .. } => Severity::Notice,
183 AdapterNotice::RbacUserDisabled => Severity::Notice,
184 AdapterNotice::RoleMembershipAlreadyExists { .. } => Severity::Notice,
185 AdapterNotice::RoleMembershipDoesNotExists { .. } => Severity::Warning,
186 AdapterNotice::AutoRunOnCatalogServerCluster => Severity::Debug,
187 AdapterNotice::AlterIndexOwner { .. } => Severity::Warning,
188 AdapterNotice::CannotRevoke { .. } => Severity::Warning,
189 AdapterNotice::NonApplicablePrivilegeTypes { .. } => Severity::Notice,
190 AdapterNotice::PlanNotice(notice) => match notice {
191 PlanNotice::ObjectDoesNotExist { .. } => Severity::Notice,
192 PlanNotice::ColumnAlreadyExists { .. } => Severity::Notice,
193 PlanNotice::UpsertSinkKeyNotEnforced { .. } => Severity::Warning,
194 PlanNotice::ReplicaDiskOptionDeprecated { .. } => Severity::Notice,
195 },
196 AdapterNotice::UnknownSessionDatabase(_) => Severity::Notice,
197 AdapterNotice::OptimizerNotice { .. } => Severity::Notice,
198 AdapterNotice::WebhookSourceCreated { .. } => Severity::Notice,
199 AdapterNotice::DroppedInUseIndex { .. } => Severity::Notice,
200 AdapterNotice::PerReplicaLogRead { .. } => Severity::Notice,
201 AdapterNotice::VarDefaultUpdated { .. } => Severity::Notice,
202 AdapterNotice::Welcome(_) => Severity::Notice,
203 AdapterNotice::PlanInsights(_) => Severity::Notice,
204 AdapterNotice::IntrospectionClusterUsage => Severity::Warning,
205 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Severity::Warning,
206 }
207 }
208
209 pub fn detail(&self) -> Option<String> {
211 match self {
212 AdapterNotice::PlanNotice(notice) => notice.detail(),
213 AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
214 AdapterNotice::CascadeDroppedObject { objects } => Some(
215 objects
216 .iter()
217 .map(|obj_info| format!("drop cascades to {}", obj_info))
218 .join("\n"),
219 ),
220 _ => None,
221 }
222 }
223
224 pub fn hint(&self) -> Option<String> {
226 match self {
227 AdapterNotice::DatabaseDoesNotExist { name: _ } => Some("Create the database with CREATE DATABASE or pick an extant database with SET DATABASE = name. List available databases with SHOW DATABASES.".into()),
228 AdapterNotice::ClusterDoesNotExist { name: _ } => Some("Create the cluster with CREATE CLUSTER or pick an extant cluster with SET CLUSTER = name. List available clusters with SHOW CLUSTERS.".into()),
229 AdapterNotice::DefaultClusterDoesNotExist {
230 name: _,
231 kind: _,
232 suggested_action,
233 } => Some(suggested_action.clone()),
234 AdapterNotice::NoResolvableSearchPathSchema { search_path: _ } => Some("Create a schema with CREATE SCHEMA or pick an extant schema with SET SCHEMA = name. List available schemas with SHOW SCHEMAS.".into()),
235 AdapterNotice::DroppedActiveDatabase { name: _ } => Some("Choose a new active database by executing SET DATABASE = <name>.".into()),
236 AdapterNotice::DroppedActiveCluster { name: _ } => Some("Choose a new active cluster by executing SET CLUSTER = <name>.".into()),
237 AdapterNotice::ClusterReplicaStatusChanged { status, .. } => {
238 match status {
239 ServiceStatus::Offline(None)
240 | ServiceStatus::Offline(Some(OfflineReason::Initializing)) => Some("The cluster replica may be restarting or going offline.".into()),
241 ServiceStatus::Offline(Some(OfflineReason::OomKilled)) => Some("The cluster replica may have run out of memory and been killed.".into()),
242 ServiceStatus::Online => None,
243 }
244 },
245 AdapterNotice::RbacUserDisabled => Some("To enable RBAC globally run `ALTER SYSTEM SET enable_rbac_checks TO TRUE` as a superuser. TO enable RBAC for just this session run `SET enable_session_rbac_checks TO TRUE`.".into()),
246 AdapterNotice::AlterIndexOwner {name: _} => Some("Change the ownership of the index's relation, instead.".into()),
247 AdapterNotice::UnknownSessionDatabase(_) => Some(
248 "Create the database with CREATE DATABASE \
249 or pick an extant database with SET DATABASE = name. \
250 List available databases with SHOW DATABASES."
251 .into(),
252 ),
253 AdapterNotice::OptimizerNotice { notice: _, hint } => Some(hint.clone()),
254 AdapterNotice::DroppedInUseIndex(..) => Some("To free up the resources used by the index, recreate all the above-mentioned objects.".into()),
255 AdapterNotice::IntrospectionClusterUsage => Some("Use the new name instead.".into()),
256 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Some("Use the new name instead.".into()),
257 _ => None
258 }
259 }
260
261 pub fn code(&self) -> SqlState {
263 match self {
264 AdapterNotice::DatabaseAlreadyExists { .. } => SqlState::DUPLICATE_DATABASE,
265 AdapterNotice::SchemaAlreadyExists { .. } => SqlState::DUPLICATE_SCHEMA,
266 AdapterNotice::TableAlreadyExists { .. } => SqlState::DUPLICATE_TABLE,
267 AdapterNotice::ObjectAlreadyExists { .. } => SqlState::DUPLICATE_OBJECT,
268 AdapterNotice::DatabaseDoesNotExist { .. } => SqlState::from_code("MZ006"),
269 AdapterNotice::ClusterDoesNotExist { .. } => SqlState::from_code("MZ007"),
270 AdapterNotice::NoResolvableSearchPathSchema { .. } => SqlState::from_code("MZ008"),
271 AdapterNotice::ExistingTransactionInProgress => SqlState::ACTIVE_SQL_TRANSACTION,
272 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
273 SqlState::NO_ACTIVE_SQL_TRANSACTION
274 }
275 AdapterNotice::UserRequested { severity } => match severity {
276 NoticeSeverity::Warning => SqlState::WARNING,
277 _ => SqlState::SUCCESSFUL_COMPLETION,
278 },
279 AdapterNotice::ClusterReplicaStatusChanged { .. } => SqlState::SUCCESSFUL_COMPLETION,
280 AdapterNotice::CascadeDroppedObject { .. } => SqlState::SUCCESSFUL_COMPLETION,
281 AdapterNotice::DroppedActiveDatabase { .. } => SqlState::from_code("MZ002"),
282 AdapterNotice::DroppedActiveCluster { .. } => SqlState::from_code("MZ003"),
283 AdapterNotice::QueryTimestamp { .. } => SqlState::SUCCESSFUL_COMPLETION,
284 AdapterNotice::EqualSubscribeBounds { .. } => SqlState::SUCCESSFUL_COMPLETION,
285 AdapterNotice::QueryTrace { .. } => SqlState::SUCCESSFUL_COMPLETION,
286 AdapterNotice::UnimplementedIsolationLevel { .. } => SqlState::SUCCESSFUL_COMPLETION,
287 AdapterNotice::StrongSessionSerializable => SqlState::SUCCESSFUL_COMPLETION,
288 AdapterNotice::BadStartupSetting { .. } => SqlState::SUCCESSFUL_COMPLETION,
289 AdapterNotice::RbacUserDisabled => SqlState::SUCCESSFUL_COMPLETION,
290 AdapterNotice::RoleMembershipAlreadyExists { .. } => SqlState::SUCCESSFUL_COMPLETION,
291 AdapterNotice::RoleMembershipDoesNotExists { .. } => SqlState::WARNING,
292 AdapterNotice::AutoRunOnCatalogServerCluster => SqlState::SUCCESSFUL_COMPLETION,
293 AdapterNotice::AlterIndexOwner { .. } => SqlState::WARNING,
294 AdapterNotice::CannotRevoke { .. } => SqlState::WARNING_PRIVILEGE_NOT_REVOKED,
295 AdapterNotice::NonApplicablePrivilegeTypes { .. } => SqlState::SUCCESSFUL_COMPLETION,
296 AdapterNotice::PlanNotice(plan) => match plan {
297 PlanNotice::ObjectDoesNotExist { .. } => SqlState::UNDEFINED_OBJECT,
298 PlanNotice::ColumnAlreadyExists { .. } => SqlState::DUPLICATE_COLUMN,
299 PlanNotice::UpsertSinkKeyNotEnforced { .. } => SqlState::WARNING,
300 PlanNotice::ReplicaDiskOptionDeprecated { .. } => {
301 SqlState::WARNING_DEPRECATED_FEATURE
302 }
303 },
304 AdapterNotice::UnknownSessionDatabase(_) => SqlState::from_code("MZ004"),
305 AdapterNotice::DefaultClusterDoesNotExist { .. } => SqlState::from_code("MZ005"),
306 AdapterNotice::OptimizerNotice { .. } => SqlState::SUCCESSFUL_COMPLETION,
307 AdapterNotice::DroppedInUseIndex { .. } => SqlState::SUCCESSFUL_COMPLETION,
308 AdapterNotice::WebhookSourceCreated { .. } => SqlState::SUCCESSFUL_COMPLETION,
309 AdapterNotice::PerReplicaLogRead { .. } => SqlState::SUCCESSFUL_COMPLETION,
310 AdapterNotice::VarDefaultUpdated { .. } => SqlState::SUCCESSFUL_COMPLETION,
311 AdapterNotice::Welcome(_) => SqlState::SUCCESSFUL_COMPLETION,
312 AdapterNotice::PlanInsights(_) => SqlState::from_code("MZ001"),
313 AdapterNotice::IntrospectionClusterUsage => SqlState::WARNING,
314 AdapterNotice::AutoRouteIntrospectionQueriesUsage => SqlState::WARNING,
315 }
316 }
317}
318
319impl fmt::Display for AdapterNotice {
320 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
321 match self {
322 AdapterNotice::DatabaseAlreadyExists { name } => {
323 write!(f, "database {} already exists, skipping", name.quoted())
324 }
325 AdapterNotice::SchemaAlreadyExists { name } => {
326 write!(f, "schema {} already exists, skipping", name.quoted())
327 }
328 AdapterNotice::TableAlreadyExists { name } => {
329 write!(f, "table {} already exists, skipping", name.quoted())
330 }
331 AdapterNotice::ObjectAlreadyExists { name, ty } => {
332 write!(f, "{} {} already exists, skipping", ty, name.quoted())
333 }
334 AdapterNotice::DatabaseDoesNotExist { name } => {
335 write!(f, "database {} does not exist", name.quoted())
336 }
337 AdapterNotice::CascadeDroppedObject { objects } => {
338 write!(f, "drop cascades to {} other objects", objects.len())
339 }
340 AdapterNotice::ClusterDoesNotExist { name } => {
341 write!(f, "cluster {} does not exist", name.quoted())
342 }
343 AdapterNotice::DefaultClusterDoesNotExist { kind, name, .. } => {
344 write!(f, "{kind} default cluster {} does not exist", name.quoted())
345 }
346 AdapterNotice::NoResolvableSearchPathSchema { search_path } => {
347 write!(
348 f,
349 "no schema on the search path exists: {}",
350 search_path.join(", ")
351 )
352 }
353 AdapterNotice::ExistingTransactionInProgress => {
354 write!(f, "there is already a transaction in progress")
355 }
356 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
357 write!(f, "there is no transaction in progress")
358 }
359 AdapterNotice::UserRequested { severity } => {
360 write!(f, "raised a test {}", severity.to_string().to_lowercase())
361 }
362 AdapterNotice::ClusterReplicaStatusChanged {
363 cluster,
364 replica,
365 status,
366 time,
367 } => {
368 let mut time_buf = String::new();
369 strconv::format_timestamptz(&mut time_buf, time);
370 write!(
371 f,
372 "cluster replica {}.{} changed status to {} at {}",
373 cluster,
374 replica,
375 status.as_kebab_case_str().quoted(),
376 time_buf,
377 )?;
378 Ok(())
379 }
380 AdapterNotice::DroppedActiveDatabase { name } => {
381 write!(f, "active database {} has been dropped", name.quoted())
382 }
383 AdapterNotice::DroppedActiveCluster { name } => {
384 write!(f, "active cluster {} has been dropped", name.quoted())
385 }
386 AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
387 AdapterNotice::EqualSubscribeBounds { bound } => {
388 write!(
389 f,
390 "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty"
391 )
392 }
393 AdapterNotice::QueryTrace { trace_id } => {
394 write!(f, "trace id: {}", trace_id)
395 }
396 AdapterNotice::UnimplementedIsolationLevel { isolation_level } => {
397 write!(
398 f,
399 "transaction isolation level {isolation_level} is unimplemented, the session will be upgraded to {}",
400 IsolationLevel::Serializable.as_str()
401 )
402 }
403 AdapterNotice::StrongSessionSerializable => {
404 write!(
405 f,
406 "The Strong Session Serializable isolation level may exhibit consistency violations when reading from catalog objects",
407 )
408 }
409 AdapterNotice::BadStartupSetting { name, reason } => {
410 write!(f, "startup setting {name} not set: {reason}")
411 }
412 AdapterNotice::RbacUserDisabled => {
413 write!(
414 f,
415 "RBAC is disabled so no role attributes or object ownership will be considered \
416 when executing statements"
417 )
418 }
419 AdapterNotice::RoleMembershipAlreadyExists {
420 role_name,
421 member_name,
422 } => write!(
423 f,
424 "role \"{member_name}\" is already a member of role \"{role_name}\""
425 ),
426 AdapterNotice::RoleMembershipDoesNotExists {
427 role_name,
428 member_name,
429 } => write!(
430 f,
431 "role \"{member_name}\" is not a member of role \"{role_name}\""
432 ),
433 AdapterNotice::AutoRunOnCatalogServerCluster => write!(
434 f,
435 "query was automatically run on the \"mz_catalog_server\" cluster"
436 ),
437 AdapterNotice::AlterIndexOwner { name } => {
438 write!(f, "cannot change owner of {}", name.quoted())
439 }
440 AdapterNotice::CannotRevoke { object_description } => {
441 write!(f, "no privileges could be revoked for {object_description}")
442 }
443 AdapterNotice::NonApplicablePrivilegeTypes {
444 non_applicable_privileges,
445 object_description,
446 } => {
447 write!(
448 f,
449 "non-applicable privilege types {} for {}",
450 non_applicable_privileges.to_error_string(),
451 object_description,
452 )
453 }
454 AdapterNotice::PlanNotice(plan) => plan.fmt(f),
455 AdapterNotice::UnknownSessionDatabase(name) => {
456 write!(f, "session database {} does not exist", name.quoted())
457 }
458 AdapterNotice::OptimizerNotice { notice, hint: _ } => notice.fmt(f),
459 AdapterNotice::WebhookSourceCreated { url } => {
460 write!(f, "URL to POST data is '{url}'")
461 }
462 AdapterNotice::DroppedInUseIndex(DroppedInUseIndex {
463 index_name,
464 dependant_objects,
465 }) => {
466 write!(
467 f,
468 "The dropped index {index_name} is being used by the following objects: {}. The index is now dropped from the catalog, but it will continue to be maintained and take up resources until all dependent objects are dropped, altered, or Materialize is restarted!",
469 separated(", ", dependant_objects)
470 )
471 }
472 AdapterNotice::PerReplicaLogRead { log_names } => {
473 write!(
474 f,
475 "Queried introspection relations: {}. Unlike other objects in Materialize, results from querying these objects depend on the current values of the `cluster` and `cluster_replica` session variables.",
476 log_names.join(", ")
477 )
478 }
479 AdapterNotice::VarDefaultUpdated { role, var_name } => {
480 let vars = match var_name {
481 Some(name) => format!("variable {} was", name.quoted()),
482 None => "variables were".to_string(),
483 };
484 let target = match role {
485 Some(role_name) => role_name.quoted().to_string(),
486 None => "the system".to_string(),
487 };
488 write!(
489 f,
490 "{vars} updated for {target}, this will have no effect on the current session"
491 )
492 }
493 AdapterNotice::Welcome(message) => message.fmt(f),
494 AdapterNotice::PlanInsights(message) => message.fmt(f),
495 AdapterNotice::IntrospectionClusterUsage => write!(
496 f,
497 "The mz_introspection cluster has been renamed to mz_catalog_server."
498 ),
499 AdapterNotice::AutoRouteIntrospectionQueriesUsage => write!(
500 f,
501 "The auto_route_introspection_queries variable has been renamed to auto_route_catalog_queries."
502 ),
503 }
504 }
505}
506
507#[derive(Clone, Debug)]
508pub struct DroppedInUseIndex {
509 pub index_name: String,
510 pub dependant_objects: Vec<String>,
511}
512
513impl From<PlanNotice> for AdapterNotice {
514 fn from(notice: PlanNotice) -> AdapterNotice {
515 AdapterNotice::PlanNotice(notice)
516 }
517}