mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::str::FromStr;
20use std::sync::LazyLock;
21use std::time::{Duration, Instant};
22
23use chrono::{DateTime, Utc};
24use mz_auth::password::Password;
25use mz_build_info::BuildInfo;
26use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::MirScalarExpr;
29use mz_ore::now::{EpochMillis, NowFn};
30use mz_ore::str::StrExt;
31use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
32use mz_repr::explain::ExprHumanizer;
33use mz_repr::network_policy_id::NetworkPolicyId;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
37};
38use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
39use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
40use mz_storage_types::connections::{Connection, ConnectionContext};
41use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
42use proptest_derive::Arbitrary;
43use regex::Regex;
44use serde::{Deserialize, Serialize};
45use uuid::Uuid;
46
47use crate::func::Func;
48use crate::names::{
49    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
50    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
51    SchemaSpecifier, SystemObjectId,
52};
53use crate::plan::statement::StatementDesc;
54use crate::plan::statement::ddl::PlannedRoleAttributes;
55use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
56use crate::session::vars::{OwnedVarInput, SystemVars};
57
/// A catalog keeps track of SQL objects and session state available to the
/// planner.
///
/// The `sql` crate is agnostic to any particular catalog implementation. This
/// trait describes the required interface.
///
/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
/// catalog contains databases, databases contain schemas, and schemas contain
/// catalog items, like sources, sinks, views, and indexes.
///
/// There are three classes of operations provided by a catalog:
///
///   * Resolution operations, like [`resolve_item`]. These fill in missing name
///     components based upon connection defaults, e.g., resolving the partial
///     name `view42` to the fully-specified name `materialize.public.view42`.
///
///   * Lookup operations, like [`SessionCatalog::get_item`]. These retrieve
///     metadata about a catalog entity based on a fully-specified name that is
///     known to be valid (i.e., because the name was successfully resolved, or
///     was constructed based on the output of a prior lookup operation). These
///     functions panic if called with invalid input.
///
///   * Session management, such as managing variables' states and adding
///     notices to the session.
///
/// [`get_databases`]: SessionCatalog::get_databases
/// [`get_item`]: SessionCatalog::get_item
/// [`resolve_item`]: SessionCatalog::resolve_item
pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
    /// Returns the id of the role that is issuing the query.
    fn active_role_id(&self) -> &RoleId;

    /// Returns the name of the database to use if one is not explicitly
    /// specified.
    ///
    /// This is derived from [`SessionCatalog::active_database`]: the active
    /// database ID, if any, is looked up with [`SessionCatalog::get_database`]
    /// and its name is returned.
    fn active_database_name(&self) -> Option<&str> {
        self.active_database()
            .map(|id| self.get_database(id))
            .map(|db| db.name())
    }

    /// Returns the ID of the database to use if one is not explicitly
    /// specified.
    fn active_database(&self) -> Option<&DatabaseId>;

    /// Returns the cluster to use if one is not explicitly specified.
    fn active_cluster(&self) -> &str;

    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];

    /// Returns the descriptor of the named prepared statement on the session, or
    /// `None` if the prepared statement does not exist.
    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;

    /// Resolves the named database.
    ///
    /// If `database_name` exists in the catalog, it returns a reference to the
    /// resolved database; otherwise it returns an error.
    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;

    /// Gets a database by its ID.
    ///
    /// Panics if `id` does not specify a valid database.
    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;

    /// Gets all databases.
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;

    /// Resolves a partially-specified schema name.
    ///
    /// If the schema exists in the catalog, it returns a reference to the
    /// resolved schema; otherwise it returns an error.
    fn resolve_schema(
        &self,
        database_name: Option<&str>,
        schema_name: &str,
    ) -> Result<&dyn CatalogSchema, CatalogError>;

    /// Resolves a schema name within a specified database.
    ///
    /// If the schema exists in the database, it returns a reference to the
    /// resolved schema; otherwise it returns an error.
    fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
    ) -> Result<&dyn CatalogSchema, CatalogError>;

    /// Gets a schema by its database and schema specifiers.
    ///
    /// Panics if `database_spec` and `schema_spec` do not specify a valid
    /// schema.
    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema;

    /// Gets all schemas.
    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;

    /// Gets the `mz_internal` schema id.
    fn get_mz_internal_schema_id(&self) -> SchemaId;

    /// Gets the `mz_unsafe` schema id.
    fn get_mz_unsafe_schema_id(&self) -> SchemaId;

    /// Returns true if `schema` is an internal system schema, false otherwise.
    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;

    /// Resolves the named role.
    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;

    /// Resolves the named network policy.
    fn resolve_network_policy(
        &self,
        network_policy_name: &str,
    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;

    /// Tries to get a role by its ID.
    ///
    /// See [`SessionCatalog::get_role`] for a variant that returns the role
    /// directly instead of an `Option`.
    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;

    /// Gets a role by its ID.
    ///
    /// Panics if `id` does not specify a valid role.
    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;

    /// Gets all roles.
    fn get_roles(&self) -> Vec<&dyn CatalogRole>;

    /// Gets the id of the `mz_system` role.
    fn mz_system_role_id(&self) -> RoleId;

    /// Collects all role IDs that `id` is transitively a member of.
    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;

    /// Gets a network policy by its ID.
    ///
    /// Panics if `id` does not specify a valid network policy.
    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;

    /// Gets all network policies.
    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;

    /// Resolves the named cluster.
    ///
    /// If the provided name is `None`, resolves the currently active cluster.
    fn resolve_cluster<'a, 'b>(
        &'a self,
        cluster_name: Option<&'b str>,
    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;

    /// Resolves the named cluster replica.
    fn resolve_cluster_replica<'a, 'b>(
        &'a self,
        cluster_replica_name: &'b QualifiedReplica,
    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;

    /// Resolves a partially-specified item name, that is NOT a function or
    /// type. (For resolving functions or types, please use
    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
    ///
    /// If the partial name has a database component, it searches only the
    /// specified database; otherwise, it searches the active database. If the
    /// partial name has a schema component, it searches only the specified
    /// schema; otherwise, it searches a default set of schemas within the
    /// selected database. It returns an error if none of the searched schemas
    /// contain an item whose name matches the item component of the partial
    /// name.
    ///
    /// Note that it is not an error if the named item appears in more than one
    /// of the search schemas. The catalog implementation must choose one.
    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;

    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
    /// functions within the catalog.
    fn resolve_function(
        &self,
        item_name: &PartialItemName,
    ) -> Result<&dyn CatalogItem, CatalogError>;

    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
    /// types within the catalog.
    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;

    /// Resolves `name` to a type or item, preferring the type if both exist.
    ///
    /// If type resolution fails for any reason, its error is discarded and the
    /// result of [`SessionCatalog::resolve_item`] is returned instead.
    fn resolve_item_or_type(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn CatalogItem, CatalogError> {
        // Types take precedence; only fall back to items when no type resolves.
        if let Ok(ty) = self.resolve_type(name) {
            return Ok(ty);
        }
        self.resolve_item(name)
    }

    /// Gets a type named `name` from exactly one of the system schemas.
    ///
    /// # Panics
    /// - If `name` is not an entry in any system schema
    /// - If more than one system schema has an entry named `name`.
    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;

    /// Tries to get an item by its ID.
    ///
    /// See [`SessionCatalog::get_item`] for a variant that returns the item
    /// directly instead of an `Option`.
    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;

    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
    /// exist.
    ///
    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
    fn try_get_item_by_global_id<'a>(
        &'a self,
        id: &GlobalId,
    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;

    /// Gets an item by its ID.
    ///
    /// Panics if `id` does not specify a valid item.
    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;

    /// Gets an item by a [`GlobalId`].
    ///
    /// Panics if `id` does not specify a valid item.
    ///
    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;

    /// Gets all items.
    fn get_items(&self) -> Vec<&dyn CatalogItem>;

    /// Looks up an item by its name.
    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;

    /// Looks up a type by its name.
    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;

    /// Gets a cluster by ID.
    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster;

    /// Gets all clusters.
    fn get_clusters(&self) -> Vec<&dyn CatalogCluster>;

    /// Gets a cluster replica by ID.
    fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &dyn CatalogClusterReplica;

    /// Gets all cluster replicas.
    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica>;

    /// Gets all system privileges.
    fn get_system_privileges(&self) -> &PrivilegeMap;

    /// Gets all default privileges.
    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;

    /// Finds a name like `name` that is not already in use.
    ///
    /// If `name` itself is available, it is returned unchanged.
    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;

    /// Returns a fully qualified human readable name from a fully qualified
    /// non-human readable name.
    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;

    /// Returns a fully qualified human readable schema name from a fully
    /// qualified non-human readable schema name.
    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;

    /// Returns the [`CatalogItemId`] for a [`GlobalId`].
    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;

    /// Returns the [`GlobalId`] for the specified Catalog Item, at the specified version.
    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId;

    /// Returns the configuration of the catalog.
    fn config(&self) -> &CatalogConfig;

    /// Returns the number of milliseconds since the system epoch. For normal use
    /// this means the Unix epoch. This can safely be mocked in tests and start
    /// at 0.
    fn now(&self) -> EpochMillis;

    /// Returns the set of supported AWS PrivateLink availability zone ids.
    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;

    /// Returns the system vars.
    fn system_vars(&self) -> &SystemVars;

    /// Returns the system vars, mutably.
    ///
    /// Clients should use this method carefully, as changes to the backing
    /// state here are not guaranteed to be persisted. The motivating use case
    /// for this method was ensuring that features are temporarily turned on so
    /// catalog rehydration does not break due to unsupported SQL syntax.
    fn system_vars_mut(&mut self) -> &mut SystemVars;

    /// Returns the [`RoleId`] of the owner of an object by its ID.
    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;

    /// Returns the [`PrivilegeMap`] of the object.
    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;

    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
    /// earlier in the list than the roots. This is particularly useful for the order to drop
    /// objects.
    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;

    /// Returns all the IDs of all objects that depend on `id`, including `id` itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
    /// earlier in the list than `id`. This is particularly useful for the order to drop
    /// objects.
    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;

    /// Returns all possible privileges associated with an object type.
    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;

    /// Returns the object type of `object_id`.
    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;

    /// Returns the system object type of `id`.
    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;

    /// Returns the minimal qualification required to unambiguously specify
    /// `qualified_name`.
    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;

    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
    /// successfully executes.
    fn add_notice(&self, notice: PlanNotice);

    /// Returns the associated comments for the given `id`.
    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;

    /// Reports whether the specified cluster size is a modern "cc" size rather
    /// than a legacy T-shirt size.
    fn is_cluster_size_cc(&self, size: &str) -> bool;
}
403
/// Configuration associated with a catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// The time at which the catalog booted.
    pub start_time: DateTime<Utc>,
    /// The instant at which the catalog booted.
    pub start_instant: Instant,
    /// A random integer associated with this instance of the catalog.
    ///
    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
    /// topics. Perhaps we can remove this when database-issues#977 is complete.
    pub nonce: u64,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// A transient UUID associated with this process.
    pub session_id: Uuid,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// Default timestamp interval.
    pub timestamp_interval: Duration,
    /// Function that returns a wall clock now time; can safely be mocked to return
    /// 0.
    pub now: NowFn,
    /// Context for source and sink connections.
    pub connection_context: ConnectionContext,
    /// Which system builtins to include. Not allowed to change dynamically.
    pub builtins_cfg: BuiltinsConfig,
    /// The Helm chart version, if one is set.
    pub helm_chart_version: Option<String>,
}
434
/// A database in a [`SessionCatalog`].
pub trait CatalogDatabase {
    /// Returns a fully-specified name of the database.
    fn name(&self) -> &str;

    /// Returns a stable ID for the database.
    fn id(&self) -> DatabaseId;

    /// Returns whether the database contains any schemas.
    fn has_schemas(&self) -> bool;

    /// Returns the schemas of the database as a map from schema name to
    /// schema ID.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;

    /// Returns the schemas of the database.
    fn schemas(&self) -> Vec<&dyn CatalogSchema>;

    /// Returns the ID of the role that owns the database.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the database.
    fn privileges(&self) -> &PrivilegeMap;
}
459
/// A schema in a [`SessionCatalog`].
pub trait CatalogSchema {
    /// Returns the fully-specified ID of the database that contains the schema.
    fn database(&self) -> &ResolvedDatabaseSpecifier;

    /// Returns a fully-specified name of the schema.
    fn name(&self) -> &QualifiedSchemaName;

    /// Returns a stable ID for the schema.
    fn id(&self) -> &SchemaSpecifier;

    /// Returns whether the schema contains any `CatalogItem`s.
    fn has_items(&self) -> bool;

    /// Returns the IDs of the items in the schema.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;

    /// Returns the ID of the role that owns the schema.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the schema.
    fn privileges(&self) -> &PrivilegeMap;
}
483
/// Attributes belonging to a [`CatalogRole`].
///
/// The private `_private` field prevents construction via a struct literal
/// outside this module, so values must be obtained through a constructor.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
pub struct RoleAttributes {
    /// Indicates whether the role has inheritance of privileges.
    pub inherit: bool,
    /// The raw password of the role. This is for self managed auth, not cloud.
    pub password: Option<Password>,
    /// Whether or not this user is a superuser.
    pub superuser: Option<bool>,
    /// Whether or not this role can log in.
    pub login: Option<bool>,
    // Force use of constructor.
    _private: (),
}
498
499impl RoleAttributes {
500    /// Creates a new [`RoleAttributes`] with default attributes.
501    pub const fn new() -> RoleAttributes {
502        RoleAttributes {
503            inherit: true,
504            password: None,
505            superuser: None,
506            login: None,
507            _private: (),
508        }
509    }
510
511    /// Adds all attributes except password.
512    pub const fn with_all(mut self) -> RoleAttributes {
513        self.inherit = true;
514        self.superuser = Some(true);
515        self.login = Some(true);
516        self
517    }
518
519    /// Returns whether or not the role has inheritence of privileges.
520    pub const fn is_inherit(&self) -> bool {
521        self.inherit
522    }
523
524    /// Returns whether or not the role has a password.
525    pub const fn has_password(&self) -> bool {
526        self.password.is_some()
527    }
528
529    /// Returns self without the password.
530    pub fn without_password(self) -> RoleAttributes {
531        RoleAttributes {
532            inherit: self.inherit,
533            password: None,
534            superuser: self.superuser,
535            login: self.login,
536            _private: (),
537        }
538    }
539}
540
541impl From<PlannedRoleAttributes> for RoleAttributes {
542    fn from(
543        PlannedRoleAttributes {
544            inherit,
545            password,
546            superuser,
547            login,
548            ..
549        }: PlannedRoleAttributes,
550    ) -> RoleAttributes {
551        let default_attributes = RoleAttributes::new();
552        RoleAttributes {
553            inherit: inherit.unwrap_or(default_attributes.inherit),
554            password,
555            superuser,
556            login,
557            _private: (),
558        }
559    }
560}
561
/// Default variable values for a [`CatalogRole`].
#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct RoleVars {
    /// Map of variable names to their default value for the role.
    pub map: BTreeMap<String, OwnedVarInput>,
}
568
/// A role in a [`SessionCatalog`].
pub trait CatalogRole {
    /// Returns a fully-specified name of the role.
    fn name(&self) -> &str;

    /// Returns a stable ID for the role.
    fn id(&self) -> RoleId;

    /// Returns all role IDs that this role is an immediate member of, and the grantor of that
    /// membership.
    ///
    /// Key is the role that some role is a member of, value is the grantor role ID.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;

    /// Returns the attributes associated with this role.
    fn attributes(&self) -> &RoleAttributes;

    /// Returns all variables that this role has a default value stored for,
    /// as a map from variable name to that value.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
}
589
/// A network policy in a [`SessionCatalog`].
pub trait CatalogNetworkPolicy {
    /// Returns a fully-specified name of the network policy.
    fn name(&self) -> &str;

    /// Returns a stable ID for the network policy.
    fn id(&self) -> NetworkPolicyId;

    /// Returns the ID of the role that owns the network policy.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the network policy.
    fn privileges(&self) -> &PrivilegeMap;
}
604
/// A cluster in a [`SessionCatalog`].
pub trait CatalogCluster<'a> {
    /// Returns a fully-specified name of the cluster.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster.
    fn id(&self) -> ClusterId;

    /// Returns the IDs of the objects that are bound to this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;

    /// Returns the replicas of the cluster as a map from replica name to
    /// replica ID.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;

    /// Returns the replicas of the cluster.
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica>;

    /// Returns the replica belonging to the cluster with replica ID `id`.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica;

    /// Returns the ID of the role that owns the cluster.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the cluster.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns true if this cluster is a managed cluster.
    fn is_managed(&self) -> bool;

    /// Returns the size of the cluster, if the cluster is a managed cluster.
    fn managed_size(&self) -> Option<&str>;

    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
    fn schedule(&self) -> Option<&ClusterSchedule>;

    /// Tries to convert this cluster into a [`CreateClusterPlan`], returning a
    /// [`PlanError`] on failure.
    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
}
645
/// A cluster replica in a [`SessionCatalog`].
pub trait CatalogClusterReplica<'a>: Debug {
    /// Returns the name of the cluster replica.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster that the replica belongs to.
    fn cluster_id(&self) -> ClusterId;

    /// Returns a stable ID for the replica.
    fn replica_id(&self) -> ReplicaId;

    /// Returns the ID of the role that owns the replica.
    fn owner_id(&self) -> RoleId;

    /// Returns whether or not the replica is internal.
    fn internal(&self) -> bool;
}
663
/// An item in a [`SessionCatalog`].
///
/// Note that "item" has a very specific meaning in the context of a SQL
/// catalog, and refers to the various entities that belong to a schema.
pub trait CatalogItem {
    /// Returns the fully qualified name of the catalog item.
    fn name(&self) -> &QualifiedItemName;

