mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::str::FromStr;
20use std::sync::LazyLock;
21use std::time::{Duration, Instant};
22
23use chrono::{DateTime, Utc};
24use mz_auth::password::Password;
25use mz_build_info::BuildInfo;
26use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::MirScalarExpr;
29use mz_ore::now::{EpochMillis, NowFn};
30use mz_ore::str::StrExt;
31use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
32use mz_repr::explain::ExprHumanizer;
33use mz_repr::network_policy_id::NetworkPolicyId;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
37};
38use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
39use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
40use mz_storage_types::connections::{Connection, ConnectionContext};
41use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
42use proptest_derive::Arbitrary;
43use regex::Regex;
44use serde::{Deserialize, Serialize};
45use uuid::Uuid;
46
47use crate::func::Func;
48use crate::names::{
49    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
50    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
51    SchemaSpecifier, SystemObjectId,
52};
53use crate::plan::statement::StatementDesc;
54use crate::plan::statement::ddl::PlannedRoleAttributes;
55use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
56use crate::session::vars::{OwnedVarInput, SystemVars};
57
58/// A catalog keeps track of SQL objects and session state available to the
59/// planner.
60///
61/// The `sql` crate is agnostic to any particular catalog implementation. This
62/// trait describes the required interface.
63///
64/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
65/// catalog contains databases, databases contain schemas, and schemas contain
66/// catalog items, like sources, sinks, view, and indexes.
67///
68/// There are two classes of operations provided by a catalog:
69///
70///   * Resolution operations, like [`resolve_item`]. These fill in missing name
71///     components based upon connection defaults, e.g., resolving the partial
72///     name `view42` to the fully-specified name `materialize.public.view42`.
73///
74///   * Lookup operations, like [`SessionCatalog::get_item`]. These retrieve
75///     metadata about a catalog entity based on a fully-specified name that is
76///     known to be valid (i.e., because the name was successfully resolved, or
77///     was constructed based on the output of a prior lookup operation). These
78///     functions panic if called with invalid input.
79///
80///   * Session management, such as managing variables' states and adding
81///     notices to the session.
82///
83/// [`get_databases`]: SessionCatalog::get_databases
84/// [`get_item`]: SessionCatalog::get_item
85/// [`resolve_item`]: SessionCatalog::resolve_item
86pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
87    /// Returns the id of the role that is issuing the query.
88    fn active_role_id(&self) -> &RoleId;
89
90    /// Returns the database to use if one is not explicitly specified.
91    fn active_database_name(&self) -> Option<&str> {
92        self.active_database()
93            .map(|id| self.get_database(id))
94            .map(|db| db.name())
95    }
96
97    /// Returns the database to use if one is not explicitly specified.
98    fn active_database(&self) -> Option<&DatabaseId>;
99
100    /// Returns the cluster to use if one is not explicitly specified.
101    fn active_cluster(&self) -> &str;
102
103    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
104    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];
105
106    /// Returns the descriptor of the named prepared statement on the session, or
107    /// None if the prepared statement does not exist.
108    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;
109
110    /// Resolves the named database.
111    ///
112    /// If `database_name` exists in the catalog, it returns a reference to the
113    /// resolved database; otherwise it returns an error.
114    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;
115
116    /// Gets a database by its ID.
117    ///
118    /// Panics if `id` does not specify a valid database.
119    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;
120
121    /// Gets all databases.
122    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;
123
124    /// Resolves a partially-specified schema name.
125    ///
126    /// If the schema exists in the catalog, it returns a reference to the
127    /// resolved schema; otherwise it returns an error.
128    fn resolve_schema(
129        &self,
130        database_name: Option<&str>,
131        schema_name: &str,
132    ) -> Result<&dyn CatalogSchema, CatalogError>;
133
134    /// Resolves a schema name within a specified database.
135    ///
136    /// If the schema exists in the database, it returns a reference to the
137    /// resolved schema; otherwise it returns an error.
138    fn resolve_schema_in_database(
139        &self,
140        database_spec: &ResolvedDatabaseSpecifier,
141        schema_name: &str,
142    ) -> Result<&dyn CatalogSchema, CatalogError>;
143
144    /// Gets a schema by its ID.
145    ///
146    /// Panics if `id` does not specify a valid schema.
147    fn get_schema(
148        &self,
149        database_spec: &ResolvedDatabaseSpecifier,
150        schema_spec: &SchemaSpecifier,
151    ) -> &dyn CatalogSchema;
152
153    /// Gets all schemas.
154    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;
155
156    /// Gets the mz_internal schema id.
157    fn get_mz_internal_schema_id(&self) -> SchemaId;
158
159    /// Gets the mz_unsafe schema id.
160    fn get_mz_unsafe_schema_id(&self) -> SchemaId;
161
162    /// Returns true if `schema` is an internal system schema, false otherwise
163    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;
164
165    /// Resolves the named role.
166    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;
167
168    /// Resolves the named network policy.
169    fn resolve_network_policy(
170        &self,
171        network_policy_name: &str,
172    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;
173
174    /// Gets a role by its ID.
175    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;
176
177    /// Gets a role by its ID.
178    ///
179    /// Panics if `id` does not specify a valid role.
180    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;
181
182    /// Gets all roles.
183    fn get_roles(&self) -> Vec<&dyn CatalogRole>;
184
185    /// Gets the id of the `mz_system` role.
186    fn mz_system_role_id(&self) -> RoleId;
187
188    /// Collects all role IDs that `id` is transitively a member of.
189    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;
190
191    /// Resolves the named cluster.
192    /// Gets a network_policy by its ID.
193    ///
194    /// Panics if `id` does not specify a valid role.
195    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;
196
197    /// Gets all roles.
198    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;
199
200    ///
201    /// If the provided name is `None`, resolves the currently active cluster.
202    fn resolve_cluster<'a, 'b>(
203        &'a self,
204        cluster_name: Option<&'b str>,
205    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;
206
207    /// Resolves the named cluster replica.
208    fn resolve_cluster_replica<'a, 'b>(
209        &'a self,
210        cluster_replica_name: &'b QualifiedReplica,
211    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;
212
213    /// Resolves a partially-specified item name, that is NOT a function or
214    /// type. (For resolving functions or types, please use
215    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
216    ///
217    /// If the partial name has a database component, it searches only the
218    /// specified database; otherwise, it searches the active database. If the
219    /// partial name has a schema component, it searches only the specified
220    /// schema; otherwise, it searches a default set of schemas within the
221    /// selected database. It returns an error if none of the searched schemas
222    /// contain an item whose name matches the item component of the partial
223    /// name.
224    ///
225    /// Note that it is not an error if the named item appears in more than one
226    /// of the search schemas. The catalog implementation must choose one.
227    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
228
229    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
230    /// functions within the catalog.
231    fn resolve_function(
232        &self,
233        item_name: &PartialItemName,
234    ) -> Result<&dyn CatalogItem, CatalogError>;
235
236    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
237    /// types within the catalog.
238    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
239
240    /// Resolves `name` to a type or item, preferring the type if both exist.
241    fn resolve_item_or_type(
242        &self,
243        name: &PartialItemName,
244    ) -> Result<&dyn CatalogItem, CatalogError> {
245        if let Ok(ty) = self.resolve_type(name) {
246            return Ok(ty);
247        }
248        self.resolve_item(name)
249    }
250
251    /// Gets a type named `name` from exactly one of the system schemas.
252    ///
253    /// # Panics
254    /// - If `name` is not an entry in any system schema
255    /// - If more than one system schema has an entry named `name`.
256    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;
257
258    /// Gets an item by its ID.
259    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;
260
261    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
262    /// exist.
263    ///
264    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
265    fn try_get_item_by_global_id<'a>(
266        &'a self,
267        id: &GlobalId,
268    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;
269
270    /// Gets an item by its ID.
271    ///
272    /// Panics if `id` does not specify a valid item.
273    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;
274
275    /// Gets an item by a [`GlobalId`].
276    ///
277    /// Panics if `id` does not specify a valid item.
278    ///
279    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
280    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;
281
282    /// Gets all items.
283    fn get_items(&self) -> Vec<&dyn CatalogItem>;
284
285    /// Looks up an item by its name.
286    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
287
288    /// Looks up a type by its name.
289    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
290
291    /// Gets a cluster by ID.
292    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster;
293
294    /// Gets all clusters.
295    fn get_clusters(&self) -> Vec<&dyn CatalogCluster>;
296
297    /// Gets a cluster replica by ID.
298    fn get_cluster_replica(
299        &self,
300        cluster_id: ClusterId,
301        replica_id: ReplicaId,
302    ) -> &dyn CatalogClusterReplica;
303
304    /// Gets all cluster replicas.
305    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica>;
306
307    /// Gets all system privileges.
308    fn get_system_privileges(&self) -> &PrivilegeMap;
309
310    /// Gets all default privileges.
311    fn get_default_privileges(
312        &self,
313    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;
314
315    /// Finds a name like `name` that is not already in use.
316    ///
317    /// If `name` itself is available, it is returned unchanged.
318    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;
319
320    /// Returns a fully qualified human readable name from fully qualified non-human readable name
321    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;
322
323    /// Returns a fully qualified human readable schema name from fully qualified non-human
324    /// readable schema name
325    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;
326
327    /// Returns the [`CatalogItemId`] for from a [`GlobalId`].
328    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;
329
330    /// Returns the [`GlobalId`] for the specificed Catalog Item, at the specified version.
331    fn resolve_global_id(
332        &self,
333        item_id: &CatalogItemId,
334        version: RelationVersionSelector,
335    ) -> GlobalId;
336
337    /// Returns the configuration of the catalog.
338    fn config(&self) -> &CatalogConfig;
339
340    /// Returns the number of milliseconds since the system epoch. For normal use
341    /// this means the Unix epoch. This can safely be mocked in tests and start
342    /// at 0.
343    fn now(&self) -> EpochMillis;
344
345    /// Returns the set of supported AWS PrivateLink availability zone ids.
346    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;
347
348    /// Returns system vars
349    fn system_vars(&self) -> &SystemVars;
350
351    /// Returns mutable system vars
352    ///
353    /// Clients should use this this method carefully, as changes to the backing
354    /// state here are not guarateed to be persisted. The motivating use case
355    /// for this method was ensuring that features are temporary turned on so
356    /// catalog rehydration does not break due to unsupported SQL syntax.
357    fn system_vars_mut(&mut self) -> &mut SystemVars;
358
359    /// Returns the [`RoleId`] of the owner of an object by its ID.
360    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;
361
362    /// Returns the [`PrivilegeMap`] of the object.
363    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;
364
365    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
366    ///
367    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
368    /// earlier in the list than the roots. This is particularly userful for the order to drop
369    /// objects.
370    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;
371
372    /// Returns all the IDs of all objects that depend on `id`, including `id` themselves.
373    ///
374    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
375    /// earlier in the list than `id`. This is particularly userful for the order to drop
376    /// objects.
377    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;
378
379    /// Returns all possible privileges associated with an object type.
380    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;
381
382    /// Returns the object type of `object_id`.
383    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;
384
385    /// Returns the system object type of `id`.
386    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;
387
388    /// Returns the minimal qualification required to unambiguously specify
389    /// `qualified_name`.
390    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;
391
392    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
393    /// successfully executes.
394    fn add_notice(&self, notice: PlanNotice);
395
396    /// Returns the associated comments for the given `id`
397    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;
398
399    /// Reports whether the specified cluster size is a modern "cc" size rather
400    /// than a legacy T-shirt size.
401    fn is_cluster_size_cc(&self, size: &str) -> bool;
402}
403
/// Configuration associated with a catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// The time at which the catalog booted.
    pub start_time: DateTime<Utc>,
    /// The instant at which the catalog booted.
    pub start_instant: Instant,
    /// A random integer associated with this instance of the catalog.
    ///
    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
    /// topics. Perhaps we can remove this when database-issues#977 is complete.
    pub nonce: u64,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// A transient UUID associated with this process.
    pub session_id: Uuid,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// Default timestamp interval.
    pub timestamp_interval: Duration,
    /// Function that returns a wall clock now time; can safely be mocked to return
    /// 0.
    pub now: NowFn,
    /// Context for source and sink connections.
    pub connection_context: ConnectionContext,
    /// Which system builtins to include. Not allowed to change dynamically.
    pub builtins_cfg: BuiltinsConfig,
    /// The Helm chart version, if one is known.
    pub helm_chart_version: Option<String>,
}
434
/// A database in a [`SessionCatalog`].
pub trait CatalogDatabase {
    /// Returns a fully-specified name of the database.
    fn name(&self) -> &str;

    /// Returns a stable ID for the database.
    fn id(&self) -> DatabaseId;

    /// Returns whether the database contains schemas.
    fn has_schemas(&self) -> bool;

    /// Returns the schemas of the database as a map from schema name to
    /// schema ID.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;

    /// Returns the schemas of the database.
    fn schemas(&self) -> Vec<&dyn CatalogSchema>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the database.
    fn privileges(&self) -> &PrivilegeMap;
}
459
/// A schema in a [`SessionCatalog`].
pub trait CatalogSchema {
    /// Returns a fully-specified ID of the schema's database.
    fn database(&self) -> &ResolvedDatabaseSpecifier;

    /// Returns a fully-specified name of the schema.
    fn name(&self) -> &QualifiedSchemaName;

    /// Returns a stable ID for the schema.
    fn id(&self) -> &SchemaSpecifier;

    /// Returns whether the schema contains items.
    fn has_items(&self) -> bool;

    /// Returns the IDs of the items in the schema.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the schema.
    fn privileges(&self) -> &PrivilegeMap;
}
483
/// Attributes belonging to a [`CatalogRole`].
///
/// The private `_private` field prevents construction via a struct literal
/// outside this module; use [`RoleAttributes::new`] instead.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
pub struct RoleAttributes {
    /// Indicates whether the role has inheritance of privileges.
    pub inherit: bool,
    /// The raw password of the role. This is for self managed auth, not cloud.
    pub password: Option<Password>,
    /// Whether or not this user is a superuser.
    pub superuser: Option<bool>,
    /// Whether or not this role can log in.
    pub login: Option<bool>,
    // Force use of constructor.
    _private: (),
}
498
499impl RoleAttributes {
500    /// Creates a new [`RoleAttributes`] with default attributes.
501    pub const fn new() -> RoleAttributes {
502        RoleAttributes {
503            inherit: true,
504            password: None,
505            superuser: None,
506            login: None,
507            _private: (),
508        }
509    }
510
511    /// Adds all attributes except password.
512    pub const fn with_all(mut self) -> RoleAttributes {
513        self.inherit = true;
514        self
515    }
516
517    /// Returns whether or not the role has inheritence of privileges.
518    pub const fn is_inherit(&self) -> bool {
519        self.inherit
520    }
521
522    /// Returns whether or not the role has a password.
523    pub const fn has_password(&self) -> bool {
524        self.password.is_some()
525    }
526
527    /// Returns self without the password.
528    pub fn without_password(self) -> RoleAttributes {
529        RoleAttributes {
530            inherit: self.inherit,
531            password: None,
532            superuser: self.superuser,
533            login: self.login,
534            _private: (),
535        }
536    }
537}
538
539impl From<PlannedRoleAttributes> for RoleAttributes {
540    fn from(
541        PlannedRoleAttributes {
542            inherit,
543            password,
544            superuser,
545            login,
546            ..
547        }: PlannedRoleAttributes,
548    ) -> RoleAttributes {
549        let default_attributes = RoleAttributes::new();
550        RoleAttributes {
551            inherit: inherit.unwrap_or(default_attributes.inherit),
552            password,
553            superuser,
554            login,
555            _private: (),
556        }
557    }
558}
559
/// Default variable values for a [`CatalogRole`].
#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct RoleVars {
    /// Map of variable names to their default values.
    pub map: BTreeMap<String, OwnedVarInput>,
}
566
/// A role in a [`SessionCatalog`].
pub trait CatalogRole {
    /// Returns a fully-specified name of the role.
    fn name(&self) -> &str;

    /// Returns a stable ID for the role.
    fn id(&self) -> RoleId;

    /// Returns all role IDs that this role is an immediate member of, and the grantor of that
    /// membership.
    ///
    /// Key is the role that this role is a member of, value is the grantor role ID.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;

    /// Returns the attributes associated with this role.
    fn attributes(&self) -> &RoleAttributes;

    /// Returns all variables that this role has a default value stored for.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
}
587
/// A network policy in a [`SessionCatalog`].
pub trait CatalogNetworkPolicy {
    /// Returns a fully-specified name of the network policy.
    fn name(&self) -> &str;

    /// Returns a stable ID for the network policy.
    fn id(&self) -> NetworkPolicyId;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the network policy.
    fn privileges(&self) -> &PrivilegeMap;
}
602
/// A cluster in a [`SessionCatalog`].
pub trait CatalogCluster<'a> {
    /// Returns a fully-specified name of the cluster.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster.
    fn id(&self) -> ClusterId;

    /// Returns the objects that are bound to this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;

    /// Returns the replicas of the cluster as a map from replica name to
    /// replica ID.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;

    /// Returns the replicas of the cluster.
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica>;

    /// Returns the replica belonging to the cluster with replica ID `id`.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the cluster.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns true if this cluster is a managed cluster.
    fn is_managed(&self) -> bool;

    /// Returns the size of the cluster, if the cluster is a managed cluster.
    fn managed_size(&self) -> Option<&str>;

    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
    fn schedule(&self) -> Option<&ClusterSchedule>;

    /// Tries to convert this cluster into a [`CreateClusterPlan`].
    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
}
643
/// A cluster replica in a [`SessionCatalog`].
pub trait CatalogClusterReplica<'a>: Debug {
    /// Returns the name of the cluster replica.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster that the replica belongs to.
    fn cluster_id(&self) -> ClusterId;

    /// Returns a stable ID for the replica.
    fn replica_id(&self) -> ReplicaId;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns whether or not the replica is internal.
    fn internal(&self) -> bool;
}
661
/// An item in a [`SessionCatalog`].
///
/// Note that "item" has a very specific meaning in the context of a SQL
/// catalog, and refers to the various entities that belong to a schema.
pub trait CatalogItem {
    /// Returns the fully qualified name of the catalog item.
    fn name(&self) -> &QualifiedItemName;

