1use std::fmt;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_controller::clusters::ClusterStatus;
15use mz_orchestrator::{OfflineReason, ServiceStatus};
16use mz_ore::str::{StrExt, separated};
17use mz_pgwire_common::{ErrorResponse, Severity};
18use mz_repr::adt::mz_acl_item::AclMode;
19use mz_repr::strconv;
20use mz_sql::ast::NoticeSeverity;
21use mz_sql::catalog::ErrorMessageObjectDescription;
22use mz_sql::plan::PlanNotice;
23use mz_sql::session::vars::IsolationLevel;
24use tokio_postgres::error::SqlState;
25
26use crate::TimestampExplanation;
27
28#[derive(Clone, Debug)]
33pub enum AdapterNotice {
34 DatabaseAlreadyExists {
35 name: String,
36 },
37 SchemaAlreadyExists {
38 name: String,
39 },
40 TableAlreadyExists {
41 name: String,
42 },
43 ObjectAlreadyExists {
44 name: String,
45 ty: &'static str,
46 },
47 DatabaseDoesNotExist {
48 name: String,
49 },
50 ClusterDoesNotExist {
51 name: String,
52 },
53 DefaultClusterDoesNotExist {
54 name: String,
55 kind: &'static str,
56 suggested_action: String,
57 },
58 NoResolvableSearchPathSchema {
59 search_path: Vec<String>,
60 },
61 ExistingTransactionInProgress,
62 ExplicitTransactionControlInImplicitTransaction,
63 UserRequested {
64 severity: NoticeSeverity,
65 },
66 ClusterReplicaStatusChanged {
67 cluster: String,
68 replica: String,
69 status: ClusterStatus,
70 time: DateTime<Utc>,
71 },
72 CascadeDroppedObject {
73 objects: Vec<String>,
74 },
75 DroppedActiveDatabase {
76 name: String,
77 },
78 DroppedActiveCluster {
79 name: String,
80 },
81 QueryTimestamp {
82 explanation: TimestampExplanation<mz_repr::Timestamp>,
83 },
84 EqualSubscribeBounds {
85 bound: mz_repr::Timestamp,
86 },
87 QueryTrace {
88 trace_id: opentelemetry::trace::TraceId,
89 },
90 UnimplementedIsolationLevel {
91 isolation_level: String,
92 },
93 StrongSessionSerializable,
94 BadStartupSetting {
95 name: String,
96 reason: String,
97 },
98 RbacUserDisabled,
99 RoleMembershipAlreadyExists {
100 role_name: String,
101 member_name: String,
102 },
103 RoleMembershipDoesNotExists {
104 role_name: String,
105 member_name: String,
106 },
107 AutoRunOnCatalogServerCluster,
108 AlterIndexOwner {
109 name: String,
110 },
111 CannotRevoke {
112 object_description: ErrorMessageObjectDescription,
113 },
114 NonApplicablePrivilegeTypes {
115 non_applicable_privileges: AclMode,
116 object_description: ErrorMessageObjectDescription,
117 },
118 PlanNotice(PlanNotice),
119 UnknownSessionDatabase(String),
120 OptimizerNotice {
121 notice: String,
122 hint: String,
123 },
124 WebhookSourceCreated {
125 url: url::Url,
126 },
127 DroppedInUseIndex(DroppedInUseIndex),
128 PerReplicaLogRead {
129 log_names: Vec<String>,
130 },
131 VarDefaultUpdated {
132 role: Option<String>,
133 var_name: Option<String>,
134 },
135 Welcome(String),
136 PlanInsights(String),
137 IntrospectionClusterUsage,
138 AutoRouteIntrospectionQueriesUsage,
139}
140
141impl AdapterNotice {
142 pub fn into_response(self) -> ErrorResponse {
143 ErrorResponse {
144 severity: self.severity(),
145 code: self.code(),
146 message: self.to_string(),
147 detail: self.detail(),
148 hint: self.hint(),
149 position: None,
150 }
151 }
152
153 pub fn severity(&self) -> Severity {
155 match self {
156 AdapterNotice::DatabaseAlreadyExists { .. } => Severity::Notice,
157 AdapterNotice::SchemaAlreadyExists { .. } => Severity::Notice,
158 AdapterNotice::TableAlreadyExists { .. } => Severity::Notice,
159 AdapterNotice::ObjectAlreadyExists { .. } => Severity::Notice,
160 AdapterNotice::DatabaseDoesNotExist { .. } => Severity::Notice,
161 AdapterNotice::ClusterDoesNotExist { .. } => Severity::Notice,
162 AdapterNotice::DefaultClusterDoesNotExist { .. } => Severity::Notice,
163 AdapterNotice::NoResolvableSearchPathSchema { .. } => Severity::Notice,
164 AdapterNotice::ExistingTransactionInProgress => Severity::Warning,
165 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => Severity::Warning,
166 AdapterNotice::UserRequested { severity } => match severity {
167 NoticeSeverity::Debug => Severity::Debug,
168 NoticeSeverity::Info => Severity::Info,
169 NoticeSeverity::Log => Severity::Log,
170 NoticeSeverity::Notice => Severity::Notice,
171 NoticeSeverity::Warning => Severity::Warning,
172 },
173 AdapterNotice::ClusterReplicaStatusChanged { .. } => Severity::Notice,
174 AdapterNotice::CascadeDroppedObject { .. } => Severity::Notice,
175 AdapterNotice::DroppedActiveDatabase { .. } => Severity::Notice,
176 AdapterNotice::DroppedActiveCluster { .. } => Severity::Notice,
177 AdapterNotice::QueryTimestamp { .. } => Severity::Notice,
178 AdapterNotice::EqualSubscribeBounds { .. } => Severity::Notice,
179 AdapterNotice::QueryTrace { .. } => Severity::Notice,
180 AdapterNotice::UnimplementedIsolationLevel { .. } => Severity::Notice,
181 AdapterNotice::StrongSessionSerializable => Severity::Notice,
182 AdapterNotice::BadStartupSetting { .. } => Severity::Notice,
183 AdapterNotice::RbacUserDisabled => Severity::Notice,
184 AdapterNotice::RoleMembershipAlreadyExists { .. } => Severity::Notice,
185 AdapterNotice::RoleMembershipDoesNotExists { .. } => Severity::Warning,
186 AdapterNotice::AutoRunOnCatalogServerCluster => Severity::Debug,
187 AdapterNotice::AlterIndexOwner { .. } => Severity::Warning,
188 AdapterNotice::CannotRevoke { .. } => Severity::Warning,
189 AdapterNotice::NonApplicablePrivilegeTypes { .. } => Severity::Notice,
190 AdapterNotice::PlanNotice(notice) => match notice {
191 PlanNotice::ObjectDoesNotExist { .. } => Severity::Notice,
192 PlanNotice::ColumnAlreadyExists { .. } => Severity::Notice,
193 PlanNotice::UpsertSinkKeyNotEnforced { .. } => Severity::Warning,
194 },
195 AdapterNotice::UnknownSessionDatabase(_) => Severity::Notice,
196 AdapterNotice::OptimizerNotice { .. } => Severity::Notice,
197 AdapterNotice::WebhookSourceCreated { .. } => Severity::Notice,
198 AdapterNotice::DroppedInUseIndex { .. } => Severity::Notice,
199 AdapterNotice::PerReplicaLogRead { .. } => Severity::Notice,
200 AdapterNotice::VarDefaultUpdated { .. } => Severity::Notice,
201 AdapterNotice::Welcome(_) => Severity::Notice,
202 AdapterNotice::PlanInsights(_) => Severity::Notice,
203 AdapterNotice::IntrospectionClusterUsage => Severity::Warning,
204 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Severity::Warning,
205 }
206 }
207
208 pub fn detail(&self) -> Option<String> {
210 match self {
211 AdapterNotice::PlanNotice(notice) => notice.detail(),
212 AdapterNotice::QueryTimestamp { explanation } => Some(format!("\n{explanation}")),
213 AdapterNotice::CascadeDroppedObject { objects } => Some(
214 objects
215 .iter()
216 .map(|obj_info| format!("drop cascades to {}", obj_info))
217 .join("\n"),
218 ),
219 _ => None,
220 }
221 }
222
223 pub fn hint(&self) -> Option<String> {
225 match self {
226 AdapterNotice::DatabaseDoesNotExist { name: _ } => Some("Create the database with CREATE DATABASE or pick an extant database with SET DATABASE = name. List available databases with SHOW DATABASES.".into()),
227 AdapterNotice::ClusterDoesNotExist { name: _ } => Some("Create the cluster with CREATE CLUSTER or pick an extant cluster with SET CLUSTER = name. List available clusters with SHOW CLUSTERS.".into()),
228 AdapterNotice::DefaultClusterDoesNotExist { name: _, kind: _, suggested_action } => Some(suggested_action.clone()),
229 AdapterNotice::NoResolvableSearchPathSchema { search_path: _ } => Some("Create a schema with CREATE SCHEMA or pick an extant schema with SET SCHEMA = name. List available schemas with SHOW SCHEMAS.".into()),
230 AdapterNotice::DroppedActiveDatabase { name: _ } => Some("Choose a new active database by executing SET DATABASE = <name>.".into()),
231 AdapterNotice::DroppedActiveCluster { name: _ } => Some("Choose a new active cluster by executing SET CLUSTER = <name>.".into()),
232 AdapterNotice::ClusterReplicaStatusChanged { status, .. } => {
233 match status {
234 ServiceStatus::Offline(None)
235 | ServiceStatus::Offline(Some(OfflineReason::Initializing)) => Some("The cluster replica may be restarting or going offline.".into()),
236 ServiceStatus::Offline(Some(OfflineReason::OomKilled)) => Some("The cluster replica may have run out of memory and been killed.".into()),
237 ServiceStatus::Online => None,
238 }
239 },
240 AdapterNotice::RbacUserDisabled => Some("To enable RBAC globally run `ALTER SYSTEM SET enable_rbac_checks TO TRUE` as a superuser. TO enable RBAC for just this session run `SET enable_session_rbac_checks TO TRUE`.".into()),
241 AdapterNotice::AlterIndexOwner {name: _} => Some("Change the ownership of the index's relation, instead.".into()),
242 AdapterNotice::UnknownSessionDatabase(_) => Some(
243 "Create the database with CREATE DATABASE \
244 or pick an extant database with SET DATABASE = name. \
245 List available databases with SHOW DATABASES."
246 .into(),
247 ),
248 AdapterNotice::OptimizerNotice { notice: _, hint } => Some(hint.clone()),
249 AdapterNotice::DroppedInUseIndex(..) => Some("To free up the resources used by the index, recreate all the above-mentioned objects.".into()),
250 AdapterNotice::IntrospectionClusterUsage => Some("Use the new name instead.".into()),
251 AdapterNotice::AutoRouteIntrospectionQueriesUsage => Some("Use the new name instead.".into()),
252 _ => None
253 }
254 }
255
256 pub fn code(&self) -> SqlState {
258 match self {
259 AdapterNotice::DatabaseAlreadyExists { .. } => SqlState::DUPLICATE_DATABASE,
260 AdapterNotice::SchemaAlreadyExists { .. } => SqlState::DUPLICATE_SCHEMA,
261 AdapterNotice::TableAlreadyExists { .. } => SqlState::DUPLICATE_TABLE,
262 AdapterNotice::ObjectAlreadyExists { .. } => SqlState::DUPLICATE_OBJECT,
263 AdapterNotice::DatabaseDoesNotExist { .. } => SqlState::from_code("MZ006"),
264 AdapterNotice::ClusterDoesNotExist { .. } => SqlState::from_code("MZ007"),
265 AdapterNotice::NoResolvableSearchPathSchema { .. } => SqlState::from_code("MZ008"),
266 AdapterNotice::ExistingTransactionInProgress => SqlState::ACTIVE_SQL_TRANSACTION,
267 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
268 SqlState::NO_ACTIVE_SQL_TRANSACTION
269 }
270 AdapterNotice::UserRequested { severity } => match severity {
271 NoticeSeverity::Warning => SqlState::WARNING,
272 _ => SqlState::SUCCESSFUL_COMPLETION,
273 },
274 AdapterNotice::ClusterReplicaStatusChanged { .. } => SqlState::SUCCESSFUL_COMPLETION,
275 AdapterNotice::CascadeDroppedObject { .. } => SqlState::SUCCESSFUL_COMPLETION,
276 AdapterNotice::DroppedActiveDatabase { .. } => SqlState::from_code("MZ002"),
277 AdapterNotice::DroppedActiveCluster { .. } => SqlState::from_code("MZ003"),
278 AdapterNotice::QueryTimestamp { .. } => SqlState::SUCCESSFUL_COMPLETION,
279 AdapterNotice::EqualSubscribeBounds { .. } => SqlState::SUCCESSFUL_COMPLETION,
280 AdapterNotice::QueryTrace { .. } => SqlState::SUCCESSFUL_COMPLETION,
281 AdapterNotice::UnimplementedIsolationLevel { .. } => SqlState::SUCCESSFUL_COMPLETION,
282 AdapterNotice::StrongSessionSerializable => SqlState::SUCCESSFUL_COMPLETION,
283 AdapterNotice::BadStartupSetting { .. } => SqlState::SUCCESSFUL_COMPLETION,
284 AdapterNotice::RbacUserDisabled => SqlState::SUCCESSFUL_COMPLETION,
285 AdapterNotice::RoleMembershipAlreadyExists { .. } => SqlState::SUCCESSFUL_COMPLETION,
286 AdapterNotice::RoleMembershipDoesNotExists { .. } => SqlState::WARNING,
287 AdapterNotice::AutoRunOnCatalogServerCluster => SqlState::SUCCESSFUL_COMPLETION,
288 AdapterNotice::AlterIndexOwner { .. } => SqlState::WARNING,
289 AdapterNotice::CannotRevoke { .. } => SqlState::WARNING_PRIVILEGE_NOT_REVOKED,
290 AdapterNotice::NonApplicablePrivilegeTypes { .. } => SqlState::SUCCESSFUL_COMPLETION,
291 AdapterNotice::PlanNotice(plan) => match plan {
292 PlanNotice::ObjectDoesNotExist { .. } => SqlState::UNDEFINED_OBJECT,
293 PlanNotice::ColumnAlreadyExists { .. } => SqlState::DUPLICATE_COLUMN,
294 PlanNotice::UpsertSinkKeyNotEnforced { .. } => SqlState::WARNING,
295 },
296 AdapterNotice::UnknownSessionDatabase(_) => SqlState::from_code("MZ004"),
297 AdapterNotice::DefaultClusterDoesNotExist { .. } => SqlState::from_code("MZ005"),
298 AdapterNotice::OptimizerNotice { .. } => SqlState::SUCCESSFUL_COMPLETION,
299 AdapterNotice::DroppedInUseIndex { .. } => SqlState::SUCCESSFUL_COMPLETION,
300 AdapterNotice::WebhookSourceCreated { .. } => SqlState::SUCCESSFUL_COMPLETION,
301 AdapterNotice::PerReplicaLogRead { .. } => SqlState::SUCCESSFUL_COMPLETION,
302 AdapterNotice::VarDefaultUpdated { .. } => SqlState::SUCCESSFUL_COMPLETION,
303 AdapterNotice::Welcome(_) => SqlState::SUCCESSFUL_COMPLETION,
304 AdapterNotice::PlanInsights(_) => SqlState::from_code("MZ001"),
305 AdapterNotice::IntrospectionClusterUsage => SqlState::WARNING,
306 AdapterNotice::AutoRouteIntrospectionQueriesUsage => SqlState::WARNING,
307 }
308 }
309}
310
311impl fmt::Display for AdapterNotice {
312 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
313 match self {
314 AdapterNotice::DatabaseAlreadyExists { name } => {
315 write!(f, "database {} already exists, skipping", name.quoted())
316 }
317 AdapterNotice::SchemaAlreadyExists { name } => {
318 write!(f, "schema {} already exists, skipping", name.quoted())
319 }
320 AdapterNotice::TableAlreadyExists { name } => {
321 write!(f, "table {} already exists, skipping", name.quoted())
322 }
323 AdapterNotice::ObjectAlreadyExists { name, ty } => {
324 write!(f, "{} {} already exists, skipping", ty, name.quoted())
325 }
326 AdapterNotice::DatabaseDoesNotExist { name } => {
327 write!(f, "database {} does not exist", name.quoted())
328 }
329 AdapterNotice::CascadeDroppedObject { objects } => {
330 write!(f, "drop cascades to {} other objects", objects.len())
331 }
332 AdapterNotice::ClusterDoesNotExist { name } => {
333 write!(f, "cluster {} does not exist", name.quoted())
334 }
335 AdapterNotice::DefaultClusterDoesNotExist { kind, name, .. } => {
336 write!(f, "{kind} default cluster {} does not exist", name.quoted())
337 }
338 AdapterNotice::NoResolvableSearchPathSchema { search_path } => {
339 write!(
340 f,
341 "no schema on the search path exists: {}",
342 search_path.join(", ")
343 )
344 }
345 AdapterNotice::ExistingTransactionInProgress => {
346 write!(f, "there is already a transaction in progress")
347 }
348 AdapterNotice::ExplicitTransactionControlInImplicitTransaction => {
349 write!(f, "there is no transaction in progress")
350 }
351 AdapterNotice::UserRequested { severity } => {
352 write!(f, "raised a test {}", severity.to_string().to_lowercase())
353 }
354 AdapterNotice::ClusterReplicaStatusChanged {
355 cluster,
356 replica,
357 status,
358 time,
359 } => {
360 let mut time_buf = String::new();
361 strconv::format_timestamptz(&mut time_buf, time);
362 write!(
363 f,
364 "cluster replica {}.{} changed status to {} at {}",
365 cluster,
366 replica,
367 status.as_kebab_case_str().quoted(),
368 time_buf,
369 )?;
370 Ok(())
371 }
372 AdapterNotice::DroppedActiveDatabase { name } => {
373 write!(f, "active database {} has been dropped", name.quoted())
374 }
375 AdapterNotice::DroppedActiveCluster { name } => {
376 write!(f, "active cluster {} has been dropped", name.quoted())
377 }
378 AdapterNotice::QueryTimestamp { .. } => write!(f, "EXPLAIN TIMESTAMP for query"),
379 AdapterNotice::EqualSubscribeBounds { bound } => {
380 write!(
381 f,
382 "subscribe as of {bound} (inclusive) up to the same bound {bound} (exclusive) is guaranteed to be empty"
383 )
384 }
385 AdapterNotice::QueryTrace { trace_id } => {
386 write!(f, "trace id: {}", trace_id)
387 }
388 AdapterNotice::UnimplementedIsolationLevel { isolation_level } => {
389 write!(
390 f,
391 "transaction isolation level {isolation_level} is unimplemented, the session will be upgraded to {}",
392 IsolationLevel::Serializable.as_str()
393 )
394 }
395 AdapterNotice::StrongSessionSerializable => {
396 write!(
397 f,
398 "The Strong Session Serializable isolation level may exhibit consistency violations when reading from catalog objects",
399 )
400 }
401 AdapterNotice::BadStartupSetting { name, reason } => {
402 write!(f, "startup setting {name} not set: {reason}")
403 }
404 AdapterNotice::RbacUserDisabled => {
405 write!(
406 f,
407 "RBAC is disabled so no role attributes or object ownership will be considered \
408 when executing statements"
409 )
410 }
411 AdapterNotice::RoleMembershipAlreadyExists {
412 role_name,
413 member_name,
414 } => write!(
415 f,
416 "role \"{member_name}\" is already a member of role \"{role_name}\""
417 ),
418 AdapterNotice::RoleMembershipDoesNotExists {
419 role_name,
420 member_name,
421 } => write!(
422 f,
423 "role \"{member_name}\" is not a member of role \"{role_name}\""
424 ),
425 AdapterNotice::AutoRunOnCatalogServerCluster => write!(
426 f,
427 "query was automatically run on the \"mz_catalog_server\" cluster"
428 ),
429 AdapterNotice::AlterIndexOwner { name } => {
430 write!(f, "cannot change owner of {}", name.quoted())
431 }
432 AdapterNotice::CannotRevoke { object_description } => {
433 write!(f, "no privileges could be revoked for {object_description}")
434 }
435 AdapterNotice::NonApplicablePrivilegeTypes {
436 non_applicable_privileges,
437 object_description,
438 } => {
439 write!(
440 f,
441 "non-applicable privilege types {} for {}",
442 non_applicable_privileges.to_error_string(),
443 object_description,
444 )
445 }
446 AdapterNotice::PlanNotice(plan) => plan.fmt(f),
447 AdapterNotice::UnknownSessionDatabase(name) => {
448 write!(f, "session database {} does not exist", name.quoted())
449 }
450 AdapterNotice::OptimizerNotice { notice, hint: _ } => notice.fmt(f),
451 AdapterNotice::WebhookSourceCreated { url } => {
452 write!(f, "URL to POST data is '{url}'")
453 }
454 AdapterNotice::DroppedInUseIndex(DroppedInUseIndex {
455 index_name,
456 dependant_objects,
457 }) => {
458 write!(
459 f,
460 "The dropped index {index_name} is being used by the following objects: {}. The index is now dropped from the catalog, but it will continue to be maintained and take up resources until all dependent objects are dropped, altered, or Materialize is restarted!",
461 separated(", ", dependant_objects)
462 )
463 }
464 AdapterNotice::PerReplicaLogRead { log_names } => {
465 write!(
466 f,
467 "Queried introspection relations: {}. Unlike other objects in Materialize, results from querying these objects depend on the current values of the `cluster` and `cluster_replica` session variables.",
468 log_names.join(", ")
469 )
470 }
471 AdapterNotice::VarDefaultUpdated { role, var_name } => {
472 let vars = match var_name {
473 Some(name) => format!("variable {} was", name.quoted()),
474 None => "variables were".to_string(),
475 };
476 let target = match role {
477 Some(role_name) => role_name.quoted().to_string(),
478 None => "the system".to_string(),
479 };
480 write!(
481 f,
482 "{vars} updated for {target}, this will have no effect on the current session"
483 )
484 }
485 AdapterNotice::Welcome(message) => message.fmt(f),
486 AdapterNotice::PlanInsights(message) => message.fmt(f),
487 AdapterNotice::IntrospectionClusterUsage => write!(
488 f,
489 "The mz_introspection cluster has been renamed to mz_catalog_server."
490 ),
491 AdapterNotice::AutoRouteIntrospectionQueriesUsage => write!(
492 f,
493 "The auto_route_introspection_queries variable has been renamed to auto_route_catalog_queries."
494 ),
495 }
496 }
497}
498
499#[derive(Clone, Debug)]
500pub struct DroppedInUseIndex {
501 pub index_name: String,
502 pub dependant_objects: Vec<String>,
503}
504
505impl From<PlanNotice> for AdapterNotice {
506 fn from(notice: PlanNotice) -> AdapterNotice {
507 AdapterNotice::PlanNotice(notice)
508 }
509}