    /// Returns the [`CatalogItemId`] for the item.
    fn id(&self) -> CatalogItemId;

    /// Returns the [`GlobalId`]s associated with this item.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;

    /// Returns the catalog item's OID.
    fn oid(&self) -> u32;

    /// Returns the resolved function.
    ///
    /// If the catalog item is not of a type that produces functions (i.e.,
    /// anything other than a function), it returns an error.
    fn func(&self) -> Result<&'static Func, CatalogError>;

    /// Returns the resolved source connection.
    ///
    /// If the catalog item is not of a type that contains a `SourceDesc`
    /// (i.e., anything other than sources), it returns an error.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;

    /// Returns the resolved connection.
    ///
    /// If the catalog item is not a connection, it returns an error.
    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;

    /// Returns the type of the catalog item.
    fn item_type(&self) -> CatalogItemType;

    /// Returns a normalized SQL statement that describes how to create the
    /// catalog item.
    fn create_sql(&self) -> &str;

    /// Returns the IDs of the catalog items that this catalog item directly
    /// references.
    fn references(&self) -> &ResolvedIds;

    /// Returns the IDs of the catalog items upon which this catalog item
    /// depends.
    fn uses(&self) -> BTreeSet<CatalogItemId>;

    /// Returns the IDs of the catalog items that directly reference this catalog item.
    fn referenced_by(&self) -> &[CatalogItemId];