    /// Returns the [`CatalogItemId`] for the item.
    fn id(&self) -> CatalogItemId;

    /// Returns the [`GlobalId`]s associated with this item.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;

    /// Returns the catalog item's OID.
    fn oid(&self) -> u32;

    /// Returns the resolved function.
    ///
    /// If the catalog item is not of a type that produces functions (i.e.,
    /// anything other than a function), it returns an error.
    fn func(&self) -> Result<&'static Func, CatalogError>;

    /// Returns the resolved source connection.
    ///
    /// If the catalog item is not of a type that contains a `SourceDesc`
    /// (i.e., anything other than sources), it returns an error.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;

    /// Returns the resolved connection.
    ///
    /// If the catalog item is not a connection, it returns an error.
    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;

    /// Returns the type of the catalog item.
    fn item_type(&self) -> CatalogItemType;

    /// Returns a normalized SQL statement that describes how to create the
    /// catalog item.
    fn create_sql(&self) -> &str;

    /// Returns the IDs of the catalog items that this catalog item directly
    /// references.
    fn references(&self) -> &ResolvedIds;

    /// Returns the IDs of the catalog items upon which this catalog item
    /// depends.
    fn uses(&self) -> BTreeSet<CatalogItemId>;

    /// Returns the IDs of the catalog items that directly reference this catalog item.
    fn referenced_by(&self) -> &[CatalogItemId];

