mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::num::NonZeroU32;
20use std::str::FromStr;
21use std::sync::LazyLock;
22use std::time::{Duration, Instant};
23
24use chrono::{DateTime, Utc};
25use mz_auth::password::Password;
26use mz_build_info::BuildInfo;
27use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
28use mz_controller_types::{ClusterId, ReplicaId};
29use mz_expr::MirScalarExpr;
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_ore::str::StrExt;
32use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
33use mz_repr::explain::ExprHumanizer;
34use mz_repr::network_policy_id::NetworkPolicyId;
35use mz_repr::role_id::RoleId;
36use mz_repr::{
37    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
38};
39use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
40use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
41use mz_storage_types::connections::{Connection, ConnectionContext};
42use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
43use proptest_derive::Arbitrary;
44use regex::Regex;
45use serde::{Deserialize, Serialize};
46use uuid::Uuid;
47
48use crate::func::Func;
49use crate::names::{
50    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
51    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
52    SchemaSpecifier, SystemObjectId,
53};
54use crate::plan::statement::StatementDesc;
55use crate::plan::statement::ddl::PlannedRoleAttributes;
56use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
57use crate::session::vars::{OwnedVarInput, SystemVars};
58
59/// A catalog keeps track of SQL objects and session state available to the
60/// planner.
61///
62/// The `sql` crate is agnostic to any particular catalog implementation. This
63/// trait describes the required interface.
64///
65/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
66/// catalog contains databases, databases contain schemas, and schemas contain
67/// catalog items, like sources, sinks, views, and indexes.
68///
69/// There are three classes of operations provided by a catalog:
70///
71///   * Resolution operations, like [`resolve_item`]. These fill in missing name
72///     components based upon connection defaults, e.g., resolving the partial
73///     name `view42` to the fully-specified name `materialize.public.view42`.
74///
75///   * Lookup operations, like [`get_item`]. These retrieve
76///     metadata about a catalog entity based on a fully-specified name that is
77///     known to be valid (i.e., because the name was successfully resolved, or
78///     was constructed based on the output of a prior lookup operation). These
79///     functions panic if called with invalid input.
80///
81///   * Session management, such as managing variables' states and adding
82///     notices to the session.
83///
84/// [`get_databases`]: SessionCatalog::get_databases
85/// [`get_item`]: SessionCatalog::get_item
86/// [`resolve_item`]: SessionCatalog::resolve_item
87pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
88    /// Returns the ID of the role that is issuing the query.
89    fn active_role_id(&self) -> &RoleId;
90
91    /// Returns the name of the database to use if one is not explicitly specified.
    ///
    /// Derived from [`SessionCatalog::active_database`]: it is `None` exactly
    /// when that method returns `None`.