    /// Returns the IDs of the catalog items that depend upon this catalog item.
    fn used_by(&self) -> &[CatalogItemId];

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )>;

    /// Reports whether this catalog item is a progress source.
    fn is_progress_source(&self) -> bool;

    /// If this catalog item is a source, returns the ID of its progress collection.
    fn progress_id(&self) -> Option<CatalogItemId>;

    /// Returns the index details associated with the catalog item, if the
    /// catalog item is an index.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;

    /// Returns the column defaults associated with the catalog item, if the
    /// catalog item is a table that accepts writes.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;

    /// Returns the type information associated with the catalog item, if the
    /// catalog item is a type.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;

    /// Returns the ID of the role that owns the item.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the item.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns the cluster the item belongs to, if any.
    fn cluster_id(&self) -> Option<ClusterId>;

    /// Returns the [`CatalogCollectionItem`] for a specific version of this
    /// [`CatalogItem`].
    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;

    /// Returns the latest version of this item, if it's version-able.
    fn latest_version(&self) -> Option<RelationVersion>;
}
770
/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
/// refers to.
pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
    /// Returns a description of the result set produced by the catalog item.
    ///
    /// If the catalog item is not of a type that produces data (i.e., a sink or
    /// an index), it returns an error.
    fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, CatalogError>;

    /// Returns the [`GlobalId`] for this item.
    fn global_id(&self) -> GlobalId;
}
783
/// The type of a [`CatalogItem`].
#[derive(Debug, Deserialize, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum CatalogItemType {
    /// A table.
    Table,
    /// A source.
    Source,
    /// A sink.
    Sink,
    /// A view.
    View,
    /// A materialized view.
    MaterializedView,
    /// An index.
    Index,
    /// A type.
    Type,
    /// A function.
    Func,
    /// A secret.
    Secret,
    /// A connection.
    Connection,
    /// A continual task.
    ContinualTask,
}
810
811impl CatalogItemType {
812    /// Reports whether the given type of item conflicts with items of type
813    /// `CatalogItemType::Type`.
814    ///
815    /// In PostgreSQL, even though types live in a separate namespace from other
816    /// schema objects, creating a table, view, or materialized view creates a
817    /// type named after that relation. This prevents creating a type with the
818    /// same name as a relational object, even though types and relational
819    /// objects live in separate namespaces. (Indexes are even weirder; while
820    /// they don't get a type with the same name, they get an entry in
821    /// `pg_class` that prevents *record* types of the same name as the index,
822    /// but not other types of types, like enums.)
823    ///
824    /// We don't presently construct types that mirror relational objects,
825    /// though we likely will need to in the future for full PostgreSQL
826    /// compatibility (see database-issues#7142). For now, we use this method to
827    /// prevent creating types and relational objects that have the same name, so
828    /// that it is a backwards compatible change in the future to introduce a
829    /// type named after each relational object in the system.
830    pub fn conflicts_with_type(&self) -> bool {
831        match self {
832            CatalogItemType::Table => true,
833            CatalogItemType::Source => true,
834            CatalogItemType::View => true,
835            CatalogItemType::MaterializedView => true,
836            CatalogItemType::Index => true,
837            CatalogItemType::Type => true,
838            CatalogItemType::Sink => false,
839            CatalogItemType::Func => false,
840            CatalogItemType::Secret => false,
841            CatalogItemType::Connection => false,
842            CatalogItemType::ContinualTask => true,
843        }
844    }
845}
846
847impl fmt::Display for CatalogItemType {
848    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
849        match self {
850            CatalogItemType::Table => f.write_str("table"),
851            CatalogItemType::Source => f.write_str("source"),
852            CatalogItemType::Sink => f.write_str("sink"),
853            CatalogItemType::View => f.write_str("view"),
854            CatalogItemType::MaterializedView => f.write_str("materialized view"),
855            CatalogItemType::Index => f.write_str("index"),
856            CatalogItemType::Type => f.write_str("type"),
857            CatalogItemType::Func => f.write_str("func"),
858            CatalogItemType::Secret => f.write_str("secret"),
859            CatalogItemType::Connection => f.write_str("connection"),
860            CatalogItemType::ContinualTask => f.write_str("continual task"),
861        }
862    }
863}
864
865impl From<CatalogItemType> for ObjectType {
866    fn from(value: CatalogItemType) -> Self {
867        match value {
868            CatalogItemType::Table => ObjectType::Table,
869            CatalogItemType::Source => ObjectType::Source,
870            CatalogItemType::Sink => ObjectType::Sink,
871            CatalogItemType::View => ObjectType::View,
872            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
873            CatalogItemType::Index => ObjectType::Index,
874            CatalogItemType::Type => ObjectType::Type,
875            CatalogItemType::Func => ObjectType::Func,
876            CatalogItemType::Secret => ObjectType::Secret,
877            CatalogItemType::Connection => ObjectType::Connection,
878            CatalogItemType::ContinualTask => ObjectType::ContinualTask,
879        }
880    }
881}
882
883impl From<CatalogItemType> for mz_audit_log::ObjectType {
884    fn from(value: CatalogItemType) -> Self {
885        match value {
886            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
887            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
888            CatalogItemType::View => mz_audit_log::ObjectType::View,
889            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
890            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
891            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
892            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
893            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
894            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
895            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
896            CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
897        }
898    }
899}
900
/// Details about a type in the catalog.
///
/// The type parameter `T` selects how other types are referenced from within
/// [`CatalogType`]; see [`TypeReference`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypeDetails<T: TypeReference> {
    /// The ID of the type with this type as the array element, if available.
    pub array_id: Option<CatalogItemId>,
    /// The description of this type.
    pub typ: CatalogType<T>,
    /// Additional metadata about the type in PostgreSQL, if relevant.
    pub pg_metadata: Option<CatalogTypePgMetadata>,
}
911
/// Additional PostgreSQL metadata about a type.
///
/// Both fields hold function OIDs as raw `u32` values.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypePgMetadata {
    /// The OID of the `typinput` function in PostgreSQL.
    pub typinput_oid: u32,
    /// The OID of the `typreceive` function in PostgreSQL.
    pub typreceive_oid: u32,
}
920
/// Represents a reference to a type in the catalog.
pub trait TypeReference {
    /// The actual type used to reference a `CatalogType`.
    type Reference: Clone + Debug + Eq + PartialEq;
}
926
/// Reference to a type by its name.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NameReference;