    /// Returns the IDs of the catalog items that depend upon this catalog item.
    fn used_by(&self) -> &[CatalogItemId];

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )>;

    /// Reports whether this catalog item is a progress source.
    fn is_progress_source(&self) -> bool;

    /// If this catalog item is a source, returns the ID of its progress collection.
    fn progress_id(&self) -> Option<CatalogItemId>;

    /// Returns the index details associated with the catalog item, if the
    /// catalog item is an index.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;

    /// Returns the column defaults associated with the catalog item, if the
    /// catalog item is a table that accepts writes.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;

    /// Returns the type information associated with the catalog item, if the
    /// catalog item is a type.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the item.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns the cluster the item belongs to, if any.
    fn cluster_id(&self) -> Option<ClusterId>;

    /// Returns the [`CatalogCollectionItem`] for a specific version of this
    /// [`CatalogItem`].
    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;

    /// Returns the latest version of this item, if it's version-able.
    fn latest_version(&self) -> Option<RelationVersion>;
}
768
/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
/// refers to.
pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
    /// Returns a description of the result set produced by the catalog item.
    ///
    /// If the catalog item is not of a type that produces data (i.e., a sink or
    /// an index), it returns an error.
    fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, CatalogError>;

    /// Returns the [`GlobalId`] for this item.
    fn global_id(&self) -> GlobalId;
}
781
/// The type of a [`CatalogItem`].
#[derive(Debug, Deserialize, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum CatalogItemType {
    /// A table.
    Table,
    /// A source.
    Source,
    /// A sink.
    Sink,
    /// A view.
    View,
    /// A materialized view.
    MaterializedView,
    /// An index.
    Index,
    /// A type.
    Type,
    /// A function.
    Func,
    /// A secret.
    Secret,
    /// A connection.
    Connection,
    /// A continual task.
    ContinualTask,
}
808
809impl CatalogItemType {
810    /// Reports whether the given type of item conflicts with items of type
811    /// `CatalogItemType::Type`.
812    ///
813    /// In PostgreSQL, even though types live in a separate namespace from other
814    /// schema objects, creating a table, view, or materialized view creates a
815    /// type named after that relation. This prevents creating a type with the
816    /// same name as a relational object, even though types and relational
817    /// objects live in separate namespaces. (Indexes are even weirder; while
818    /// they don't get a type with the same name, they get an entry in
819    /// `pg_class` that prevents *record* types of the same name as the index,
820    /// but not other types of types, like enums.)
821    ///
822    /// We don't presently construct types that mirror relational objects,
823    /// though we likely will need to in the future for full PostgreSQL
824    /// compatibility (see database-issues#7142). For now, we use this method to
825    /// prevent creating types and relational objects that have the same name, so
826    /// that it is a backwards compatible change in the future to introduce a
827    /// type named after each relational object in the system.
828    pub fn conflicts_with_type(&self) -> bool {
829        match self {
830            CatalogItemType::Table => true,
831            CatalogItemType::Source => true,
832            CatalogItemType::View => true,
833            CatalogItemType::MaterializedView => true,
834            CatalogItemType::Index => true,
835            CatalogItemType::Type => true,
836            CatalogItemType::Sink => false,
837            CatalogItemType::Func => false,
838            CatalogItemType::Secret => false,
839            CatalogItemType::Connection => false,
840            CatalogItemType::ContinualTask => true,
841        }
842    }
843}
844
845impl fmt::Display for CatalogItemType {
846    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
847        match self {
848            CatalogItemType::Table => f.write_str("table"),
849            CatalogItemType::Source => f.write_str("source"),
850            CatalogItemType::Sink => f.write_str("sink"),
851            CatalogItemType::View => f.write_str("view"),
852            CatalogItemType::MaterializedView => f.write_str("materialized view"),
853            CatalogItemType::Index => f.write_str("index"),
854            CatalogItemType::Type => f.write_str("type"),
855            CatalogItemType::Func => f.write_str("func"),
856            CatalogItemType::Secret => f.write_str("secret"),
857            CatalogItemType::Connection => f.write_str("connection"),
858            CatalogItemType::ContinualTask => f.write_str("continual task"),
859        }
860    }
861}
862
863impl From<CatalogItemType> for ObjectType {
864    fn from(value: CatalogItemType) -> Self {
865        match value {
866            CatalogItemType::Table => ObjectType::Table,
867            CatalogItemType::Source => ObjectType::Source,
868            CatalogItemType::Sink => ObjectType::Sink,
869            CatalogItemType::View => ObjectType::View,
870            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
871            CatalogItemType::Index => ObjectType::Index,
872            CatalogItemType::Type => ObjectType::Type,
873            CatalogItemType::Func => ObjectType::Func,
874            CatalogItemType::Secret => ObjectType::Secret,
875            CatalogItemType::Connection => ObjectType::Connection,
876            CatalogItemType::ContinualTask => ObjectType::ContinualTask,
877        }
878    }
879}
880
881impl From<CatalogItemType> for mz_audit_log::ObjectType {
882    fn from(value: CatalogItemType) -> Self {
883        match value {
884            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
885            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
886            CatalogItemType::View => mz_audit_log::ObjectType::View,
887            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
888            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
889            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
890            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
891            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
892            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
893            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
894            CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
895        }
896    }
897}
898
/// Details about a type in the catalog.
///
/// The type parameter `T` selects how other catalog types are referenced from
/// within the embedded [`CatalogType`]; see [`TypeReference`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypeDetails<T: TypeReference> {
    /// The ID of the array type whose element type is this type, if available.
    pub array_id: Option<CatalogItemId>,
    /// The description of this type.
    pub typ: CatalogType<T>,
    /// Additional metadata about the type in PostgreSQL, if relevant.
    pub pg_metadata: Option<CatalogTypePgMetadata>,
}
909
/// Additional PostgreSQL metadata about a type.
///
/// Both fields hold PostgreSQL object identifiers (OIDs) of functions
/// associated with the type.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypePgMetadata {
    /// The OID of the `typinput` function in PostgreSQL.
    pub typinput_oid: u32,
    /// The OID of the `typreceive` function in PostgreSQL.
    pub typreceive_oid: u32,
}
918
/// Represents a way of referring to a type in the catalog.
///
/// Implementors are marker types that only choose the concrete
/// [`TypeReference::Reference`] type stored inside a [`CatalogType`].
pub trait TypeReference {
    /// The actual type used to reference a `CatalogType`.
    type Reference: Clone + Debug + Eq + PartialEq;
}
924
/// Reference to a type by its name.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NameReference;
928
impl TypeReference for NameReference {
    /// Types are referenced by a static string holding their name.
    type Reference = &'static str;
}
932
/// Reference to a type by its [`CatalogItemId`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IdReference;
936
impl TypeReference for IdReference {
    /// Types are referenced by the catalog item ID of the referenced type.
    type Reference = CatalogItemId;
}
940
/// A type stored in the catalog.
///
/// The variants correspond one-to-one with [`mz_repr::ScalarType`], but with type
/// modifiers removed and with embedded types replaced with references to other
/// types in the catalog.
///
/// How an embedded type is referenced is chosen by `T` (see
/// [`TypeReference`]). The `List`, `Map`, and `Record` variants additionally
/// carry modifiers for the types they embed.
#[allow(missing_docs)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogType<T: TypeReference> {
    AclItem,
    Array {
        // Reference to the array's element type.
        element_reference: T::Reference,
    },
    Bool,
    Bytes,
    Char,
    Date,
    Float32,
    Float64,
    Int16,
    Int32,
    Int64,
    UInt16,
    UInt32,
    UInt64,
    MzTimestamp,
    Interval,
    Jsonb,
    List {
        // Reference to the list's element type.
        element_reference: T::Reference,
        // Modifiers for the element type. NOTE(review): presumably in the
        // same form as `CatalogRecordField::type_modifiers`; confirm.
        element_modifiers: Vec<i64>,
    },
    Map {
        // Reference to the map's key type, and the modifiers for it.
        key_reference: T::Reference,
        key_modifiers: Vec<i64>,
        // Reference to the map's value type, and the modifiers for it.
        value_reference: T::Reference,
        value_modifiers: Vec<i64>,
    },
    Numeric,
    Oid,
    PgLegacyChar,
    PgLegacyName,
    Pseudo,
    Range {
        // Reference to the range's element type.
        element_reference: T::Reference,
    },
    Record {
        // The record's fields, each with a name and a type reference.
        fields: Vec<CatalogRecordField<T>>,
    },
    RegClass,
    RegProc,
    RegType,
    String,
    Time,
    Timestamp,
    TimestampTz,
    Uuid,
    VarChar,
    Int2Vector,
    MzAclItem,
}
1001
1002impl CatalogType<IdReference> {
1003    /// Returns the relation description for the type, if the type is a record
1004    /// type.
1005    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1006        match &self {
1007            CatalogType::Record { fields } => {
1008                let mut desc = RelationDesc::builder();
1009                for f in fields {
1010                    let name = f.name.clone();
1011                    let ty = query::scalar_type_from_catalog(
1012                        catalog,
1013                        f.type_reference,
1014                        &f.type_modifiers,
1015                    )?;
1016                    // TODO: support plumbing `NOT NULL` constraints through
1017                    // `CREATE TYPE`.
1018                    let ty = ty.nullable(true);
1019                    desc = desc.with_column(name, ty);
1020                }
1021                Ok(Some(desc.finish()))
1022            }
1023            _ => Ok(None),
1024        }
1025    }
1026}
1027
/// A description of a field in a [`CatalogType::Record`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogRecordField<T: TypeReference> {
    /// The name of the field.
    pub name: ColumnName,
    /// A reference to the type of the field, in the form selected by `T`
    /// (see [`TypeReference`]).
    pub type_reference: T::Reference,
    /// Modifiers to apply to the type.
    pub type_modifiers: Vec<i64>,
}
1038
#[derive(Clone, Debug, Eq, PartialEq)]
/// Mirrored from [PostgreSQL's `typcategory`][typcategory].
///
/// Note that Materialize also uses a number of pseudotypes when planning, but
/// we have yet to need to integrate them with `TypeCategory`.
///
/// The `Display` implementation renders each category as a lowercase,
/// hyphen-separated name (for example `bit-string`).
///
/// [typcategory]:
/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
pub enum TypeCategory {
    /// Array type.
    Array,
    /// Bit string type.
    BitString,
    /// Boolean type.
    Boolean,
    /// Composite type.
    Composite,
    /// Date/time type.
    DateTime,
    /// Enum type.
    Enum,
    /// Geometric type.
    Geometric,
    /// List type. Materialize specific.
    List,
    /// Network address type.
    NetworkAddress,
    /// Numeric type.
    Numeric,
    /// Pseudo type.
    Pseudo,
    /// Range type.
    Range,
    /// String type.
    String,
    /// Timespan type.
    Timespan,
    /// User-defined type.
    UserDefined,
    /// Unknown type.
    Unknown,
}
1081
1082impl fmt::Display for TypeCategory {
1083    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1084        f.write_str(match self {
1085            TypeCategory::Array => "array",
1086            TypeCategory::BitString => "bit-string",
1087            TypeCategory::Boolean => "boolean",
1088            TypeCategory::Composite => "composite",
1089            TypeCategory::DateTime => "date-time",
1090            TypeCategory::Enum => "enum",
1091            TypeCategory::Geometric => "geometric",
1092            TypeCategory::List => "list",
1093            TypeCategory::NetworkAddress => "network-address",
1094            TypeCategory::Numeric => "numeric",
1095            TypeCategory::Pseudo => "pseudo",
1096            TypeCategory::Range => "range",
1097            TypeCategory::String => "string",
1098            TypeCategory::Timespan => "timespan",
1099            TypeCategory::UserDefined => "user-defined",
1100            TypeCategory::Unknown => "unknown",
1101        })
1102    }
1103}
1104
/// Identifies an environment.
///
/// Outside of tests, an environment ID can be constructed only from a string of
/// the following form:
///
/// ```text
/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
/// ```
///
/// The fields have the following formats:
///
/// * The cloud provider consists of one or more alphanumeric characters.
/// * The cloud provider region consists of one or more alphanumeric or hyphen
///   characters.
/// * The organization ID is a UUID in its canonical text format.
/// * The ordinal is a decimal number with between one and eight digits.
///
/// There is no way to construct an environment ID from parts, to ensure that
/// the `Display` representation is parseable according to the above rules.
// NOTE(benesch): ideally we'd have accepted the components of the environment
// ID using separate command-line arguments, or at least a string format that
// used a field separator that did not appear in the fields. Alas. We can't
// easily change it now, as it's used as the e.g. default sink progress topic.
#[derive(Debug, Clone, PartialEq)]
pub struct EnvironmentId {
    // The cloud provider hosting the environment.
    cloud_provider: CloudProvider,
    // The provider-specific region name; may itself contain hyphens.
    cloud_provider_region: String,
    // The ID of the organization that owns the environment.
    organization_id: Uuid,
    // The ordinal, i.e. the last component of the textual form.
    ordinal: u64,
}
1135
1136impl EnvironmentId {
1137    /// Creates a dummy `EnvironmentId` for use in tests.
1138    pub fn for_tests() -> EnvironmentId {
1139        EnvironmentId {
1140            cloud_provider: CloudProvider::Local,
1141            cloud_provider_region: "az1".into(),
1142            organization_id: Uuid::new_v4(),
1143            ordinal: 0,
1144        }
1145    }
1146
1147    /// Returns the cloud provider associated with this environment ID.
1148    pub fn cloud_provider(&self) -> &CloudProvider {
1149        &self.cloud_provider
1150    }
1151
1152    /// Returns the cloud provider region associated with this environment ID.
1153    pub fn cloud_provider_region(&self) -> &str {
1154        &self.cloud_provider_region
1155    }
1156
1157    /// Returns the name of the region associted with this environment ID.
1158    ///
1159    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1160    /// [`EnvironmentId::cloud_provider_region`].
1161    pub fn region(&self) -> String {
1162        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1163    }
1164
1165    /// Returns the organization ID associated with this environment ID.
1166    pub fn organization_id(&self) -> Uuid {
1167        self.organization_id
1168    }
1169
1170    /// Returns the ordinal associated with this environment ID.
1171    pub fn ordinal(&self) -> u64 {
1172        self.ordinal
1173    }
1174}
1175
1176// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1177// populated using this key. Consequently, any changes to that trait
1178// implementation will also have to be reflected in the existing feature
1179// targeting config in LaunchDarkly, otherwise environments might receive
1180// different configs upon restart.
1181impl fmt::Display for EnvironmentId {
1182    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1183        write!(
1184            f,
1185            "{}-{}-{}-{}",
1186            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1187        )
1188    }
1189}
1190
1191impl FromStr for EnvironmentId {
1192    type Err = InvalidEnvironmentIdError;
1193
1194    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1195        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1196            Regex::new(
1197                "^(?P<cloud_provider>[[:alnum:]]+)-\
1198                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1199                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1200                  (?P<ordinal>\\d{1,8})$"
1201            ).unwrap()
1202        });
1203        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1204        Ok(EnvironmentId {
1205            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1206            cloud_provider_region: captures["cloud_provider_region"].into(),
1207            organization_id: captures["organization_id"]
1208                .parse()
1209                .map_err(|_| InvalidEnvironmentIdError)?,
1210            ordinal: captures["ordinal"]
1211                .parse()
1212                .map_err(|_| InvalidEnvironmentIdError)?,
1213        })
1214    }
1215}
1216
/// The error type for [`EnvironmentId::from_str`].
///
/// The error carries no details about which part of the input was invalid.
#[derive(Debug, Clone, PartialEq)]
pub struct InvalidEnvironmentIdError;
1220
1221impl fmt::Display for InvalidEnvironmentIdError {
1222    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1223        f.write_str("invalid environment ID")
1224    }
1225}
1226
// Uses the default `Error` methods; the error is a unit struct and wraps no
// underlying source error.
impl Error for InvalidEnvironmentIdError {}
1228
1229impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1230    fn from(_: InvalidCloudProviderError) -> Self {
1231        InvalidEnvironmentIdError
1232    }
1233}
1234
/// An error returned by the catalog.
///
/// The `Display` implementation produces the user-facing message for each
/// variant, and [`CatalogError::hint`] supplies an optional hint.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogError {
    /// Unknown database.
    UnknownDatabase(String),
    /// Database already exists.
    DatabaseAlreadyExists(String),
    /// Unknown schema.
    UnknownSchema(String),
    /// Schema already exists.
    SchemaAlreadyExists(String),
    /// Unknown role.
    UnknownRole(String),
    /// Role already exists.
    RoleAlreadyExists(String),
    /// Network Policy already exists.
    NetworkPolicyAlreadyExists(String),
    /// Unknown cluster.
    UnknownCluster(String),
    /// Unexpected builtin cluster.
    UnexpectedBuiltinCluster(String),
    /// Unexpected builtin cluster type.
    UnexpectedBuiltinClusterType(String),
    /// Cluster already exists.
    ClusterAlreadyExists(String),
    /// Unknown cluster replica.
    UnknownClusterReplica(String),
    /// Unknown cluster replica size.
    UnknownClusterReplicaSize(String),
    /// Duplicate replica: multiple replicas with the same name (first field)
    /// cannot be created on one cluster (second field).
    DuplicateReplica(String, String),
    /// Unknown item.
    UnknownItem(String),
    /// Item already exists.
    ItemAlreadyExists(CatalogItemId, String),
    /// Unknown function.
    UnknownFunction {
        /// The identifier of the function we couldn't find
        name: String,
        /// A suggested alternative to the named function.
        alternative: Option<String>,
    },
    /// Unknown type.
    UnknownType {
        /// The identifier of the type we couldn't find.
        name: String,
    },
    /// Unknown connection.
    UnknownConnection(String),
    /// Unknown network policy.
    UnknownNetworkPolicy(String),
    /// Expected the catalog item to have the given type, but it did not.
    UnexpectedType {
        /// The item's name.
        name: String,
        /// The actual type of the item.
        actual_type: CatalogItemType,
        /// The expected type of the item.
        expected_type: CatalogItemType,
    },
    /// Invalid attempt to depend on a non-dependable item.
    InvalidDependency {
        /// The invalid item's name.
        name: String,
        /// The invalid item's type.
        typ: CatalogItemType,
    },
    /// Ran out of unique IDs.
    IdExhaustion,
    /// Ran out of unique OIDs.
    OidExhaustion,
    /// Timeline already exists.
    TimelineAlreadyExists(String),
    /// Id Allocator already exists.
    IdAllocatorAlreadyExists(String),
    /// Config already exists.
    ConfigAlreadyExists(String),
    /// Builtin migrations failed.
    FailedBuiltinSchemaMigration(String),
    /// StorageCollectionMetadata already exists.
    StorageCollectionMetadataAlreadyExists(GlobalId),
}
1317
1318impl fmt::Display for CatalogError {
1319    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1320        match self {
1321            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1322            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1323            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1324            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1325            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1326            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1327            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1328            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1329            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1330            Self::NetworkPolicyAlreadyExists(name) => {
1331                write!(f, "network policy '{name}' already exists")
1332            }
1333            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1334            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1335            Self::UnexpectedBuiltinCluster(name) => {
1336                write!(f, "Unexpected builtin cluster '{}'", name)
1337            }
1338            Self::UnexpectedBuiltinClusterType(name) => {
1339                write!(f, "Unexpected builtin cluster type'{}'", name)
1340            }
1341            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1342            Self::UnknownClusterReplica(name) => {
1343                write!(f, "unknown cluster replica '{}'", name)
1344            }
1345            Self::UnknownClusterReplicaSize(name) => {
1346                write!(f, "unknown cluster replica size '{}'", name)
1347            }
1348            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1349                f,
1350                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1351            ),
1352            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1353            Self::ItemAlreadyExists(_gid, name) => {
1354                write!(f, "catalog item '{name}' already exists")
1355            }
1356            Self::UnexpectedType {
1357                name,
1358                actual_type,
1359                expected_type,
1360            } => {
1361                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1362            }
1363            Self::InvalidDependency { name, typ } => write!(
1364                f,
1365                "catalog item '{}' is {} {} and so cannot be depended upon",
1366                name,
1367                if matches!(typ, CatalogItemType::Index) {
1368                    "an"
1369                } else {
1370                    "a"
1371                },
1372                typ,
1373            ),
1374            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1375            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1376            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1377            Self::IdAllocatorAlreadyExists(name) => {
1378                write!(f, "ID allocator '{name}' already exists")
1379            }
1380            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1381            Self::FailedBuiltinSchemaMigration(objects) => {
1382                write!(f, "failed to migrate schema of builtin objects: {objects}")
1383            }
1384            Self::StorageCollectionMetadataAlreadyExists(key) => {
1385                write!(f, "storage metadata for '{key}' already exists")
1386            }
1387        }
1388    }
1389}
1390
1391impl CatalogError {
1392    /// Returns any applicable hints for [`CatalogError`].
1393    pub fn hint(&self) -> Option<String> {
1394        match self {
1395            CatalogError::UnknownFunction { alternative, .. } => {
1396                match alternative {
1397                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1398                    Some(alt) => Some(format!("Try using {alt}")),
1399                }
1400            }
1401            _ => None,
1402        }
1403    }
1404}
1405
// Uses the default `Error` methods; no variant wraps an underlying source
// error.
impl Error for CatalogError {}
1407
// Enum variant docs would be useless here.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects stored in the catalog.
///
/// The `Display` implementation renders each variant as an uppercase SQL
/// keyword phrase (for example `MATERIALIZED VIEW`).
///
/// The derived `Ord`/`PartialOrd` follow declaration order, so reordering the
/// variants changes how values compare.
pub enum ObjectType {
    Table,
    View,
    MaterializedView,
    Source,
    Sink,
    Index,
    Type,
    Role,
    Cluster,
    ClusterReplica,
    Secret,
    Connection,
    Database,
    Schema,
    Func,
    ContinualTask,
    NetworkPolicy,
}
1431
1432impl ObjectType {
1433    /// Reports if the object type can be treated as a relation.
1434    pub fn is_relation(&self) -> bool {
1435        match self {
1436            ObjectType::Table
1437            | ObjectType::View
1438            | ObjectType::MaterializedView
1439            | ObjectType::Source
1440            | ObjectType::ContinualTask => true,
1441            ObjectType::Sink
1442            | ObjectType::Index
1443            | ObjectType::Type
1444            | ObjectType::Secret
1445            | ObjectType::Connection
1446            | ObjectType::Func
1447            | ObjectType::Database
1448            | ObjectType::Schema
1449            | ObjectType::Cluster
1450            | ObjectType::ClusterReplica
1451            | ObjectType::Role
1452            | ObjectType::NetworkPolicy => false,
1453        }
1454    }
1455}
1456
1457impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1458    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1459        match value {
1460            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1461            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1462            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1463            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1464            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1465            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1466            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1467            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1468            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1469            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1470            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1471            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1472            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1473            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1474            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1475            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1476            mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask,
1477            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1478        }
1479    }
1480}
1481
1482impl From<CommentObjectId> for ObjectType {
1483    fn from(value: CommentObjectId) -> ObjectType {
1484        match value {
1485            CommentObjectId::Table(_) => ObjectType::Table,
1486            CommentObjectId::View(_) => ObjectType::View,
1487            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1488            CommentObjectId::Source(_) => ObjectType::Source,
1489            CommentObjectId::Sink(_) => ObjectType::Sink,
1490            CommentObjectId::Index(_) => ObjectType::Index,
1491            CommentObjectId::Func(_) => ObjectType::Func,
1492            CommentObjectId::Connection(_) => ObjectType::Connection,
1493            CommentObjectId::Type(_) => ObjectType::Type,
1494            CommentObjectId::Secret(_) => ObjectType::Secret,
1495            CommentObjectId::Role(_) => ObjectType::Role,
1496            CommentObjectId::Database(_) => ObjectType::Database,
1497            CommentObjectId::Schema(_) => ObjectType::Schema,
1498            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1499            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1500            CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1501            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1502        }
1503    }
1504}
1505
1506impl Display for ObjectType {
1507    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1508        f.write_str(match self {
1509            ObjectType::Table => "TABLE",
1510            ObjectType::View => "VIEW",
1511            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1512            ObjectType::Source => "SOURCE",
1513            ObjectType::Sink => "SINK",
1514            ObjectType::Index => "INDEX",
1515            ObjectType::Type => "TYPE",
1516            ObjectType::Role => "ROLE",
1517            ObjectType::Cluster => "CLUSTER",
1518            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1519            ObjectType::Secret => "SECRET",
1520            ObjectType::Connection => "CONNECTION",
1521            ObjectType::Database => "DATABASE",
1522            ObjectType::Schema => "SCHEMA",
1523            ObjectType::Func => "FUNCTION",
1524            ObjectType::ContinualTask => "CONTINUAL TASK",
1525            ObjectType::NetworkPolicy => "NETWORK POLICY",
1526        })
1527    }
1528}
1529
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects in the system.
///
/// This is either the type of a catalog object or the system as a whole.
pub enum SystemObjectType {
    /// Catalog object type.
    Object(ObjectType),
    /// Entire system.
    System,
}
1538
1539impl SystemObjectType {
1540    /// Reports if the object type can be treated as a relation.
1541    pub fn is_relation(&self) -> bool {
1542        match self {
1543            SystemObjectType::Object(object_type) => object_type.is_relation(),
1544            SystemObjectType::System => false,
1545        }
1546    }
1547}
1548
1549impl Display for SystemObjectType {
1550    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1551        match self {
1552            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1553            SystemObjectType::System => f.write_str("SYSTEM"),
1554        }
1555    }
1556}
1557
/// Enum used to format object names in error messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorMessageObjectDescription {
    /// The name of a specific object.
    Object {
        /// Type of object.
        object_type: ObjectType,
        /// Name of object, or `None` when only the type of the object is
        /// known (as produced by
        /// [`ErrorMessageObjectDescription::from_object_type`]).
        object_name: Option<String>,
    },
    /// The name of the entire system.
    System,
}
1571
1572impl ErrorMessageObjectDescription {
1573    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1574    pub fn from_id(
1575        object_id: &ObjectId,
1576        catalog: &dyn SessionCatalog,
1577    ) -> ErrorMessageObjectDescription {
1578        let object_name = match object_id {
1579            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1580            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1581                .get_cluster_replica(*cluster_id, *replica_id)
1582                .name()
1583                .to_string(),
1584            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1585            ObjectId::Schema((database_spec, schema_spec)) => {
1586                let name = catalog.get_schema(database_spec, schema_spec).name();
1587                catalog.resolve_full_schema_name(name).to_string()
1588            }
1589            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1590            ObjectId::Item(id) => {
1591                let name = catalog.get_item(id).name();
1592                catalog.resolve_full_name(name).to_string()
1593            }
1594            ObjectId::NetworkPolicy(network_policy_id) => catalog
1595                .get_network_policy(network_policy_id)
1596                .name()
1597                .to_string(),
1598        };
1599        ErrorMessageObjectDescription::Object {
1600            object_type: catalog.get_object_type(object_id),
1601            object_name: Some(object_name),
1602        }
1603    }
1604
1605    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1606    pub fn from_sys_id(
1607        object_id: &SystemObjectId,
1608        catalog: &dyn SessionCatalog,
1609    ) -> ErrorMessageObjectDescription {
1610        match object_id {
1611            SystemObjectId::Object(object_id) => {
1612                ErrorMessageObjectDescription::from_id(object_id, catalog)
1613            }
1614            SystemObjectId::System => ErrorMessageObjectDescription::System,
1615        }
1616    }
1617
1618    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1619    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1620        match object_type {
1621            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1622                object_type,
1623                object_name: None,
1624            },
1625            SystemObjectType::System => ErrorMessageObjectDescription::System,
1626        }
1627    }
1628}
1629
1630impl Display for ErrorMessageObjectDescription {
1631    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1632        match self {
1633            ErrorMessageObjectDescription::Object {
1634                object_type,
1635                object_name,
1636            } => {
1637                let object_name = object_name
1638                    .as_ref()
1639                    .map(|object_name| format!(" {}", object_name.quoted()))
1640                    .unwrap_or_else(|| "".to_string());
1641                write!(f, "{object_type}{object_name}")
1642            }
1643            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1644        }
1645    }
1646}
1647
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
// These attributes are needed because the key of a map must be a string. We also
// get the added benefit of flattening this struct in its serialized form: the
// struct is (de)serialized as a plain `BTreeMap<String, RoleId>` whose keys are
// the string form of the role IDs in `map`.
#[serde(into = "BTreeMap<String, RoleId>")]
#[serde(try_from = "BTreeMap<String, RoleId>")]
/// Represents the grantee and a grantor of a role membership.
pub struct RoleMembership {
    /// Key is the role that some role is a member of, value is the grantor role ID.
    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
    // migration.
    pub map: BTreeMap<RoleId, RoleId>,
}
1662
1663impl RoleMembership {
1664    /// Creates a new [`RoleMembership`].
1665    pub fn new() -> RoleMembership {
1666        RoleMembership {
1667            map: BTreeMap::new(),
1668        }
1669    }
1670}
1671
1672impl From<RoleMembership> for BTreeMap<String, RoleId> {
1673    fn from(value: RoleMembership) -> Self {
1674        value
1675            .map
1676            .into_iter()
1677            .map(|(k, v)| (k.to_string(), v))
1678            .collect()
1679    }
1680}
1681
1682impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1683    type Error = anyhow::Error;
1684
1685    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1686        Ok(RoleMembership {
1687            map: value
1688                .into_iter()
1689                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1690                .collect::<Result<_, anyhow::Error>>()?,
1691        })
1692    }
1693}
1694
/// Specification for objects that will be affected by a default privilege.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeObject {
    /// The role id that created the object.
    pub role_id: RoleId,
    /// The database that the object is created in if Some, otherwise all databases.
    pub database_id: Option<DatabaseId>,
    /// The schema that the object is created in if Some, otherwise all schemas.
    pub schema_id: Option<SchemaId>,
    /// The type of object.
    pub object_type: ObjectType,
}
1707
1708impl DefaultPrivilegeObject {
1709    /// Creates a new [`DefaultPrivilegeObject`].
1710    pub fn new(
1711        role_id: RoleId,
1712        database_id: Option<DatabaseId>,
1713        schema_id: Option<SchemaId>,
1714        object_type: ObjectType,
1715    ) -> DefaultPrivilegeObject {
1716        DefaultPrivilegeObject {
1717            role_id,
1718            database_id,
1719            schema_id,
1720            object_type,
1721        }
1722    }
1723}
1724
1725impl std::fmt::Display for DefaultPrivilegeObject {
1726    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1727        // TODO: Don't just wrap Debug.
1728        write!(f, "{self:?}")
1729    }
1730}
1731
/// Specification for the privileges that will be granted from default privileges.
///
/// This carries no grantor; one is supplied when the value is converted into
/// an [`MzAclItem`] via `mz_acl_item`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeAclItem {
    /// The role that will receive the privileges.
    pub grantee: RoleId,
    /// The specific privileges granted.
    pub acl_mode: AclMode,
}
1740
1741impl DefaultPrivilegeAclItem {
1742    /// Creates a new [`DefaultPrivilegeAclItem`].
1743    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1744        DefaultPrivilegeAclItem { grantee, acl_mode }
1745    }
1746
1747    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1748    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1749        MzAclItem {
1750            grantee: self.grantee,
1751            grantor,
1752            acl_mode: self.acl_mode,
1753        }
1754    }
1755}
1756
/// Which builtins to return in `BUILTINS::iter`.
///
/// All calls to `BUILTINS::iter` within the lifetime of an environmentd process
/// must provide an equal `BuiltinsConfig`. It is not allowed to change
/// dynamically.
#[derive(Debug, Clone)]
pub struct BuiltinsConfig {
    /// If true, include system builtin continual tasks.
    // NOTE(review): presumably those builtins are omitted when this is false;
    // confirm against `BUILTINS::iter`, which is not defined in this file section.
    pub include_continual_tasks: bool,
}
1767
#[cfg(test)]
mod tests {
    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};

    /// Parses a table of environment ID strings and checks each result
    /// against the expected value. Every input that parses successfully must
    /// also format back to exactly the string it was parsed from.
    #[mz_ore::test]
    fn test_environment_id() {
        let cases = [
            // Well-formed IDs, one per cloud provider.
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Local,
                    cloud_provider_region: "az1".into(),
                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
                    ordinal: 452,
                }),
            ),
            (
                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Aws,
                    cloud_provider_region: "us-east-1".into(),
                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Gcp,
                    cloud_provider_region: "us-central1".into(),
                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Azure,
                    cloud_provider_region: "australiaeast".into(),
                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            // A region that itself contains several hyphens.
            (
                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Generic,
                    cloud_provider_region: "moon-station-11-darkside".into(),
                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            // The empty string.
            ("", Err(InvalidEnvironmentIdError)),
            // A nine-digit ordinal (presumably rejected as too long -- the
            // parser is not visible from this module).
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
                Err(InvalidEnvironmentIdError),
            ),
            // No region segment between the provider and the organization ID.
            (
                "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
            // Organization ID with one of its hyphens missing.
            (
                "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
        ];

        for (input, expected) in cases {
            let parsed: Result<EnvironmentId, InvalidEnvironmentIdError> = input.parse();
            assert_eq!(expected, parsed, "input = {}", input);
            // Successful parses must round-trip back to the original text.
            if let Ok(environment_id) = parsed {
                assert_eq!(input, environment_id.to_string(), "input = {}", input);
            }
        }
    }
}