92    fn active_database_name(&self) -> Option<&str> {
93        self.active_database()
94            .map(|id| self.get_database(id))
95            .map(|db| db.name())
96    }
97
98    /// Returns the ID of the database to use if one is not explicitly specified.
99    fn active_database(&self) -> Option<&DatabaseId>;
100
101    /// Returns the name of the cluster to use if one is not explicitly specified.
102    fn active_cluster(&self) -> &str;
103
104    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
105    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];
106
107    /// Returns the descriptor of the named prepared statement on the session, or
108    /// `None` if the prepared statement does not exist.
109    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;
110
111    /// Retrieves a reference to the specified portal's descriptor.
112    ///
113    /// If there is no such portal, returns `None`.
114    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc>;
115
116    /// Resolves the named database.
117    ///
118    /// If `database_name` exists in the catalog, it returns a reference to the
119    /// resolved database; otherwise it returns an error.
120    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;
121
122    /// Gets a database by its ID.
123    ///
124    /// Panics if `id` does not specify a valid database.
125    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;
126
127    /// Gets all databases.
128    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;
129
130    /// Resolves a partially-specified schema name.
131    ///
132    /// If the schema exists in the catalog, it returns a reference to the
133    /// resolved schema; otherwise it returns an error.
134    fn resolve_schema(
135        &self,
136        database_name: Option<&str>,
137        schema_name: &str,
138    ) -> Result<&dyn CatalogSchema, CatalogError>;
139
140    /// Resolves a schema name within a specified database.
141    ///
142    /// If the schema exists in the database, it returns a reference to the
143    /// resolved schema; otherwise it returns an error.
144    fn resolve_schema_in_database(
145        &self,
146        database_spec: &ResolvedDatabaseSpecifier,
147        schema_name: &str,
148    ) -> Result<&dyn CatalogSchema, CatalogError>;
149
150    /// Gets a schema by its database specifier and schema specifier.
151    ///
152    /// Panics if `database_spec` and `schema_spec` do not specify a valid schema.
153    fn get_schema(
154        &self,
155        database_spec: &ResolvedDatabaseSpecifier,
156        schema_spec: &SchemaSpecifier,
157    ) -> &dyn CatalogSchema;
158
159    /// Gets all schemas.
160    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;
161
162    /// Gets the ID of the `mz_internal` schema.
163    fn get_mz_internal_schema_id(&self) -> SchemaId;
164
165    /// Gets the ID of the `mz_unsafe` schema.
166    fn get_mz_unsafe_schema_id(&self) -> SchemaId;
167
168    /// Returns true if `schema` is an internal system schema, false otherwise.
169    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;
170
171    /// Resolves the named role.
172    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;
173
174    /// Resolves the named network policy.
175    fn resolve_network_policy(
176        &self,
177        network_policy_name: &str,
178    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;
179
180    /// Gets a role by its ID.
181    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;
182
183    /// Gets a role by its ID.
184    ///
185    /// Panics if `id` does not specify a valid role.
186    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;
187
188    /// Gets all roles.
189    fn get_roles(&self) -> Vec<&dyn CatalogRole>;
190
191    /// Gets the ID of the `mz_system` role.
192    fn mz_system_role_id(&self) -> RoleId;
193
194    /// Collects all role IDs that `id` is transitively a member of.
195    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;
196
197    /// Gets a network policy by its ID.
198    ///
199    /// Panics if `id` does not specify a valid network
200    /// policy.
201    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;
202
203    /// Gets all network policies.
204    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;
205
206    /// Resolves the named cluster.
    ///
207    /// If the provided name is `None`, resolves the currently active cluster.
208    fn resolve_cluster<'a, 'b>(
209        &'a self,
210        cluster_name: Option<&'b str>,
211    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;
212
213    /// Resolves the named cluster replica.
214    fn resolve_cluster_replica<'a, 'b>(
215        &'a self,
216        cluster_replica_name: &'b QualifiedReplica,
217    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;
218
219    /// Resolves a partially-specified item name, that is NOT a function or
220    /// type. (For resolving functions or types, please use
221    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
222    ///
223    /// If the partial name has a database component, it searches only the
224    /// specified database; otherwise, it searches the active database. If the
225    /// partial name has a schema component, it searches only the specified
226    /// schema; otherwise, it searches a default set of schemas within the
227    /// selected database. It returns an error if none of the searched schemas
228    /// contain an item whose name matches the item component of the partial
229    /// name.
230    ///
231    /// Note that it is not an error if the named item appears in more than one
232    /// of the search schemas. The catalog implementation must choose one.
233    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
234
235    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
236    /// functions within the catalog.
237    fn resolve_function(
238        &self,
239        item_name: &PartialItemName,
240    ) -> Result<&dyn CatalogItem, CatalogError>;
241
242    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
243    /// types within the catalog.
244    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
245
246    /// Resolves `name` to a type or item, preferring the type if both exist.
    ///
    /// Any error from [`SessionCatalog::resolve_type`] is discarded; in that
    /// case the result of [`SessionCatalog::resolve_item`] is returned as is.
247    fn resolve_item_or_type(
248        &self,
249        name: &PartialItemName,
250    ) -> Result<&dyn CatalogItem, CatalogError> {
251        if let Ok(ty) = self.resolve_type(name) {
252            return Ok(ty);
253        }
254        self.resolve_item(name)
255    }
256
257    /// Gets a type named `name` from exactly one of the system schemas.
258    ///
259    /// # Panics
260    /// - If `name` is not an entry in any system schema.
261    /// - If more than one system schema has an entry named `name`.
262    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;
263
264    /// Gets an item by its ID.
265    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;
266
267    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
268    /// exist.
269    ///
270    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
271    fn try_get_item_by_global_id<'a>(
272        &'a self,
273        id: &GlobalId,
274    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;
275
276    /// Gets an item by its ID.
277    ///
278    /// Panics if `id` does not specify a valid item.
279    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;
280
281    /// Gets an item by a [`GlobalId`].
282    ///
283    /// Panics if `id` does not specify a valid item.
284    ///
285    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
286    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;
287
288    /// Gets all items.
289    fn get_items(&self) -> Vec<&dyn CatalogItem>;
290
291    /// Looks up an item by its name.
292    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
293
294    /// Looks up a type by its name.
295    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
296
297    /// Gets a cluster by ID.
298    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster<'_>;
299
300    /// Gets all clusters.
301    fn get_clusters(&self) -> Vec<&dyn CatalogCluster<'_>>;
302
303    /// Gets a cluster replica by ID.
304    fn get_cluster_replica(
305        &self,
306        cluster_id: ClusterId,
307        replica_id: ReplicaId,
308    ) -> &dyn CatalogClusterReplica<'_>;
309
310    /// Gets all cluster replicas.
311    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
312
313    /// Gets all system privileges.
314    fn get_system_privileges(&self) -> &PrivilegeMap;
315
316    /// Gets all default privileges.
317    fn get_default_privileges(
318        &self,
319    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;
320
321    /// Finds a name like `name` that is not already in use.
322    ///
323    /// If `name` itself is available, it is returned unchanged.
324    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;
325
326    /// Returns a fully qualified human readable name from fully qualified non-human readable name.
327    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;
328
329    /// Returns a fully qualified human readable schema name from fully qualified non-human
330    /// readable schema name.
331    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;
332
333    /// Returns the [`CatalogItemId`] for a [`GlobalId`].
334    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;
335
336    /// Returns the [`GlobalId`] for the specified Catalog Item, at the specified version.
337    fn resolve_global_id(
338        &self,
339        item_id: &CatalogItemId,
340        version: RelationVersionSelector,
341    ) -> GlobalId;
342
343    /// Returns the configuration of the catalog.
344    fn config(&self) -> &CatalogConfig;
345
346    /// Returns the number of milliseconds since the system epoch. For normal use
347    /// this means the Unix epoch. This can safely be mocked in tests and start
348    /// at 0.
349    fn now(&self) -> EpochMillis;
350
351    /// Returns the set of supported AWS PrivateLink availability zone ids.
352    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;
353
354    /// Returns the system vars.
355    fn system_vars(&self) -> &SystemVars;
356
357    /// Returns the system vars, mutably.
358    ///
359    /// Clients should use this method carefully, as changes to the backing
360    /// state here are not guaranteed to be persisted. The motivating use case
361    /// for this method was ensuring that features are temporarily turned on so
362    /// catalog rehydration does not break due to unsupported SQL syntax.
363    fn system_vars_mut(&mut self) -> &mut SystemVars;
364
365    /// Returns the [`RoleId`] of the owner of an object by its ID.
366    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;
367
368    /// Returns the [`PrivilegeMap`] of the object.
369    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;
370
371    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
372    ///
373    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
374    /// earlier in the list than the roots. This is particularly useful for the order to drop
375    /// objects.
376    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;
377
378    /// Returns all the IDs of all objects that depend on `id`, including `id` itself.
379    ///
380    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
381    /// earlier in the list than `id`. This is particularly useful for the order to drop
382    /// objects.
383    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;
384
385    /// Returns all possible privileges associated with an object type.
386    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;
387
388    /// Returns the object type of `object_id`.
389    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;
390
391    /// Returns the system object type of `id`.
392    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;
393
394    /// Returns the minimal qualification required to unambiguously specify
395    /// `qualified_name`.
396    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;
397
398    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
399    /// successfully executes.
400    fn add_notice(&self, notice: PlanNotice);
401
402    /// Returns the associated comments for the given `id`.
403    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;
404
405    /// Reports whether the specified cluster size is a modern "cc" size rather
406    /// than a legacy T-shirt size.
407    fn is_cluster_size_cc(&self, size: &str) -> bool;
408}
409
410/// Configuration associated with a catalog.
411#[derive(Debug, Clone)]
412pub struct CatalogConfig {
413    /// The time at which the catalog booted.
414    pub start_time: DateTime<Utc>,
415    /// The instant at which the catalog booted.
416    pub start_instant: Instant,
417    /// A random integer associated with this instance of the catalog.
418    ///
419    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
420    /// topics. Perhaps we can remove this when database-issues#977 is complete.
421    pub nonce: u64,
422    /// A persistent ID associated with the environment.
423    pub environment_id: EnvironmentId,
424    /// A transient UUID associated with this process.
425    pub session_id: Uuid,
426    /// Information about this build of Materialize.
427    pub build_info: &'static BuildInfo,
428    /// The default timestamp interval.
429    pub timestamp_interval: Duration,
430    /// A function that returns the wall clock "now" time; can safely be mocked to return
431    /// 0.
432    pub now: NowFn,
433    /// Context for source and sink connections.
434    pub connection_context: ConnectionContext,
435    /// Which system builtins to include. Not allowed to change dynamically.
436    pub builtins_cfg: BuiltinsConfig,
437    /// The Helm chart version, if any.
438    pub helm_chart_version: Option<String>,
439}
440
441/// A database in a [`SessionCatalog`].
442pub trait CatalogDatabase {
443    /// Returns the fully-specified name of the database.
444    fn name(&self) -> &str;
445
446    /// Returns the stable ID of the database.
447    fn id(&self) -> DatabaseId;
448
449    /// Returns whether the database contains any schemas.
450    fn has_schemas(&self) -> bool;
451
452    /// Returns the schemas of the database as a map from schema name to
453    /// schema ID.
454    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;
455
456    /// Returns the schemas of the database.
457    fn schemas(&self) -> Vec<&dyn CatalogSchema>;
458
459    /// Returns the ID of the role that owns the database.
460    fn owner_id(&self) -> RoleId;
461
462    /// Returns the privileges associated with the database.
463    fn privileges(&self) -> &PrivilegeMap;
464}
465
466/// A schema in a [`SessionCatalog`].
467pub trait CatalogSchema {
468    /// Returns the resolved specifier of the schema's database.
469    fn database(&self) -> &ResolvedDatabaseSpecifier;
470
471    /// Returns the fully-specified name of the schema.
472    fn name(&self) -> &QualifiedSchemaName;
473
474    /// Returns the stable ID (schema specifier) of the schema.
475    fn id(&self) -> &SchemaSpecifier;
476
477    /// Returns whether the schema contains any items.
478    fn has_items(&self) -> bool;
479
480    /// Returns the IDs of the items in the schema.
481    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;
482
483    /// Returns the ID of the role that owns the schema.
484    fn owner_id(&self) -> RoleId;
485
486    /// Returns the privileges associated with the schema.
487    fn privileges(&self) -> &PrivilegeMap;
488}
489
490/// Parameters used to set a role's password.
491#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
492pub struct PasswordConfig {
493    /// The password.
494    pub password: Password,
495    /// A non-default iteration count for hashing the password.
496    pub scram_iterations: NonZeroU32,
497}
498
499/// A modification of a role's password in the catalog.
500#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
501pub enum PasswordAction {
502    /// Set a new password, using the given [`PasswordConfig`].
503    Set(PasswordConfig),
504    /// Remove the existing password.
505    Clear,
506    /// Leave the existing password unchanged.
507    NoChange,
508}
509
510/// A raw representation of attributes belonging to a [`CatalogRole`] that we might
511/// get as input from the user. This includes the password.
512/// This struct explicitly does not implement `Serialize` or `Deserialize` to avoid
513/// accidentally serializing passwords.
514#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
515pub struct RoleAttributesRaw {
516    /// Indicates whether the role has inheritance of privileges.
517    pub inherit: bool,
518    /// The raw password of the role. This is for self-managed auth, not cloud.
519    pub password: Option<Password>,
520    /// Hash iterations used to securely store passwords. This is for self-managed auth.
521    pub scram_iterations: Option<NonZeroU32>,
522    /// Whether or not this user is a superuser.
523    pub superuser: Option<bool>,
524    /// Whether this role has the login attribute.
525    pub login: Option<bool>,
526    // Private unit field: forces use of the constructor outside this module.
527    _private: (),
528}
529
530/// Attributes belonging to a [`CatalogRole`].
531#[derive(Debug, Clone, Eq, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Arbitrary)]
532pub struct RoleAttributes {
533    /// Indicates whether the role has inheritance of privileges.
534    pub inherit: bool,
535    /// Whether or not this user is a superuser.
536    pub superuser: Option<bool>,
537    /// Whether this role has the login attribute.
538    pub login: Option<bool>,
539    // Private unit field: forces use of the constructor outside this module.
540    _private: (),
541}
542
543impl RoleAttributesRaw {
544    /// Creates a new [`RoleAttributesRaw`] with default attributes.
    ///
    /// The defaults are `inherit: true` and `None` for every optional
    /// attribute (`password`, `scram_iterations`, `superuser`, `login`).
545    pub const fn new() -> RoleAttributesRaw {
546        RoleAttributesRaw {
547            inherit: true,
548            password: None,
549            scram_iterations: None,
550            superuser: None,
551            login: None,
552            _private: (),
553        }
554    }
555
556    /// Adds all attributes excluding password.
    ///
    /// Sets `inherit` to `true` and both `superuser` and `login` to
    /// `Some(true)`; `password` and `scram_iterations` are left untouched.
557    pub const fn with_all(mut self) -> RoleAttributesRaw {
558        self.inherit = true;
559        self.superuser = Some(true);
560        self.login = Some(true);
561        self
562    }
563}
564
565impl RoleAttributes {
566    /// Creates a new [`RoleAttributes`] with default attributes.
    ///
    /// The defaults are `inherit: true`, `superuser: None`, and `login: None`.
567    pub const fn new() -> RoleAttributes {
568        RoleAttributes {
569            inherit: true,
570            superuser: None,
571            login: None,
572            _private: (),
573        }
574    }
575
576    /// Adds all attributes: sets `inherit` to `true` and both `superuser` and `login` to `Some(true)`.
577    pub const fn with_all(mut self) -> RoleAttributes {
578        self.inherit = true;
579        self.superuser = Some(true);
580        self.login = Some(true);
581        self
582    }
583
584    /// Returns whether or not the role has inheritance of privileges.
585    pub const fn is_inherit(&self) -> bool {
586        self.inherit
587    }
588}
589
// Converts raw role attributes into their serializable form. Only `inherit`,
// `superuser`, and `login` are carried over; the remaining raw fields
// (including the password and SCRAM iteration count) are dropped.
590impl From<RoleAttributesRaw> for RoleAttributes {
591    fn from(
592        RoleAttributesRaw {
593            inherit,
594            superuser,
595            login,
596            ..
597        }: RoleAttributesRaw,
598    ) -> RoleAttributes {
599        RoleAttributes {
600            inherit,
601            superuser,
602            login,
603            _private: (),
604        }
605    }
606}
607
// Converts role attributes back into the raw form. `RoleAttributes` carries
// no password information, so `password` and `scram_iterations` are `None`.
608impl From<RoleAttributes> for RoleAttributesRaw {
609    fn from(
610        RoleAttributes {
611            inherit,
612            superuser,
613            login,
614            ..
615        }: RoleAttributes,
616    ) -> RoleAttributesRaw {
617        RoleAttributesRaw {
618            inherit,
619            password: None,
620            scram_iterations: None,
621            superuser,
622            login,
623            _private: (),
624        }
625    }
626}
627
// Converts planned role attributes into raw role attributes. An unspecified
// `inherit` falls back to the default from `RoleAttributesRaw::new`; the
// other named fields are copied through unchanged.
628impl From<PlannedRoleAttributes> for RoleAttributesRaw {
629    fn from(
630        PlannedRoleAttributes {
631            inherit,
632            password,
633            scram_iterations,
634            superuser,
635            login,
636            ..
637        }: PlannedRoleAttributes,
638    ) -> RoleAttributesRaw {
        // Only consulted for the default value of `inherit`.
639        let default_attributes = RoleAttributesRaw::new();
640        RoleAttributesRaw {
641            inherit: inherit.unwrap_or(default_attributes.inherit),
642            password,
643            scram_iterations,
644            superuser,
645            login,
646            _private: (),
647        }
648    }
649}
650
651/// Default variable values for a [`CatalogRole`].
652#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
653pub struct RoleVars {
654    /// Map from variable name to the variable's value.
655    pub map: BTreeMap<String, OwnedVarInput>,
656}
657
658/// A role in a [`SessionCatalog`].
659pub trait CatalogRole {
660    /// Returns the fully-specified name of the role.
661    fn name(&self) -> &str;
662
663    /// Returns the stable ID of the role.
664    fn id(&self) -> RoleId;
665
666    /// Returns all role IDs that this role is an immediate member of, and the grantor of that
667    /// membership.
668    ///
669    /// Key is the role that some role is a member of, value is the grantor role ID.
670    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;
671
672    /// Returns the attributes associated with this role.
673    fn attributes(&self) -> &RoleAttributes;
674
675    /// Returns all variables that this role has a default value stored for.
676    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
677}
678
679/// A network policy in a [`SessionCatalog`].
680pub trait CatalogNetworkPolicy {
681    /// Returns the fully-specified name of the NetworkPolicy.
682    fn name(&self) -> &str;
683
684    /// Returns the stable ID of the NetworkPolicy.
685    fn id(&self) -> NetworkPolicyId;
686
687    /// Returns the ID of the role that owns the NetworkPolicy.
688    fn owner_id(&self) -> RoleId;
689
690    /// Returns the privileges associated with the NetworkPolicy.
691    fn privileges(&self) -> &PrivilegeMap;
692}
693
694/// A cluster in a [`SessionCatalog`].
695pub trait CatalogCluster<'a> {
696    /// Returns the fully-specified name of the cluster.
697    fn name(&self) -> &str;
698
699    /// Returns the stable ID of the cluster.
700    fn id(&self) -> ClusterId;
701
702    /// Returns the IDs of the objects that are bound to this cluster.
703    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;
704
705    /// Returns the replicas of the cluster as a map from replica name to
706    /// replica ID.
707    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;
708
709    /// Returns the replicas of the cluster.
710    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
711
712    /// Returns the replica belonging to the cluster with replica ID `id`.
713    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_>;
714
715    /// Returns the ID of the role that owns the cluster.
716    fn owner_id(&self) -> RoleId;
717
718    /// Returns the privileges associated with the cluster.
719    fn privileges(&self) -> &PrivilegeMap;
720
721    /// Returns true if this cluster is a managed cluster.
722    fn is_managed(&self) -> bool;
723
724    /// Returns the size of the cluster, if the cluster is a managed cluster.
725    fn managed_size(&self) -> Option<&str>;
726
727    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
728    fn schedule(&self) -> Option<&ClusterSchedule>;
729
730    /// Tries to convert this cluster into a [`CreateClusterPlan`].
731    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
732    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
733}
734
735/// A cluster replica in a [`SessionCatalog`].
736pub trait CatalogClusterReplica<'a>: Debug {
737    /// Returns the name of the cluster replica.
738    fn name(&self) -> &str;
739
740    /// Returns the stable ID of the cluster that the replica belongs to.
741    fn cluster_id(&self) -> ClusterId;
742
743    /// Returns the stable ID of the replica.
744    fn replica_id(&self) -> ReplicaId;
745
746    /// Returns the ID of the role that owns the replica.
747    fn owner_id(&self) -> RoleId;
748
749    /// Returns whether or not the replica is internal.
750    fn internal(&self) -> bool;
751}
752
753/// An item in a [`SessionCatalog`].
754///
755/// Note that "item" has a very specific meaning in the context of a SQL
756/// catalog, and refers to the various entities that belong to a schema.
757pub trait CatalogItem {
758    /// Returns the fully qualified name of the catalog item.
759    fn name(&self) -> &QualifiedItemName;
760
761    /// Returns the [`CatalogItemId`] for the item.
762    fn id(&self) -> CatalogItemId;
763
764    /// Returns the [`GlobalId`]s associated with this item.
765    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;
766
767    /// Returns the catalog item's OID.
768    fn oid(&self) -> u32;
769
770    /// Returns the resolved function.
771    ///
772    /// If the catalog item is not of a type that produces functions (i.e.,
773    /// anything other than a function), it returns an error.
774    fn func(&self) -> Result<&'static Func, CatalogError>;
775
776    /// Returns the resolved source connection.
777    ///
778    /// If the catalog item is not of a type that contains a `SourceDesc`
779    /// (i.e., anything other than sources), it returns an error.
    ///
    /// NOTE(review): when `Ok(None)` is returned is not specified here —
    /// confirm against the implementations.
780    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;
781
782    /// Returns the resolved connection.
783    ///
784    /// If the catalog item is not a connection, it returns an error.
785    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;
786
787    /// Returns the type of the catalog item.
788    fn item_type(&self) -> CatalogItemType;
789
790    /// Returns a normalized SQL statement that describes how to create the
791    /// catalog item.
792    fn create_sql(&self) -> &str;
793
794    /// Returns the IDs of the catalog items that this catalog item
795    /// directly references.
796    fn references(&self) -> &ResolvedIds;
797
798    /// Returns the IDs of the catalog items upon which this catalog item
799    /// depends.
800    fn uses(&self) -> BTreeSet<CatalogItemId>;
801
802    /// Returns the IDs of the catalog items that directly reference this catalog item.
803    fn referenced_by(&self) -> &[CatalogItemId];
804
805    /// Returns the IDs of the catalog items that depend upon this catalog item.
806    fn used_by(&self) -> &[CatalogItemId];
807
808    /// Reports whether this catalog entry is a subsource and, if it is, the
809    /// ingestion it is an export of, as well as the item it exports.
810    fn subsource_details(
811        &self,
812    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;
813
814    /// Reports whether this catalog entry is a source export and, if it is, the
815    /// ingestion it is an export of, as well as the item it exports.
816    fn source_export_details(
817        &self,
818    ) -> Option<(
819        CatalogItemId,
820        &UnresolvedItemName,
821        &SourceExportDetails,
822        &SourceExportDataConfig<ReferencedConnection>,
823    )>;
824
825    /// Reports whether this catalog item is a progress source.
826    fn is_progress_source(&self) -> bool;
827
828    /// If this catalog item is a source, returns the ID of its progress collection.
829    fn progress_id(&self) -> Option<CatalogItemId>;
830
831    /// Returns the index details associated with the catalog item, if the
832    /// catalog item is an index.
833    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;
834
835    /// Returns the column defaults associated with the catalog item, if the
836    /// catalog item is a table that accepts writes.
837    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;
838
839    /// Returns the item this catalog item replaces, if any.
840    fn replacement_target(&self) -> Option<CatalogItemId>;
841
842    /// Returns the type information associated with the catalog item, if the
843    /// catalog item is a type.
844    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;
845
846    /// Returns the ID of the role that owns the item.
847    fn owner_id(&self) -> RoleId;
848
849    /// Returns the privileges associated with the item.
850    fn privileges(&self) -> &PrivilegeMap;
851
852    /// Returns the ID of the cluster the item belongs to, if any.
853    fn cluster_id(&self) -> Option<ClusterId>;
854
855    /// Returns the [`CatalogCollectionItem`] for a specific version of this
856    /// [`CatalogItem`].
857    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;
858
859    /// Returns the latest version of this item, if it's version-able.
860    fn latest_version(&self) -> Option<RelationVersion>;
861}
862
/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
/// refers to.
///
/// This trait extends [`CatalogItem`] with the accessors that are tied to one
/// particular collection. Values of this trait are handed out as
/// `Box<dyn CatalogCollectionItem>`, hence the `Send + Sync` bounds.
pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
    /// Returns a description of the result set produced by the catalog item.
    ///
    /// If the catalog item is not of a type that produces data (i.e., a sink or
    /// an index), it returns an error.
    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, CatalogError>;

    /// The [`GlobalId`] for this item.
    fn global_id(&self) -> GlobalId;
}
875
/// The type of a [`CatalogItem`].
///
/// The `Display` implementation renders the lowercase, human-readable name of
/// each variant (e.g. `materialized view`).
#[derive(Debug, Deserialize, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum CatalogItemType {
    /// A table.
    Table,
    /// A source.
    Source,
    /// A sink.
    Sink,
    /// A view.
    View,
    /// A materialized view.
    MaterializedView,
    /// An index.
    Index,
    /// A type.
    Type,
    /// A function.
    Func,
    /// A secret.
    Secret,
    /// A connection.
    Connection,
    /// A continual task.
    ContinualTask,
}
902
903impl CatalogItemType {
904    /// Reports whether the given type of item conflicts with items of type
905    /// `CatalogItemType::Type`.
906    ///
907    /// In PostgreSQL, even though types live in a separate namespace from other
908    /// schema objects, creating a table, view, or materialized view creates a
909    /// type named after that relation. This prevents creating a type with the
910    /// same name as a relational object, even though types and relational
911    /// objects live in separate namespaces. (Indexes are even weirder; while
912    /// they don't get a type with the same name, they get an entry in
913    /// `pg_class` that prevents *record* types of the same name as the index,
914    /// but not other types of types, like enums.)
915    ///
916    /// We don't presently construct types that mirror relational objects,
917    /// though we likely will need to in the future for full PostgreSQL
918    /// compatibility (see database-issues#7142). For now, we use this method to
919    /// prevent creating types and relational objects that have the same name, so
920    /// that it is a backwards compatible change in the future to introduce a
921    /// type named after each relational object in the system.
922    pub fn conflicts_with_type(&self) -> bool {
923        match self {
924            CatalogItemType::Table => true,
925            CatalogItemType::Source => true,
926            CatalogItemType::View => true,
927            CatalogItemType::MaterializedView => true,
928            CatalogItemType::Index => true,
929            CatalogItemType::Type => true,
930            CatalogItemType::Sink => false,
931            CatalogItemType::Func => false,
932            CatalogItemType::Secret => false,
933            CatalogItemType::Connection => false,
934            CatalogItemType::ContinualTask => true,
935        }
936    }
937}
938
939impl fmt::Display for CatalogItemType {
940    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
941        match self {
942            CatalogItemType::Table => f.write_str("table"),
943            CatalogItemType::Source => f.write_str("source"),
944            CatalogItemType::Sink => f.write_str("sink"),
945            CatalogItemType::View => f.write_str("view"),
946            CatalogItemType::MaterializedView => f.write_str("materialized view"),
947            CatalogItemType::Index => f.write_str("index"),
948            CatalogItemType::Type => f.write_str("type"),
949            CatalogItemType::Func => f.write_str("func"),
950            CatalogItemType::Secret => f.write_str("secret"),
951            CatalogItemType::Connection => f.write_str("connection"),
952            CatalogItemType::ContinualTask => f.write_str("continual task"),
953        }
954    }
955}
956
957impl From<CatalogItemType> for ObjectType {
958    fn from(value: CatalogItemType) -> Self {
959        match value {
960            CatalogItemType::Table => ObjectType::Table,
961            CatalogItemType::Source => ObjectType::Source,
962            CatalogItemType::Sink => ObjectType::Sink,
963            CatalogItemType::View => ObjectType::View,
964            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
965            CatalogItemType::Index => ObjectType::Index,
966            CatalogItemType::Type => ObjectType::Type,
967            CatalogItemType::Func => ObjectType::Func,
968            CatalogItemType::Secret => ObjectType::Secret,
969            CatalogItemType::Connection => ObjectType::Connection,
970            CatalogItemType::ContinualTask => ObjectType::ContinualTask,
971        }
972    }
973}
974
975impl From<CatalogItemType> for mz_audit_log::ObjectType {
976    fn from(value: CatalogItemType) -> Self {
977        match value {
978            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
979            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
980            CatalogItemType::View => mz_audit_log::ObjectType::View,
981            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
982            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
983            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
984            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
985            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
986            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
987            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
988            CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
989        }
990    }
991}
992
/// Details about a type in the catalog.
///
/// The parameter `T` selects how other types are referenced from within
/// [`CatalogType`] (see [`TypeReference`]).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypeDetails<T: TypeReference> {
    /// The ID of the type with this type as the array element, if available.
    pub array_id: Option<CatalogItemId>,
    /// The description of this type.
    pub typ: CatalogType<T>,
    /// Additional metadata about the type in PostgreSQL, if relevant.
    pub pg_metadata: Option<CatalogTypePgMetadata>,
}
1003
/// Additional PostgreSQL metadata about a type.
///
/// Both fields are PostgreSQL object identifiers (OIDs) of functions.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypePgMetadata {
    /// The OID of the `typinput` function in PostgreSQL.
    pub typinput_oid: u32,
    /// The OID of the `typreceive` function in PostgreSQL.
    pub typreceive_oid: u32,
}
1012
/// Represents a way of referring to a type in the catalog.
///
/// Implementors are marker types; the associated `Reference` type is what gets
/// embedded in a [`CatalogType`] wherever it points at another type.
pub trait TypeReference {
    /// The actual type used to reference a `CatalogType`.
    type Reference: Clone + Debug + Eq + PartialEq;
}
1018
/// Reference to a type by its name.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NameReference;