impl TypeReference for NameReference {
    // Names are represented as `'static` string slices.
    type Reference = &'static str;
}
934
/// Reference to a type by its catalog item ID.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IdReference;

impl TypeReference for IdReference {
    // Types are referenced through the `CatalogItemId` of the type's item.
    type Reference = CatalogItemId;
}
942
/// A type stored in the catalog.
///
/// The variants correspond one-to-one with [`mz_repr::ScalarType`], but with type
/// modifiers removed and with embedded types replaced with references to other
/// types in the catalog.
///
/// The type parameter `T` determines the concrete representation of those
/// references; see [`TypeReference`].
#[allow(missing_docs)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogType<T: TypeReference> {
    AclItem,
    Array {
        /// Reference to the array's element type.
        element_reference: T::Reference,
    },
    Bool,
    Bytes,
    Char,
    Date,
    Float32,
    Float64,
    Int16,
    Int32,
    Int64,
    UInt16,
    UInt32,
    UInt64,
    MzTimestamp,
    Interval,
    Jsonb,
    List {
        /// Reference to the list's element type.
        element_reference: T::Reference,
        /// Modifiers to apply to the element type.
        element_modifiers: Vec<i64>,
    },
    Map {
        /// Reference to the map's key type.
        key_reference: T::Reference,
        /// Modifiers to apply to the key type.
        key_modifiers: Vec<i64>,
        /// Reference to the map's value type.
        value_reference: T::Reference,
        /// Modifiers to apply to the value type.
        value_modifiers: Vec<i64>,
    },
    Numeric,
    Oid,
    PgLegacyChar,
    PgLegacyName,
    Pseudo,
    Range {
        /// Reference to the range's element type.
        element_reference: T::Reference,
    },
    Record {
        /// The fields of the record, each with its own type reference.
        fields: Vec<CatalogRecordField<T>>,
    },
    RegClass,
    RegProc,
    RegType,
    String,
    Time,
    Timestamp,
    TimestampTz,
    Uuid,
    VarChar,
    Int2Vector,
    MzAclItem,
}
1003
1004impl CatalogType<IdReference> {
1005    /// Returns the relation description for the type, if the type is a record
1006    /// type.
1007    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1008        match &self {
1009            CatalogType::Record { fields } => {
1010                let mut desc = RelationDesc::builder();
1011                for f in fields {
1012                    let name = f.name.clone();
1013                    let ty = query::scalar_type_from_catalog(
1014                        catalog,
1015                        f.type_reference,
1016                        &f.type_modifiers,
1017                    )?;
1018                    // TODO: support plumbing `NOT NULL` constraints through
1019                    // `CREATE TYPE`.
1020                    let ty = ty.nullable(true);
1021                    desc = desc.with_column(name, ty);
1022                }
1023                Ok(Some(desc.finish()))
1024            }
1025            _ => Ok(None),
1026        }
1027    }
1028}
1029
/// A description of a field in a [`CatalogType::Record`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogRecordField<T: TypeReference> {
    /// The name of the field.
    pub name: ColumnName,
    /// A reference to the type of the field, in the representation chosen by
    /// `T`.
    pub type_reference: T::Reference,
    /// Modifiers to apply to the type.
    pub type_modifiers: Vec<i64>,
}
1040
/// Mirrored from [PostgreSQL's `typcategory`][typcategory].
///
/// Note that Materialize also uses a number of pseudotypes when planning, but
/// we have yet to need to integrate them with `TypeCategory`.
///
/// [typcategory]:
/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TypeCategory {
    /// Array type.
    Array,
    /// Bit string type.
    BitString,
    /// Boolean type.
    Boolean,
    /// Composite type.
    Composite,
    /// Date/time type.
    DateTime,
    /// Enum type.
    Enum,
    /// Geometric type.
    Geometric,
    /// List type. Materialize specific.
    List,
    /// Network address type.
    NetworkAddress,
    /// Numeric type.
    Numeric,
    /// Pseudo type.
    Pseudo,
    /// Range type.
    Range,
    /// String type.
    String,
    /// Timespan type.
    Timespan,
    /// User-defined type.
    UserDefined,
    /// Unknown type.
    Unknown,
}

impl fmt::Display for TypeCategory {
    /// Writes the lowercase, hyphen-separated name of the category.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Self::Array => "array",
            Self::BitString => "bit-string",
            Self::Boolean => "boolean",
            Self::Composite => "composite",
            Self::DateTime => "date-time",
            Self::Enum => "enum",
            Self::Geometric => "geometric",
            Self::List => "list",
            Self::NetworkAddress => "network-address",
            Self::Numeric => "numeric",
            Self::Pseudo => "pseudo",
            Self::Range => "range",
            Self::String => "string",
            Self::Timespan => "timespan",
            Self::UserDefined => "user-defined",
            Self::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
1106
/// Identifies an environment.
///
/// Outside of tests, an environment ID can be constructed only from a string of
/// the following form:
///
/// ```text
/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
/// ```
///
/// The fields have the following formats:
///
/// * The cloud provider consists of one or more alphanumeric characters.
/// * The cloud provider region consists of one or more alphanumeric or hyphen
///   characters.
/// * The organization ID is a UUID in its canonical text format.
/// * The ordinal is a decimal number with between one and eight digits.
///
/// There is no way to construct an environment ID from parts, to ensure that
/// the `Display` representation is parseable according to the above rules.
// NOTE(benesch): ideally we'd have accepted the components of the environment
// ID using separate command-line arguments, or at least a string format that
// used a field separator that did not appear in the fields. Alas. We can't
// easily change it now, as it's used as, e.g., the default sink progress topic.
#[derive(Debug, Clone, PartialEq)]
pub struct EnvironmentId {
    // Parsed from the `<CLOUD PROVIDER>` field.
    cloud_provider: CloudProvider,
    // The `<CLOUD PROVIDER REGION>` field, stored verbatim.
    cloud_provider_region: String,
    // Parsed from the `<ORGANIZATION ID>` field.
    organization_id: Uuid,
    // Parsed from the `<ORDINAL>` field.
    ordinal: u64,
}
1137
1138impl EnvironmentId {
1139    /// Creates a dummy `EnvironmentId` for use in tests.
1140    pub fn for_tests() -> EnvironmentId {
1141        EnvironmentId {
1142            cloud_provider: CloudProvider::Local,
1143            cloud_provider_region: "az1".into(),
1144            organization_id: Uuid::new_v4(),
1145            ordinal: 0,
1146        }
1147    }
1148
1149    /// Returns the cloud provider associated with this environment ID.
1150    pub fn cloud_provider(&self) -> &CloudProvider {
1151        &self.cloud_provider
1152    }
1153
1154    /// Returns the cloud provider region associated with this environment ID.
1155    pub fn cloud_provider_region(&self) -> &str {
1156        &self.cloud_provider_region
1157    }
1158
1159    /// Returns the name of the region associted with this environment ID.
1160    ///
1161    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1162    /// [`EnvironmentId::cloud_provider_region`].
1163    pub fn region(&self) -> String {
1164        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1165    }
1166
1167    /// Returns the organization ID associated with this environment ID.
1168    pub fn organization_id(&self) -> Uuid {
1169        self.organization_id
1170    }
1171
1172    /// Returns the ordinal associated with this environment ID.
1173    pub fn ordinal(&self) -> u64 {
1174        self.ordinal
1175    }
1176}
1177
1178// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1179// populated using this key. Consequently, any changes to that trait
1180// implementation will also have to be reflected in the existing feature
1181// targeting config in LaunchDarkly, otherwise environments might receive
1182// different configs upon restart.
1183impl fmt::Display for EnvironmentId {
1184    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1185        write!(
1186            f,
1187            "{}-{}-{}-{}",
1188            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1189        )
1190    }
1191}
1192
1193impl FromStr for EnvironmentId {
1194    type Err = InvalidEnvironmentIdError;
1195
1196    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1197        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1198            Regex::new(
1199                "^(?P<cloud_provider>[[:alnum:]]+)-\
1200                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1201                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1202                  (?P<ordinal>\\d{1,8})$"
1203            ).unwrap()
1204        });
1205        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1206        Ok(EnvironmentId {
1207            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1208            cloud_provider_region: captures["cloud_provider_region"].into(),
1209            organization_id: captures["organization_id"]
1210                .parse()
1211                .map_err(|_| InvalidEnvironmentIdError)?,
1212            ordinal: captures["ordinal"]
1213                .parse()
1214                .map_err(|_| InvalidEnvironmentIdError)?,
1215        })
1216    }
1217}
1218
/// The error type for [`EnvironmentId::from_str`].
///
/// The error carries no detail about which part of the input was invalid.
#[derive(Debug, Clone, PartialEq)]
pub struct InvalidEnvironmentIdError;