// Types are referred to by a `'static` string name.
impl TypeReference for NameReference {
    type Reference = &'static str;
}
1026
/// Reference to a type by its [`CatalogItemId`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IdReference;

// Types are referred to by the ID of their catalog item.
impl TypeReference for IdReference {
    type Reference = CatalogItemId;
}
1034
/// A type stored in the catalog.
///
/// The variants correspond one-to-one with [`mz_repr::SqlScalarType`], but with type
/// modifiers removed and with embedded types replaced with references to other
/// types in the catalog.
///
/// Compound variants (`Array`, `List`, `Map`, `Range`, `Record`) point at their
/// component types through `T::Reference` rather than embedding them.
// Per-variant docs are intentionally omitted; see the type-level docs.
#[allow(missing_docs)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogType<T: TypeReference> {
    AclItem,
    Array {
        element_reference: T::Reference,
    },
    Bool,
    Bytes,
    Char,
    Date,
    Float32,
    Float64,
    Int16,
    Int32,
    Int64,
    UInt16,
    UInt32,
    UInt64,
    MzTimestamp,
    Interval,
    Jsonb,
    List {
        element_reference: T::Reference,
        element_modifiers: Vec<i64>,
    },
    Map {
        key_reference: T::Reference,
        key_modifiers: Vec<i64>,
        value_reference: T::Reference,
        value_modifiers: Vec<i64>,
    },
    Numeric,
    Oid,
    PgLegacyChar,
    PgLegacyName,
    Pseudo,
    Range {
        element_reference: T::Reference,
    },
    Record {
        fields: Vec<CatalogRecordField<T>>,
    },
    RegClass,
    RegProc,
    RegType,
    String,
    Time,
    Timestamp,
    TimestampTz,
    Uuid,
    VarChar,
    Int2Vector,
    MzAclItem,
}
1095
1096impl CatalogType<IdReference> {
1097    /// Returns the relation description for the type, if the type is a record
1098    /// type.
1099    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1100        match &self {
1101            CatalogType::Record { fields } => {
1102                let mut desc = RelationDesc::builder();
1103                for f in fields {
1104                    let name = f.name.clone();
1105                    let ty = query::scalar_type_from_catalog(
1106                        catalog,
1107                        f.type_reference,
1108                        &f.type_modifiers,
1109                    )?;
1110                    // TODO: support plumbing `NOT NULL` constraints through
1111                    // `CREATE TYPE`.
1112                    let ty = ty.nullable(true);
1113                    desc = desc.with_column(name, ty);
1114                }
1115                Ok(Some(desc.finish()))
1116            }
1117            _ => Ok(None),
1118        }
1119    }
1120}
1121
/// A description of a field in a [`CatalogType::Record`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogRecordField<T: TypeReference> {
    /// The name of the field.
    pub name: ColumnName,
    /// A reference to the type of the field, in the form chosen by `T`.
    pub type_reference: T::Reference,
    /// Modifiers to apply to the type.
    pub type_modifiers: Vec<i64>,
}
1132
#[derive(Clone, Debug, Eq, PartialEq)]
/// Mirrored from [PostgreSQL's `typcategory`][typcategory].
///
/// Note that Materialize also uses a number of pseudotypes when planning, but
/// we have yet to need to integrate them with `TypeCategory`.
///
/// [typcategory]:
/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
pub enum TypeCategory {
    /// Array type.
    Array,
    /// Bit string type.
    BitString,
    /// Boolean type.
    Boolean,
    /// Composite type.
    Composite,
    /// Date/time type.
    DateTime,
    /// Enum type.
    Enum,
    /// Geometric type.
    Geometric,
    /// List type. Materialize specific.
    List,
    /// Network address type.
    NetworkAddress,
    /// Numeric type.
    Numeric,
    /// Pseudo type.
    Pseudo,
    /// Range type.
    Range,
    /// String type.
    String,
    /// Timespan type.
    Timespan,
    /// User-defined type.
    UserDefined,
    /// Unknown type.
    Unknown,
}
1175
1176impl fmt::Display for TypeCategory {
1177    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1178        f.write_str(match self {
1179            TypeCategory::Array => "array",
1180            TypeCategory::BitString => "bit-string",
1181            TypeCategory::Boolean => "boolean",
1182            TypeCategory::Composite => "composite",
1183            TypeCategory::DateTime => "date-time",
1184            TypeCategory::Enum => "enum",
1185            TypeCategory::Geometric => "geometric",
1186            TypeCategory::List => "list",
1187            TypeCategory::NetworkAddress => "network-address",
1188            TypeCategory::Numeric => "numeric",
1189            TypeCategory::Pseudo => "pseudo",
1190            TypeCategory::Range => "range",
1191            TypeCategory::String => "string",
1192            TypeCategory::Timespan => "timespan",
1193            TypeCategory::UserDefined => "user-defined",
1194            TypeCategory::Unknown => "unknown",
1195        })
1196    }
1197}
1198
/// Identifies an environment.
///
/// Outside of tests, an environment ID can be constructed only from a string of
/// the following form:
///
/// ```text
/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
/// ```
///
/// The fields have the following formats:
///
/// * The cloud provider consists of one or more alphanumeric characters.
/// * The cloud provider region consists of one or more alphanumeric or hyphen
///   characters.
/// * The organization ID is a UUID in its canonical text format.
/// * The ordinal is a decimal number with between one and eight digits.
///
/// There is no way to construct an environment ID from parts, to ensure that
/// the `Display` representation is parseable according to the above rules.
// NOTE(benesch): ideally we'd have accepted the components of the environment
// ID using separate command-line arguments, or at least a string format that
// used a field separator that did not appear in the fields. Alas. We can't
// easily change it now, as it's used as the e.g. default sink progress topic.
#[derive(Debug, Clone, PartialEq)]
pub struct EnvironmentId {
    /// The cloud provider component of the ID.
    cloud_provider: CloudProvider,
    /// The cloud provider region component of the ID.
    cloud_provider_region: String,
    /// The organization ID component of the ID.
    organization_id: Uuid,
    /// The ordinal component of the ID.
    ordinal: u64,
}
1229
1230impl EnvironmentId {
1231    /// Creates a dummy `EnvironmentId` for use in tests.
1232    pub fn for_tests() -> EnvironmentId {
1233        EnvironmentId {
1234            cloud_provider: CloudProvider::Local,
1235            cloud_provider_region: "az1".into(),
1236            organization_id: Uuid::new_v4(),
1237            ordinal: 0,
1238        }
1239    }
1240
1241    /// Returns the cloud provider associated with this environment ID.
1242    pub fn cloud_provider(&self) -> &CloudProvider {
1243        &self.cloud_provider
1244    }
1245
1246    /// Returns the cloud provider region associated with this environment ID.
1247    pub fn cloud_provider_region(&self) -> &str {
1248        &self.cloud_provider_region
1249    }
1250
1251    /// Returns the name of the region associted with this environment ID.
1252    ///
1253    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1254    /// [`EnvironmentId::cloud_provider_region`].
1255    pub fn region(&self) -> String {
1256        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1257    }
1258
1259    /// Returns the organization ID associated with this environment ID.
1260    pub fn organization_id(&self) -> Uuid {
1261        self.organization_id
1262    }
1263
1264    /// Returns the ordinal associated with this environment ID.
1265    pub fn ordinal(&self) -> u64 {
1266        self.ordinal
1267    }
1268}
1269
1270// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1271// populated using this key. Consequently, any changes to that trait
1272// implementation will also have to be reflected in the existing feature
1273// targeting config in LaunchDarkly, otherwise environments might receive
1274// different configs upon restart.
1275impl fmt::Display for EnvironmentId {
1276    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1277        write!(
1278            f,
1279            "{}-{}-{}-{}",
1280            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1281        )
1282    }
1283}
1284
1285impl FromStr for EnvironmentId {
1286    type Err = InvalidEnvironmentIdError;
1287
1288    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1289        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1290            Regex::new(
1291                "^(?P<cloud_provider>[[:alnum:]]+)-\
1292                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1293                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1294                  (?P<ordinal>\\d{1,8})$"
1295            ).unwrap()
1296        });
1297        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1298        Ok(EnvironmentId {
1299            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1300            cloud_provider_region: captures["cloud_provider_region"].into(),
1301            organization_id: captures["organization_id"]
1302                .parse()
1303                .map_err(|_| InvalidEnvironmentIdError)?,
1304            ordinal: captures["ordinal"]
1305                .parse()
1306                .map_err(|_| InvalidEnvironmentIdError)?,
1307        })
1308    }
1309}
1310
/// The error type for [`EnvironmentId::from_str`].
///
/// It carries no detail about which part of the input was rejected.
#[derive(Debug, Clone, PartialEq)]
pub struct InvalidEnvironmentIdError;
1314
1315impl fmt::Display for InvalidEnvironmentIdError {
1316    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1317        f.write_str("invalid environment ID")
1318    }
1319}
1320
// Empty impl: every `Error` method keeps its default.
impl Error for InvalidEnvironmentIdError {}
1322
1323impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1324    fn from(_: InvalidCloudProviderError) -> Self {
1325        InvalidEnvironmentIdError
1326    }
1327}
1328
/// An error returned by the catalog.
///
/// Most variants carry the name (or ID) of the object the error is about; the
/// `Display` implementation turns each variant into a user-facing message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogError {
    /// Unknown database.
    UnknownDatabase(String),
    /// Database already exists.
    DatabaseAlreadyExists(String),
    /// Unknown schema.
    UnknownSchema(String),
    /// Schema already exists.
    SchemaAlreadyExists(String),
    /// Unknown role.
    UnknownRole(String),
    /// Role already exists.
    RoleAlreadyExists(String),
    /// Network policy already exists.
    NetworkPolicyAlreadyExists(String),
    /// Unknown cluster.
    UnknownCluster(String),
    /// Unexpected builtin cluster.
    UnexpectedBuiltinCluster(String),
    /// Unexpected builtin cluster type.
    UnexpectedBuiltinClusterType(String),
    /// Cluster already exists.
    ClusterAlreadyExists(String),
    /// Unknown cluster replica.
    UnknownClusterReplica(String),
    /// Unknown cluster replica size.
    UnknownClusterReplicaSize(String),
    /// Duplicate replica. The fields are the replica name and the cluster
    /// name, in that order.
    DuplicateReplica(String, String),
    /// Unknown item.
    UnknownItem(String),
    /// Item already exists.
    ItemAlreadyExists(CatalogItemId, String),
    /// Unknown function.
    UnknownFunction {
        /// The identifier of the function we couldn't find.
        name: String,
        /// A suggested alternative to the named function.
        alternative: Option<String>,
    },
    /// Unknown type.
    UnknownType {
        /// The identifier of the type we couldn't find.
        name: String,
    },
    /// Unknown connection.
    UnknownConnection(String),
    /// Unknown network policy.
    UnknownNetworkPolicy(String),
    /// Expected the catalog item to have the given type, but it did not.
    UnexpectedType {
        /// The item's name.
        name: String,
        /// The actual type of the item.
        actual_type: CatalogItemType,
        /// The expected type of the item.
        expected_type: CatalogItemType,
    },
    /// Invalid attempt to depend on a non-dependable item.
    InvalidDependency {
        /// The invalid item's name.
        name: String,
        /// The invalid item's type.
        typ: CatalogItemType,
    },
    /// Ran out of unique IDs.
    IdExhaustion,
    /// Ran out of unique OIDs.
    OidExhaustion,
    /// Timeline already exists.
    TimelineAlreadyExists(String),
    /// ID allocator already exists.
    IdAllocatorAlreadyExists(String),
    /// Config already exists.
    ConfigAlreadyExists(String),
    /// Builtin migrations failed.
    FailedBuiltinSchemaMigration(String),
    /// StorageCollectionMetadata already exists.
    StorageCollectionMetadataAlreadyExists(GlobalId),
}
1411
1412impl fmt::Display for CatalogError {
1413    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1414        match self {
1415            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1416            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1417            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1418            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1419            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1420            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1421            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1422            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1423            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1424            Self::NetworkPolicyAlreadyExists(name) => {
1425                write!(f, "network policy '{name}' already exists")
1426            }
1427            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1428            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1429            Self::UnexpectedBuiltinCluster(name) => {
1430                write!(f, "Unexpected builtin cluster '{}'", name)
1431            }
1432            Self::UnexpectedBuiltinClusterType(name) => {
1433                write!(f, "Unexpected builtin cluster type'{}'", name)
1434            }
1435            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1436            Self::UnknownClusterReplica(name) => {
1437                write!(f, "unknown cluster replica '{}'", name)
1438            }
1439            Self::UnknownClusterReplicaSize(name) => {
1440                write!(f, "unknown cluster replica size '{}'", name)
1441            }
1442            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1443                f,
1444                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1445            ),
1446            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1447            Self::ItemAlreadyExists(_gid, name) => {
1448                write!(f, "catalog item '{name}' already exists")
1449            }
1450            Self::UnexpectedType {
1451                name,
1452                actual_type,
1453                expected_type,
1454            } => {
1455                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1456            }
1457            Self::InvalidDependency { name, typ } => write!(
1458                f,
1459                "catalog item '{}' is {} {} and so cannot be depended upon",
1460                name,
1461                if matches!(typ, CatalogItemType::Index) {
1462                    "an"
1463                } else {
1464                    "a"
1465                },
1466                typ,
1467            ),
1468            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1469            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1470            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1471            Self::IdAllocatorAlreadyExists(name) => {
1472                write!(f, "ID allocator '{name}' already exists")
1473            }
1474            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1475            Self::FailedBuiltinSchemaMigration(objects) => {
1476                write!(f, "failed to migrate schema of builtin objects: {objects}")
1477            }
1478            Self::StorageCollectionMetadataAlreadyExists(key) => {
1479                write!(f, "storage metadata for '{key}' already exists")
1480            }
1481        }
1482    }
1483}
1484
1485impl CatalogError {
1486    /// Returns any applicable hints for [`CatalogError`].
1487    pub fn hint(&self) -> Option<String> {
1488        match self {
1489            CatalogError::UnknownFunction { alternative, .. } => {
1490                match alternative {
1491                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1492                    Some(alt) => Some(format!("Try using {alt}")),
1493                }
1494            }
1495            _ => None,
1496        }
1497    }
1498}
1499
// Empty impl: every `Error` method keeps its default.
impl Error for CatalogError {}
1501
// Enum variant docs would be useless here: each variant is named after the
// kind of object it stands for.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects stored in the catalog.
///
/// The `Display` implementation renders each variant as an uppercase keyword
/// (e.g. `MATERIALIZED VIEW`).
pub enum ObjectType {
    Table,
    View,
    MaterializedView,
    Source,
    Sink,
    Index,
    Type,
    Role,
    Cluster,
    ClusterReplica,
    Secret,
    Connection,
    Database,
    Schema,
    Func,
    ContinualTask,
    NetworkPolicy,
}
1525
1526impl ObjectType {
1527    /// Reports if the object type can be treated as a relation.
1528    pub fn is_relation(&self) -> bool {
1529        match self {
1530            ObjectType::Table
1531            | ObjectType::View
1532            | ObjectType::MaterializedView
1533            | ObjectType::Source
1534            | ObjectType::ContinualTask => true,
1535            ObjectType::Sink
1536            | ObjectType::Index
1537            | ObjectType::Type
1538            | ObjectType::Secret
1539            | ObjectType::Connection
1540            | ObjectType::Func
1541            | ObjectType::Database
1542            | ObjectType::Schema
1543            | ObjectType::Cluster
1544            | ObjectType::ClusterReplica
1545            | ObjectType::Role
1546            | ObjectType::NetworkPolicy => false,
1547        }
1548    }
1549}
1550
1551impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1552    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1553        match value {
1554            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1555            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1556            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1557            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1558            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1559            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1560            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1561            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1562            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1563            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1564            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1565            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1566            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1567            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1568            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1569            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1570            mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask,
1571            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1572        }
1573    }
1574}
1575
1576impl From<CommentObjectId> for ObjectType {
1577    fn from(value: CommentObjectId) -> ObjectType {
1578        match value {
1579            CommentObjectId::Table(_) => ObjectType::Table,
1580            CommentObjectId::View(_) => ObjectType::View,
1581            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1582            CommentObjectId::Source(_) => ObjectType::Source,
1583            CommentObjectId::Sink(_) => ObjectType::Sink,
1584            CommentObjectId::Index(_) => ObjectType::Index,
1585            CommentObjectId::Func(_) => ObjectType::Func,
1586            CommentObjectId::Connection(_) => ObjectType::Connection,
1587            CommentObjectId::Type(_) => ObjectType::Type,
1588            CommentObjectId::Secret(_) => ObjectType::Secret,
1589            CommentObjectId::Role(_) => ObjectType::Role,
1590            CommentObjectId::Database(_) => ObjectType::Database,
1591            CommentObjectId::Schema(_) => ObjectType::Schema,
1592            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1593            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1594            CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1595            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1596        }
1597    }
1598}
1599
1600impl Display for ObjectType {
1601    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1602        f.write_str(match self {
1603            ObjectType::Table => "TABLE",
1604            ObjectType::View => "VIEW",
1605            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1606            ObjectType::Source => "SOURCE",
1607            ObjectType::Sink => "SINK",
1608            ObjectType::Index => "INDEX",
1609            ObjectType::Type => "TYPE",
1610            ObjectType::Role => "ROLE",
1611            ObjectType::Cluster => "CLUSTER",
1612            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1613            ObjectType::Secret => "SECRET",
1614            ObjectType::Connection => "CONNECTION",
1615            ObjectType::Database => "DATABASE",
1616            ObjectType::Schema => "SCHEMA",
1617            ObjectType::Func => "FUNCTION",
1618            ObjectType::ContinualTask => "CONTINUAL TASK",
1619            ObjectType::NetworkPolicy => "NETWORK POLICY",
1620        })
1621    }
1622}
1623
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects in the system.
///
/// This is either the type of a specific kind of catalog object, or a marker for the
/// system as a whole.
pub enum SystemObjectType {
    /// Catalog object type.
    Object(ObjectType),
    /// Entire system.
    System,
}
1632
1633impl SystemObjectType {
1634    /// Reports if the object type can be treated as a relation.
1635    pub fn is_relation(&self) -> bool {
1636        match self {
1637            SystemObjectType::Object(object_type) => object_type.is_relation(),
1638            SystemObjectType::System => false,
1639        }
1640    }
1641}
1642
1643impl Display for SystemObjectType {
1644    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1645        match self {
1646            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1647            SystemObjectType::System => f.write_str("SYSTEM"),
1648        }
1649    }
1650}
1651
/// Enum used to format object names in error messages.
///
/// Displays as the object type followed by the quoted object name when one is present
/// or as `SYSTEM` for the system as a whole.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorMessageObjectDescription {
    /// The name of a specific object.
    Object {
        /// Type of object.
        object_type: ObjectType,
        /// Name of object. `None` when only the type of object is known, in which case
        /// only the type is displayed.
        object_name: Option<String>,
    },
    /// The name of the entire system.
    System,
}
1665
1666impl ErrorMessageObjectDescription {
1667    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1668    pub fn from_id(
1669        object_id: &ObjectId,
1670        catalog: &dyn SessionCatalog,
1671    ) -> ErrorMessageObjectDescription {
1672        let object_name = match object_id {
1673            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1674            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1675                .get_cluster_replica(*cluster_id, *replica_id)
1676                .name()
1677                .to_string(),
1678            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1679            ObjectId::Schema((database_spec, schema_spec)) => {
1680                let name = catalog.get_schema(database_spec, schema_spec).name();
1681                catalog.resolve_full_schema_name(name).to_string()
1682            }
1683            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1684            ObjectId::Item(id) => {
1685                let name = catalog.get_item(id).name();
1686                catalog.resolve_full_name(name).to_string()
1687            }
1688            ObjectId::NetworkPolicy(network_policy_id) => catalog
1689                .get_network_policy(network_policy_id)
1690                .name()
1691                .to_string(),
1692        };
1693        ErrorMessageObjectDescription::Object {
1694            object_type: catalog.get_object_type(object_id),
1695            object_name: Some(object_name),
1696        }
1697    }
1698
1699    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1700    pub fn from_sys_id(
1701        object_id: &SystemObjectId,
1702        catalog: &dyn SessionCatalog,
1703    ) -> ErrorMessageObjectDescription {
1704        match object_id {
1705            SystemObjectId::Object(object_id) => {
1706                ErrorMessageObjectDescription::from_id(object_id, catalog)
1707            }
1708            SystemObjectId::System => ErrorMessageObjectDescription::System,
1709        }
1710    }
1711
1712    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1713    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1714        match object_type {
1715            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1716                object_type,
1717                object_name: None,
1718            },
1719            SystemObjectType::System => ErrorMessageObjectDescription::System,
1720        }
1721    }
1722}
1723
1724impl Display for ErrorMessageObjectDescription {
1725    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1726        match self {
1727            ErrorMessageObjectDescription::Object {
1728                object_type,
1729                object_name,
1730            } => {
1731                let object_name = object_name
1732                    .as_ref()
1733                    .map(|object_name| format!(" {}", object_name.quoted()))
1734                    .unwrap_or_else(|| "".to_string());
1735                write!(f, "{object_type}{object_name}")
1736            }
1737            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1738        }
1739    }
1740}
1741
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
// These attributes are needed because the key of a map must be a string. We also
// get the added benefit of flattening this struct in its serialized form: it is
// (de)serialized as a plain `BTreeMap<String, RoleId>` rather than as a struct
// with a `map` field.
#[serde(into = "BTreeMap<String, RoleId>")]
#[serde(try_from = "BTreeMap<String, RoleId>")]
/// Represents the grantee and a grantor of a role membership.
///
/// Holds, for a single role, every role it is a member of together with the role that
/// granted that membership.
pub struct RoleMembership {
    /// Key is the role that some role is a member of, value is the grantor role ID.
    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
    // migration.
    pub map: BTreeMap<RoleId, RoleId>,
}
1756
1757impl RoleMembership {
1758    /// Creates a new [`RoleMembership`].
1759    pub fn new() -> RoleMembership {
1760        RoleMembership {
1761            map: BTreeMap::new(),
1762        }
1763    }
1764}
1765
1766impl From<RoleMembership> for BTreeMap<String, RoleId> {
1767    fn from(value: RoleMembership) -> Self {
1768        value
1769            .map
1770            .into_iter()
1771            .map(|(k, v)| (k.to_string(), v))
1772            .collect()
1773    }
1774}
1775
1776impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1777    type Error = anyhow::Error;
1778
1779    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1780        Ok(RoleMembership {
1781            map: value
1782                .into_iter()
1783                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1784                .collect::<Result<_, anyhow::Error>>()?,
1785        })
1786    }
1787}
1788
/// Specification for objects that will be affected by a default privilege.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeObject {
    /// The role id that created the object.
    pub role_id: RoleId,
    /// The database that the object is created in if Some, otherwise all databases.
    pub database_id: Option<DatabaseId>,
    /// The schema that the object is created in if Some, otherwise all schemas.
    pub schema_id: Option<SchemaId>,
    /// The type of object.
    pub object_type: ObjectType,
}
1801
1802impl DefaultPrivilegeObject {
1803    /// Creates a new [`DefaultPrivilegeObject`].
1804    pub fn new(
1805        role_id: RoleId,
1806        database_id: Option<DatabaseId>,
1807        schema_id: Option<SchemaId>,
1808        object_type: ObjectType,
1809    ) -> DefaultPrivilegeObject {
1810        DefaultPrivilegeObject {
1811            role_id,
1812            database_id,
1813            schema_id,
1814            object_type,
1815        }
1816    }
1817}
1818
1819impl std::fmt::Display for DefaultPrivilegeObject {
1820    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1821        // TODO: Don't just wrap Debug.
1822        write!(f, "{self:?}")
1823    }
1824}
1825
/// Specification for the privileges that will be granted from default privileges.
///
/// Unlike an [`MzAclItem`], this does not record a grantor; one is supplied when converting
/// with [`DefaultPrivilegeAclItem::mz_acl_item`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeAclItem {
    /// The role that will receive the privileges.
    pub grantee: RoleId,
    /// The specific privileges granted.
    pub acl_mode: AclMode,
}
1834
1835impl DefaultPrivilegeAclItem {
1836    /// Creates a new [`DefaultPrivilegeAclItem`].
1837    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1838        DefaultPrivilegeAclItem { grantee, acl_mode }
1839    }
1840
1841    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1842    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1843        MzAclItem {
1844            grantee: self.grantee,
1845            grantor,
1846            acl_mode: self.acl_mode,
1847        }
1848    }
1849}
1850
/// Which builtins to return in `BUILTINS::iter`.
///
/// All calls to `BUILTINS::iter` within the lifetime of an environmentd process
/// must provide an equal `BuiltinsConfig`. It is not allowed to change
/// dynamically.
#[derive(Debug, Clone)]
pub struct BuiltinsConfig {
    /// If true, include system builtin continual tasks; if false, leave them out.
    pub include_continual_tasks: bool,
}
1861
#[cfg(test)]
mod tests {
    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};

    /// Checks that well-formed environment IDs parse into the expected components and print
    /// back to their input, and that malformed IDs are rejected.
    #[mz_ore::test]
    fn test_environment_id() {
        // Every well-formed input below embeds this organization ID.
        const ORGANIZATION_ID: &str = "1497a3b7-a455-4fc4-8752-b44a94b5f90a";

        // (input, expected cloud provider, expected region, expected ordinal)
        let well_formed = [
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                CloudProvider::Local,
                "az1",
                452,
            ),
            (
                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                CloudProvider::Aws,
                "us-east-1",
                0,
            ),
            (
                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                CloudProvider::Gcp,
                "us-central1",
                0,
            ),
            (
                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                CloudProvider::Azure,
                "australiaeast",
                0,
            ),
            (
                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                CloudProvider::Generic,
                "moon-station-11-darkside",
                0,
            ),
        ];
        for (input, cloud_provider, region, ordinal) in well_formed {
            let expected = Ok(EnvironmentId {
                cloud_provider,
                cloud_provider_region: region.into(),
                organization_id: ORGANIZATION_ID.parse().unwrap(),
                ordinal,
            });
            let actual: Result<EnvironmentId, InvalidEnvironmentIdError> = input.parse();
            assert_eq!(expected, actual, "input = {}", input);
            // A successfully parsed ID must print back to exactly the text it came from.
            if let Ok(parsed) = actual {
                assert_eq!(input, parsed.to_string(), "input = {}", input);
            }
        }

        let malformed = [
            // Empty input.
            "",
            // Nine-digit ordinal (presumably more digits than the parser accepts).
            "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
            // No region between the cloud provider and the organization ID.
            "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
            // Organization ID is missing one of its hyphens.
            "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
        ];
        for input in malformed {
            let actual: Result<EnvironmentId, InvalidEnvironmentIdError> = input.parse();
            assert_eq!(Err(InvalidEnvironmentIdError), actual, "input = {}", input);
        }
    }
}