impl fmt::Display for InvalidEnvironmentIdError {
    /// Writes a fixed message; the error has no payload to include.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "invalid environment ID")
    }
}

// The default `Error` methods are used, so no underlying source is reported.
impl Error for InvalidEnvironmentIdError {}
1230
1231impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1232    fn from(_: InvalidCloudProviderError) -> Self {
1233        InvalidEnvironmentIdError
1234    }
1235}
1236
/// An error returned by the catalog.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogError {
    /// Unknown database.
    UnknownDatabase(String),
    /// Database already exists.
    DatabaseAlreadyExists(String),
    /// Unknown schema.
    UnknownSchema(String),
    /// Schema already exists.
    SchemaAlreadyExists(String),
    /// Unknown role.
    UnknownRole(String),
    /// Role already exists.
    RoleAlreadyExists(String),
    /// Network Policy already exists.
    NetworkPolicyAlreadyExists(String),
    /// Unknown cluster.
    UnknownCluster(String),
    /// Unexpected builtin cluster.
    UnexpectedBuiltinCluster(String),
    /// Unexpected builtin cluster type.
    UnexpectedBuiltinClusterType(String),
    /// Cluster already exists.
    ClusterAlreadyExists(String),
    /// Unknown cluster replica.
    UnknownClusterReplica(String),
    /// Unknown cluster replica size.
    UnknownClusterReplicaSize(String),
    /// Duplicate replica: multiple replicas with the same name (first field)
    /// cannot be created on one cluster (second field).
    DuplicateReplica(String, String),
    /// Unknown item.
    UnknownItem(String),
    /// Item already exists.
    ItemAlreadyExists(CatalogItemId, String),
    /// Unknown function.
    UnknownFunction {
        /// The identifier of the function we couldn't find
        name: String,
        /// A suggested alternative to the named function.
        alternative: Option<String>,
    },
    /// Unknown type.
    UnknownType {
        /// The identifier of the type we couldn't find.
        name: String,
    },
    /// Unknown connection.
    UnknownConnection(String),
    /// Unknown network policy.
    UnknownNetworkPolicy(String),
    /// Expected the catalog item to have the given type, but it did not.
    UnexpectedType {
        /// The item's name.
        name: String,
        /// The actual type of the item.
        actual_type: CatalogItemType,
        /// The expected type of the item.
        expected_type: CatalogItemType,
    },
    /// Invalid attempt to depend on a non-dependable item.
    InvalidDependency {
        /// The invalid item's name.
        name: String,
        /// The invalid item's type.
        typ: CatalogItemType,
    },
    /// Ran out of unique IDs.
    IdExhaustion,
    /// Ran out of unique OIDs.
    OidExhaustion,
    /// Timeline already exists.
    TimelineAlreadyExists(String),
    /// Id Allocator already exists.
    IdAllocatorAlreadyExists(String),
    /// Config already exists.
    ConfigAlreadyExists(String),
    /// Builtin migrations failed.
    FailedBuiltinSchemaMigration(String),
    /// StorageCollectionMetadata already exists.
    StorageCollectionMetadataAlreadyExists(GlobalId),
}
1319
1320impl fmt::Display for CatalogError {
1321    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1322        match self {
1323            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1324            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1325            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1326            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1327            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1328            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1329            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1330            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1331            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1332            Self::NetworkPolicyAlreadyExists(name) => {
1333                write!(f, "network policy '{name}' already exists")
1334            }
1335            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1336            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1337            Self::UnexpectedBuiltinCluster(name) => {
1338                write!(f, "Unexpected builtin cluster '{}'", name)
1339            }
1340            Self::UnexpectedBuiltinClusterType(name) => {
1341                write!(f, "Unexpected builtin cluster type'{}'", name)
1342            }
1343            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1344            Self::UnknownClusterReplica(name) => {
1345                write!(f, "unknown cluster replica '{}'", name)
1346            }
1347            Self::UnknownClusterReplicaSize(name) => {
1348                write!(f, "unknown cluster replica size '{}'", name)
1349            }
1350            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1351                f,
1352                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1353            ),
1354            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1355            Self::ItemAlreadyExists(_gid, name) => {
1356                write!(f, "catalog item '{name}' already exists")
1357            }
1358            Self::UnexpectedType {
1359                name,
1360                actual_type,
1361                expected_type,
1362            } => {
1363                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1364            }
1365            Self::InvalidDependency { name, typ } => write!(
1366                f,
1367                "catalog item '{}' is {} {} and so cannot be depended upon",
1368                name,
1369                if matches!(typ, CatalogItemType::Index) {
1370                    "an"
1371                } else {
1372                    "a"
1373                },
1374                typ,
1375            ),
1376            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1377            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1378            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1379            Self::IdAllocatorAlreadyExists(name) => {
1380                write!(f, "ID allocator '{name}' already exists")
1381            }
1382            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1383            Self::FailedBuiltinSchemaMigration(objects) => {
1384                write!(f, "failed to migrate schema of builtin objects: {objects}")
1385            }
1386            Self::StorageCollectionMetadataAlreadyExists(key) => {
1387                write!(f, "storage metadata for '{key}' already exists")
1388            }
1389        }
1390    }
1391}
1392
1393impl CatalogError {
1394    /// Returns any applicable hints for [`CatalogError`].
1395    pub fn hint(&self) -> Option<String> {
1396        match self {
1397            CatalogError::UnknownFunction { alternative, .. } => {
1398                match alternative {
1399                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1400                    Some(alt) => Some(format!("Try using {alt}")),
1401                }
1402            }
1403            _ => None,
1404        }
1405    }
1406}
1407
// The default `Error` methods are used, so no underlying source is reported.
impl Error for CatalogError {}
1409
// Enum variant docs would be useless here.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects stored in the catalog.
///
/// Besides the item types covered by [`CatalogItemType`], this also includes
/// objects that are not items: roles, clusters, cluster replicas, databases,
/// schemas, and network policies.
pub enum ObjectType {
    Table,
    View,
    MaterializedView,
    Source,
    Sink,
    Index,
    Type,
    Role,
    Cluster,
    ClusterReplica,
    Secret,
    Connection,
    Database,
    Schema,
    Func,
    ContinualTask,
    NetworkPolicy,
}
1433
1434impl ObjectType {
1435    /// Reports if the object type can be treated as a relation.
1436    pub fn is_relation(&self) -> bool {
1437        match self {
1438            ObjectType::Table
1439            | ObjectType::View
1440            | ObjectType::MaterializedView
1441            | ObjectType::Source
1442            | ObjectType::ContinualTask => true,
1443            ObjectType::Sink
1444            | ObjectType::Index
1445            | ObjectType::Type
1446            | ObjectType::Secret
1447            | ObjectType::Connection
1448            | ObjectType::Func
1449            | ObjectType::Database
1450            | ObjectType::Schema
1451            | ObjectType::Cluster
1452            | ObjectType::ClusterReplica
1453            | ObjectType::Role
1454            | ObjectType::NetworkPolicy => false,
1455        }
1456    }
1457}
1458
1459impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1460    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1461        match value {
1462            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1463            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1464            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1465            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1466            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1467            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1468            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1469            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1470            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1471            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1472            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1473            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1474            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1475            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1476            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1477            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1478            mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask,
1479            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1480        }
1481    }
1482}
1483
1484impl From<CommentObjectId> for ObjectType {
1485    fn from(value: CommentObjectId) -> ObjectType {
1486        match value {
1487            CommentObjectId::Table(_) => ObjectType::Table,
1488            CommentObjectId::View(_) => ObjectType::View,
1489            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1490            CommentObjectId::Source(_) => ObjectType::Source,
1491            CommentObjectId::Sink(_) => ObjectType::Sink,
1492            CommentObjectId::Index(_) => ObjectType::Index,
1493            CommentObjectId::Func(_) => ObjectType::Func,
1494            CommentObjectId::Connection(_) => ObjectType::Connection,
1495            CommentObjectId::Type(_) => ObjectType::Type,
1496            CommentObjectId::Secret(_) => ObjectType::Secret,
1497            CommentObjectId::Role(_) => ObjectType::Role,
1498            CommentObjectId::Database(_) => ObjectType::Database,
1499            CommentObjectId::Schema(_) => ObjectType::Schema,
1500            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1501            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1502            CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1503            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1504        }
1505    }
1506}
1507
1508impl Display for ObjectType {
1509    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1510        f.write_str(match self {
1511            ObjectType::Table => "TABLE",
1512            ObjectType::View => "VIEW",
1513            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1514            ObjectType::Source => "SOURCE",
1515            ObjectType::Sink => "SINK",
1516            ObjectType::Index => "INDEX",
1517            ObjectType::Type => "TYPE",
1518            ObjectType::Role => "ROLE",
1519            ObjectType::Cluster => "CLUSTER",
1520            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1521            ObjectType::Secret => "SECRET",
1522            ObjectType::Connection => "CONNECTION",
1523            ObjectType::Database => "DATABASE",
1524            ObjectType::Schema => "SCHEMA",
1525            ObjectType::Func => "FUNCTION",
1526            ObjectType::ContinualTask => "CONTINUAL TASK",
1527            ObjectType::NetworkPolicy => "NETWORK POLICY",
1528        })
1529    }
1530}
1531
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects in the system.
///
/// This extends [`ObjectType`] with a variant that stands for the system as a
/// whole.
pub enum SystemObjectType {
    /// Catalog object type.
    Object(ObjectType),
    /// Entire system.
    System,
}
1540
1541impl SystemObjectType {
1542    /// Reports if the object type can be treated as a relation.
1543    pub fn is_relation(&self) -> bool {
1544        match self {
1545            SystemObjectType::Object(object_type) => object_type.is_relation(),
1546            SystemObjectType::System => false,
1547        }
1548    }
1549}
1550
1551impl Display for SystemObjectType {
1552    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1553        match self {
1554            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1555            SystemObjectType::System => f.write_str("SYSTEM"),
1556        }
1557    }
1558}
1559
/// Enum used to format object names in error messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorMessageObjectDescription {
    /// The name of a specific object.
    Object {
        /// Type of object.
        object_type: ObjectType,
        /// Name of object, or `None` when the description was built from an
        /// object type alone.
        object_name: Option<String>,
    },
    /// The name of the entire system.
    System,
}
1573
1574impl ErrorMessageObjectDescription {
1575    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1576    pub fn from_id(
1577        object_id: &ObjectId,
1578        catalog: &dyn SessionCatalog,
1579    ) -> ErrorMessageObjectDescription {
1580        let object_name = match object_id {
1581            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1582            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1583                .get_cluster_replica(*cluster_id, *replica_id)
1584                .name()
1585                .to_string(),
1586            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1587            ObjectId::Schema((database_spec, schema_spec)) => {
1588                let name = catalog.get_schema(database_spec, schema_spec).name();
1589                catalog.resolve_full_schema_name(name).to_string()
1590            }
1591            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1592            ObjectId::Item(id) => {
1593                let name = catalog.get_item(id).name();
1594                catalog.resolve_full_name(name).to_string()
1595            }
1596            ObjectId::NetworkPolicy(network_policy_id) => catalog
1597                .get_network_policy(network_policy_id)
1598                .name()
1599                .to_string(),
1600        };
1601        ErrorMessageObjectDescription::Object {
1602            object_type: catalog.get_object_type(object_id),
1603            object_name: Some(object_name),
1604        }
1605    }
1606
1607    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1608    pub fn from_sys_id(
1609        object_id: &SystemObjectId,
1610        catalog: &dyn SessionCatalog,
1611    ) -> ErrorMessageObjectDescription {
1612        match object_id {
1613            SystemObjectId::Object(object_id) => {
1614                ErrorMessageObjectDescription::from_id(object_id, catalog)
1615            }
1616            SystemObjectId::System => ErrorMessageObjectDescription::System,
1617        }
1618    }
1619
1620    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1621    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1622        match object_type {
1623            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1624                object_type,
1625                object_name: None,
1626            },
1627            SystemObjectType::System => ErrorMessageObjectDescription::System,
1628        }
1629    }
1630}
1631
1632impl Display for ErrorMessageObjectDescription {
1633    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1634        match self {
1635            ErrorMessageObjectDescription::Object {
1636                object_type,
1637                object_name,
1638            } => {
1639                let object_name = object_name
1640                    .as_ref()
1641                    .map(|object_name| format!(" {}", object_name.quoted()))
1642                    .unwrap_or_else(|| "".to_string());
1643                write!(f, "{object_type}{object_name}")
1644            }
1645            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1646        }
1647    }
1648}
1649
/// Records which roles some role is a member of, along with the grantor of each membership.
///
/// Serialized as a flat map from the string form of each role ID to the grantor's role ID.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
// These attributes are needed because the key of a map must be a string. We also
// get the added benefit of flattening this struct in its serialized form.
#[serde(into = "BTreeMap<String, RoleId>")]
#[serde(try_from = "BTreeMap<String, RoleId>")]
pub struct RoleMembership {
    /// Key is the role that some role is a member of, value is the grantor role ID.
    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
    // migration.
    pub map: BTreeMap<RoleId, RoleId>,
}
1664
1665impl RoleMembership {
1666    /// Creates a new [`RoleMembership`].
1667    pub fn new() -> RoleMembership {
1668        RoleMembership {
1669            map: BTreeMap::new(),
1670        }
1671    }
1672}
1673
1674impl From<RoleMembership> for BTreeMap<String, RoleId> {
1675    fn from(value: RoleMembership) -> Self {
1676        value
1677            .map
1678            .into_iter()
1679            .map(|(k, v)| (k.to_string(), v))
1680            .collect()
1681    }
1682}
1683
1684impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1685    type Error = anyhow::Error;
1686
1687    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1688        Ok(RoleMembership {
1689            map: value
1690                .into_iter()
1691                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1692                .collect::<Result<_, anyhow::Error>>()?,
1693        })
1694    }
1695}
1696
/// Specification for objects that will be affected by a default privilege.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeObject {
    /// The role id that created the object.
    pub role_id: RoleId,
    /// The database that the object is created in if Some, otherwise all databases.
    pub database_id: Option<DatabaseId>,
    /// The schema that the object is created in if Some, otherwise all schemas.
    pub schema_id: Option<SchemaId>,
    /// The type of object.
    pub object_type: ObjectType,
}
1709
1710impl DefaultPrivilegeObject {
1711    /// Creates a new [`DefaultPrivilegeObject`].
1712    pub fn new(
1713        role_id: RoleId,
1714        database_id: Option<DatabaseId>,
1715        schema_id: Option<SchemaId>,
1716        object_type: ObjectType,
1717    ) -> DefaultPrivilegeObject {
1718        DefaultPrivilegeObject {
1719            role_id,
1720            database_id,
1721            schema_id,
1722            object_type,
1723        }
1724    }
1725}
1726
1727impl std::fmt::Display for DefaultPrivilegeObject {
1728    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1729        // TODO: Don't just wrap Debug.
1730        write!(f, "{self:?}")
1731    }
1732}
1733
/// Specification for the privileges that will be granted from default privileges.
///
/// Unlike an [`MzAclItem`], this carries no grantor; one is supplied when converting with
/// [`DefaultPrivilegeAclItem::mz_acl_item`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeAclItem {
    /// The role that will receive the privileges.
    pub grantee: RoleId,
    /// The specific privileges granted.
    pub acl_mode: AclMode,
}
1742
1743impl DefaultPrivilegeAclItem {
1744    /// Creates a new [`DefaultPrivilegeAclItem`].
1745    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1746        DefaultPrivilegeAclItem { grantee, acl_mode }
1747    }
1748
1749    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1750    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1751        MzAclItem {
1752            grantee: self.grantee,
1753            grantor,
1754            acl_mode: self.acl_mode,
1755        }
1756    }
1757}
1758
/// Which builtins to return in `BUILTINS::iter`.
///
/// All calls to `BUILTINS::iter` within the lifetime of an environmentd process
/// must provide an equal `BuiltinsConfig`. It is not allowed to change
/// dynamically.
#[derive(Debug, Clone)]
pub struct BuiltinsConfig {
    /// If true, include system builtin continual tasks; if false, leave them out.
    pub include_continual_tasks: bool,
}
1769
#[cfg(test)]
mod tests {
    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};

    /// Organization ID embedded in every well-formed environment ID below.
    const ORGANIZATION_ID: &str = "1497a3b7-a455-4fc4-8752-b44a94b5f90a";

    /// Checks that each input parses to the expected result, and that every successfully parsed
    /// environment ID formats back to exactly the string it was parsed from.
    #[mz_ore::test]
    fn test_environment_id() {
        let cases = [
            // Well-formed IDs, one per cloud provider.
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Local,
                    cloud_provider_region: "az1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 452,
                }),
            ),
            (
                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Aws,
                    cloud_provider_region: "us-east-1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Gcp,
                    cloud_provider_region: "us-central1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Azure,
                    cloud_provider_region: "australiaeast".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Generic,
                    cloud_provider_region: "moon-station-11-darkside".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            // Inputs that must be rejected.
            // Empty string.
            ("", Err(InvalidEnvironmentIdError)),
            // Nine-digit ordinal.
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
                Err(InvalidEnvironmentIdError),
            ),
            // No region segment between the provider and the organization ID.
            (
                "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
            // Organization ID with one of its hyphens missing.
            (
                "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
        ];

        for (input, expected) in cases {
            let actual: Result<EnvironmentId, InvalidEnvironmentIdError> = input.parse();
            assert_eq!(expected, actual, "input = {}", input);
            // Anything that parses must round-trip through `to_string` unchanged.
            if let Ok(parsed) = &actual {
                assert_eq!(input, parsed.to_string(), "input = {}", input);
            }
        }
    }
}