Skip to main content

mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::num::NonZeroU32;
20use std::str::FromStr;
21use std::sync::LazyLock;
22use std::time::{Duration, Instant};
23
24use chrono::{DateTime, Utc};
25use mz_auth::password::Password;
26use mz_build_info::BuildInfo;
27use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
28use mz_controller_types::{ClusterId, ReplicaId};
29use mz_expr::MirScalarExpr;
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_ore::str::StrExt;
32use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
33use mz_repr::explain::ExprHumanizer;
34use mz_repr::network_policy_id::NetworkPolicyId;
35use mz_repr::role_id::RoleId;
36use mz_repr::{
37    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
38};
39use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
40use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
41use mz_storage_types::connections::{Connection, ConnectionContext};
42use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
43use proptest_derive::Arbitrary;
44use regex::Regex;
45use serde::{Deserialize, Serialize};
46use uuid::Uuid;
47
48use crate::func::Func;
49use crate::names::{
50    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
51    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
52    SchemaSpecifier, SystemObjectId,
53};
54use crate::plan::statement::StatementDesc;
55use crate::plan::statement::ddl::PlannedRoleAttributes;
56use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
57use crate::session::vars::{OwnedVarInput, SystemVars};
58
/// A catalog keeps track of SQL objects and session state available to the
/// planner.
///
/// The `sql` crate is agnostic to any particular catalog implementation. This
/// trait describes the required interface.
///
/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
/// catalog contains databases, databases contain schemas, and schemas contain
/// catalog items, like sources, sinks, views, and indexes.
///
/// There are three classes of operations provided by a catalog:
///
///   * Resolution operations, like [`resolve_item`]. These fill in missing name
///     components based upon connection defaults, e.g., resolving the partial
///     name `view42` to the fully-specified name `materialize.public.view42`.
///
///   * Lookup operations, like [`SessionCatalog::get_item`]. These retrieve
///     metadata about a catalog entity based on a fully-specified name that is
///     known to be valid (i.e., because the name was successfully resolved, or
///     was constructed based on the output of a prior lookup operation). These
///     functions panic if called with invalid input.
///
///   * Session management, such as managing variables' states and adding
///     notices to the session.
///
/// [`get_databases`]: SessionCatalog::get_databases
/// [`get_item`]: SessionCatalog::get_item
/// [`resolve_item`]: SessionCatalog::resolve_item
pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
    /// Returns the id of the role that is issuing the query.
    fn active_role_id(&self) -> &RoleId;

    /// Returns the name of the database to use if one is not explicitly
    /// specified.
    ///
    /// The default implementation looks up the ID returned by
    /// [`SessionCatalog::active_database`] via
    /// [`SessionCatalog::get_database`] and returns that database's name.
    fn active_database_name(&self) -> Option<&str> {
        self.active_database()
            .map(|id| self.get_database(id))
            .map(|db| db.name())
    }

    /// Returns the ID of the database to use if one is not explicitly
    /// specified.
    fn active_database(&self) -> Option<&DatabaseId>;

    /// Returns the cluster to use if one is not explicitly specified.
    fn active_cluster(&self) -> &str;

    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];

    /// Returns the descriptor of the named prepared statement on the session, or
    /// None if the prepared statement does not exist.
    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;

    /// Retrieves a reference to the specified portal's descriptor.
    ///
    /// If there is no such portal, returns `None`.
    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc>;

    /// Resolves the named database.
    ///
    /// If `database_name` exists in the catalog, it returns a reference to the
    /// resolved database; otherwise it returns an error.
    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;

    /// Gets a database by its ID.
    ///
    /// Panics if `id` does not specify a valid database.
    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;

    /// Gets all databases.
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;

    /// Resolves a partially-specified schema name.
    ///
    /// If the schema exists in the catalog, it returns a reference to the
    /// resolved schema; otherwise it returns an error.
    fn resolve_schema(
        &self,
        database_name: Option<&str>,
        schema_name: &str,
    ) -> Result<&dyn CatalogSchema, CatalogError>;

    /// Resolves a schema name within a specified database.
    ///
    /// If the schema exists in the database, it returns a reference to the
    /// resolved schema; otherwise it returns an error.
    fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
    ) -> Result<&dyn CatalogSchema, CatalogError>;

    /// Gets a schema by its database and schema specifiers.
    ///
    /// Panics if the specifiers do not identify a valid schema.
    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema;

    /// Gets all schemas.
    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;

    /// Gets the mz_internal schema id.
    fn get_mz_internal_schema_id(&self) -> SchemaId;

    /// Gets the mz_unsafe schema id.
    fn get_mz_unsafe_schema_id(&self) -> SchemaId;

    /// Returns true if `schema` is an internal system schema, false otherwise.
    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;

    /// Resolves the named role.
    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;

    /// Resolves the named network policy.
    fn resolve_network_policy(
        &self,
        network_policy_name: &str,
    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;

    /// Tries to get a role by its ID.
    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;

    /// Gets a role by its ID.
    ///
    /// Panics if `id` does not specify a valid role.
    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;

    /// Gets all roles.
    fn get_roles(&self) -> Vec<&dyn CatalogRole>;

    /// Gets the id of the `mz_system` role.
    fn mz_system_role_id(&self) -> RoleId;

    /// Collects all role IDs that `id` is transitively a member of.
    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;

    /// Gets a network policy by its ID.
    ///
    /// Panics if `id` does not specify a valid network policy.
    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;

    /// Gets all network policies.
    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;

    /// Resolves the named cluster.
    ///
    /// If the provided name is `None`, resolves the currently active cluster.
    fn resolve_cluster<'a, 'b>(
        &'a self,
        cluster_name: Option<&'b str>,
    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;

    /// Resolves the named cluster replica.
    fn resolve_cluster_replica<'a, 'b>(
        &'a self,
        cluster_replica_name: &'b QualifiedReplica,
    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;

    /// Resolves a partially-specified item name, that is NOT a function or
    /// type. (For resolving functions or types, please use
    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
    ///
    /// If the partial name has a database component, it searches only the
    /// specified database; otherwise, it searches the active database. If the
    /// partial name has a schema component, it searches only the specified
    /// schema; otherwise, it searches a default set of schemas within the
    /// selected database. It returns an error if none of the searched schemas
    /// contain an item whose name matches the item component of the partial
    /// name.
    ///
    /// Note that it is not an error if the named item appears in more than one
    /// of the search schemas. The catalog implementation must choose one.
    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;

    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
    /// functions within the catalog.
    fn resolve_function(
        &self,
        item_name: &PartialItemName,
    ) -> Result<&dyn CatalogItem, CatalogError>;

    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
    /// types within the catalog.
    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;

    /// Resolves `name` to a type or item, preferring the type if both exist.
    ///
    /// Any error from [`SessionCatalog::resolve_type`] is discarded; on
    /// failure the result (including the error) of
    /// [`SessionCatalog::resolve_item`] is returned instead.
    fn resolve_item_or_type(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn CatalogItem, CatalogError> {
        if let Ok(ty) = self.resolve_type(name) {
            return Ok(ty);
        }
        self.resolve_item(name)
    }

    /// Gets a type named `name` from exactly one of the system schemas.
    ///
    /// # Panics
    /// - If `name` is not an entry in any system schema
    /// - If more than one system schema has an entry named `name`.
    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;

    /// Tries to get an item by its ID.
    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;

    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
    /// exist.
    ///
    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
    fn try_get_item_by_global_id<'a>(
        &'a self,
        id: &GlobalId,
    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;

    /// Gets an item by its ID.
    ///
    /// Panics if `id` does not specify a valid item.
    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;

    /// Gets an item by a [`GlobalId`].
    ///
    /// Panics if `id` does not specify a valid item.
    ///
    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;

    /// Gets all items.
    fn get_items(&self) -> Vec<&dyn CatalogItem>;

    /// Looks up an item by its name.
    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;

    /// Looks up a type by its name.
    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;

    /// Gets a cluster by ID.
    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster<'_>;

    /// Gets all clusters.
    fn get_clusters(&self) -> Vec<&dyn CatalogCluster<'_>>;

    /// Gets a cluster replica by ID.
    fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &dyn CatalogClusterReplica<'_>;

    /// Gets all cluster replicas.
    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;

    /// Gets all system privileges.
    fn get_system_privileges(&self) -> &PrivilegeMap;

    /// Gets all default privileges.
    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;

    /// Finds a name like `name` that is not already in use.
    ///
    /// If `name` itself is available, it is returned unchanged.
    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;

    /// Returns a fully qualified human readable name from fully qualified non-human readable name
    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;

    /// Returns a fully qualified human readable schema name from fully qualified non-human
    /// readable schema name
    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;

    /// Returns the [`CatalogItemId`] for a [`GlobalId`].
    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;

    /// Returns the [`GlobalId`] for the specified Catalog Item, at the specified version.
    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId;

    /// Returns the configuration of the catalog.
    fn config(&self) -> &CatalogConfig;

    /// Returns the number of milliseconds since the system epoch. For normal use
    /// this means the Unix epoch. This can safely be mocked in tests and start
    /// at 0.
    fn now(&self) -> EpochMillis;

    /// Returns the set of supported AWS PrivateLink availability zone ids.
    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;

    /// Returns system vars
    fn system_vars(&self) -> &SystemVars;

    /// Returns mutable system vars
    ///
    /// Clients should use this method carefully, as changes to the backing
    /// state here are not guaranteed to be persisted. The motivating use case
    /// for this method was ensuring that features are temporarily turned on so
    /// catalog rehydration does not break due to unsupported SQL syntax.
    fn system_vars_mut(&mut self) -> &mut SystemVars;

    /// Returns the [`RoleId`] of the owner of an object by its ID.
    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;

    /// Returns the [`PrivilegeMap`] of the object.
    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;

    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
    /// earlier in the list than the roots. This is particularly useful for the order to drop
    /// objects.
    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;

    /// Returns all the IDs of all objects that depend on `id`, including `id` itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
    /// earlier in the list than `id`. This is particularly useful for the order to drop
    /// objects.
    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;

    /// Returns all possible privileges associated with an object type.
    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;

    /// Returns the object type of `object_id`.
    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;

    /// Returns the system object type of `id`.
    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;

    /// Returns the minimal qualification required to unambiguously specify
    /// `qualified_name`.
    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;

    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
    /// successfully executes.
    fn add_notice(&self, notice: PlanNotice);

    /// Returns the associated comments for the given `id`
    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;

    /// Reports whether the specified cluster size is a modern "cc" size rather
    /// than a legacy T-shirt size.
    fn is_cluster_size_cc(&self, size: &str) -> bool;
}
409
/// Configuration associated with a catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// The time at which the catalog booted.
    pub start_time: DateTime<Utc>,
    /// The instant at which the catalog booted.
    pub start_instant: Instant,
    /// A random integer associated with this instance of the catalog.
    ///
    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
    /// topics. Perhaps we can remove this when database-issues#977 is complete.
    pub nonce: u64,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// A transient UUID associated with this process.
    pub session_id: Uuid,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// Default timestamp interval.
    pub timestamp_interval: Duration,
    /// Function that returns a wall clock now time; can safely be mocked to return
    /// 0.
    pub now: NowFn,
    /// Context for source and sink connections.
    pub connection_context: ConnectionContext,
    /// Which system builtins to include. Not allowed to change dynamically.
    pub builtins_cfg: BuiltinsConfig,
    /// The Helm chart version, if any.
    pub helm_chart_version: Option<String>,
}
440
/// A database in a [`SessionCatalog`].
pub trait CatalogDatabase {
    /// Returns a fully-specified name of the database.
    fn name(&self) -> &str;

    /// Returns a stable ID for the database.
    fn id(&self) -> DatabaseId;

    /// Returns whether the database contains schemas.
    fn has_schemas(&self) -> bool;

    /// Returns the schemas of the database as a map from schema name to
    /// schema ID.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;

    /// Returns the schemas of the database.
    fn schemas(&self) -> Vec<&dyn CatalogSchema>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the database.
    fn privileges(&self) -> &PrivilegeMap;
}
465
/// A schema in a [`SessionCatalog`].
pub trait CatalogSchema {
    /// Returns the fully-specified database specifier for the schema.
    fn database(&self) -> &ResolvedDatabaseSpecifier;

    /// Returns a fully-specified name of the schema.
    fn name(&self) -> &QualifiedSchemaName;

    /// Returns a stable ID for the schema.
    fn id(&self) -> &SchemaSpecifier;

    /// Returns whether the schema contains any items.
    fn has_items(&self) -> bool;

    /// Returns the IDs of the items in the schema.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the schema.
    fn privileges(&self) -> &PrivilegeMap;
}
489
/// Parameters used to modify a password.
#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
pub struct PasswordConfig {
    /// The password.
    pub password: Password,
    /// A non-default iteration count for hashing the password.
    pub scram_iterations: NonZeroU32,
}
498
/// A modification of a role password in the catalog.
#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
pub enum PasswordAction {
    /// Set a new password, using the supplied [`PasswordConfig`].
    Set(PasswordConfig),
    /// Remove the existing password.
    Clear,
    /// Leave the existing password unchanged.
    NoChange,
}
509
/// A raw representation of attributes belonging to a [`CatalogRole`] that we might
/// get as input from the user. This includes the password.
///
/// This struct explicitly does not implement `Serialize` or `Deserialize` to avoid
/// accidentally serializing passwords.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
pub struct RoleAttributesRaw {
    /// Indicates whether the role has inheritance of privileges.
    pub inherit: bool,
    /// The raw password of the role. This is for self-managed auth, not cloud.
    pub password: Option<Password>,
    /// Hash iterations used to securely store passwords. This is for self-managed auth.
    pub scram_iterations: Option<NonZeroU32>,
    /// Whether or not this user is a superuser.
    pub superuser: Option<bool>,
    /// Whether this role has the login attribute.
    pub login: Option<bool>,
    // Force use of constructor.
    _private: (),
}
529
/// Attributes belonging to a [`CatalogRole`].
///
/// Unlike [`RoleAttributesRaw`], this struct carries no password fields and
/// derives `Serialize` and `Deserialize`.
#[derive(
    Debug,
    Clone,
    Eq,
    Serialize,
    Deserialize,
    PartialEq,
    Ord,
    PartialOrd,
    Arbitrary
)]
pub struct RoleAttributes {
    /// Indicates whether the role has inheritance of privileges.
    pub inherit: bool,
    /// Whether or not this user is a superuser.
    pub superuser: Option<bool>,
    /// Whether this role has the login attribute.
    pub login: Option<bool>,
    // Force use of constructor.
    _private: (),
}
552
553impl RoleAttributesRaw {
554    /// Creates a new [`RoleAttributesRaw`] with default attributes.
555    pub const fn new() -> RoleAttributesRaw {
556        RoleAttributesRaw {
557            inherit: true,
558            password: None,
559            scram_iterations: None,
560            superuser: None,
561            login: None,
562            _private: (),
563        }
564    }
565
566    /// Adds all attributes excluding password.
567    pub const fn with_all(mut self) -> RoleAttributesRaw {
568        self.inherit = true;
569        self.superuser = Some(true);
570        self.login = Some(true);
571        self
572    }
573}
574
575impl RoleAttributes {
576    /// Creates a new [`RoleAttributes`] with default attributes.
577    pub const fn new() -> RoleAttributes {
578        RoleAttributes {
579            inherit: true,
580            superuser: None,
581            login: None,
582            _private: (),
583        }
584    }
585
586    /// Adds all attributes except password.
587    pub const fn with_all(mut self) -> RoleAttributes {
588        self.inherit = true;
589        self.superuser = Some(true);
590        self.login = Some(true);
591        self
592    }
593
594    /// Returns whether or not the role has inheritence of privileges.
595    pub const fn is_inherit(&self) -> bool {
596        self.inherit
597    }
598}
599
600impl From<RoleAttributesRaw> for RoleAttributes {
601    fn from(
602        RoleAttributesRaw {
603            inherit,
604            superuser,
605            login,
606            ..
607        }: RoleAttributesRaw,
608    ) -> RoleAttributes {
609        RoleAttributes {
610            inherit,
611            superuser,
612            login,
613            _private: (),
614        }
615    }
616}
617
618impl From<RoleAttributes> for RoleAttributesRaw {
619    fn from(
620        RoleAttributes {
621            inherit,
622            superuser,
623            login,
624            ..
625        }: RoleAttributes,
626    ) -> RoleAttributesRaw {
627        RoleAttributesRaw {
628            inherit,
629            password: None,
630            scram_iterations: None,
631            superuser,
632            login,
633            _private: (),
634        }
635    }
636}
637
638impl From<PlannedRoleAttributes> for RoleAttributesRaw {
639    fn from(
640        PlannedRoleAttributes {
641            inherit,
642            password,
643            scram_iterations,
644            superuser,
645            login,
646            ..
647        }: PlannedRoleAttributes,
648    ) -> RoleAttributesRaw {
649        let default_attributes = RoleAttributesRaw::new();
650        RoleAttributesRaw {
651            inherit: inherit.unwrap_or(default_attributes.inherit),
652            password,
653            scram_iterations,
654            superuser,
655            login,
656            _private: (),
657        }
658    }
659}
660
/// Default variable values for a [`CatalogRole`].
#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct RoleVars {
    /// Map of variable names to their values.
    pub map: BTreeMap<String, OwnedVarInput>,
}
667
/// A role in a [`SessionCatalog`].
pub trait CatalogRole {
    /// Returns a fully-specified name of the role.
    fn name(&self) -> &str;

    /// Returns a stable ID for the role.
    fn id(&self) -> RoleId;

    /// Returns all role IDs that this role is an immediate member of, and the grantor of that
    /// membership.
    ///
    /// Key is the role that some role is a member of, value is the grantor role ID.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;

    /// Returns the attributes associated with this role.
    fn attributes(&self) -> &RoleAttributes;

    /// Returns all variables that this role has a default value stored for.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
}
688
/// A network policy in a [`SessionCatalog`].
pub trait CatalogNetworkPolicy {
    /// Returns a fully-specified name of the network policy.
    fn name(&self) -> &str;

    /// Returns a stable ID for the network policy.
    fn id(&self) -> NetworkPolicyId;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the network policy.
    fn privileges(&self) -> &PrivilegeMap;
}
703
/// A cluster in a [`SessionCatalog`].
pub trait CatalogCluster<'a> {
    /// Returns a fully-specified name of the cluster.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster.
    fn id(&self) -> ClusterId;

    /// Returns the objects that are bound to this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;

    /// Returns the replicas of the cluster as a map from replica name to
    /// replica ID.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;

    /// Returns the replicas of the cluster.
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;

    /// Returns the replica belonging to the cluster with replica ID `id`.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the cluster.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns true if this cluster is a managed cluster.
    fn is_managed(&self) -> bool;

    /// Returns the size of the cluster, if the cluster is a managed cluster.
    fn managed_size(&self) -> Option<&str>;

    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
    fn schedule(&self) -> Option<&ClusterSchedule>;

    /// Tries to convert this cluster into a [`CreateClusterPlan`].
    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
}
744
/// A cluster replica in a [`SessionCatalog`].
pub trait CatalogClusterReplica<'a>: Debug {
    /// Returns the name of the cluster replica.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster that the replica belongs to.
    fn cluster_id(&self) -> ClusterId;

    /// Returns a stable ID for the replica.
    fn replica_id(&self) -> ReplicaId;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns whether or not the replica is internal.
    fn internal(&self) -> bool;
}
762
/// An item in a [`SessionCatalog`].
///
/// Note that "item" has a very specific meaning in the context of a SQL
/// catalog, and refers to the various entities that belong to a schema.
pub trait CatalogItem {
    /// Returns the fully qualified name of the catalog item.
    fn name(&self) -> &QualifiedItemName;

    /// Returns the [`CatalogItemId`] for the item.
    fn id(&self) -> CatalogItemId;

    /// Returns the [`GlobalId`]s associated with this item.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;

    /// Returns the catalog item's OID.
    fn oid(&self) -> u32;

    /// Returns the resolved function.
    ///
    /// If the catalog item is not of a type that produces functions (i.e.,
    /// anything other than a function), it returns an error.
    fn func(&self) -> Result<&'static Func, CatalogError>;

    /// Returns the resolved source connection.
    ///
    /// If the catalog item is not of a type that contains a `SourceDesc`
    /// (i.e., anything other than sources), it returns an error.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;

    /// Returns the resolved connection.
    ///
    /// If the catalog item is not a connection, it returns an error.
    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;

    /// Returns the type of the catalog item.
    fn item_type(&self) -> CatalogItemType;

    /// A normalized SQL statement that describes how to create the catalog
    /// item.
    fn create_sql(&self) -> &str;

    /// Returns the IDs of the catalog items that this catalog item
    /// directly references.
    fn references(&self) -> &ResolvedIds;

    /// Returns the IDs of the catalog items upon which this catalog item
    /// depends.
    fn uses(&self) -> BTreeSet<CatalogItemId>;

    /// Returns the IDs of the catalog items that directly reference this catalog item.
    fn referenced_by(&self) -> &[CatalogItemId];

    /// Returns the IDs of the catalog items that depend upon this catalog item.
    fn used_by(&self) -> &[CatalogItemId];

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )>;

    /// Reports whether this catalog item is a progress source.
    fn is_progress_source(&self) -> bool;

    /// If this catalog item is a source, it returns the ID of its progress collection.
    fn progress_id(&self) -> Option<CatalogItemId>;

    /// Returns the index details associated with the catalog item, if the
    /// catalog item is an index.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;

    /// Returns the column defaults associated with the catalog item, if the
    /// catalog item is a table that accepts writes.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;

    /// The item this catalog item replaces, if any.
    fn replacement_target(&self) -> Option<CatalogItemId>;

    /// Returns the type information associated with the catalog item, if the
    /// catalog item is a type.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the item.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns the cluster the item belongs to, if any.
    fn cluster_id(&self) -> Option<ClusterId>;

    /// Returns the [`CatalogCollectionItem`] for a specific version of this
    /// [`CatalogItem`].
    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;

    /// The latest version of this item, if it's version-able.
    fn latest_version(&self) -> Option<RelationVersion>;
}
872
/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
/// refers to.
///
/// The supertrait bounds mean every implementor is also a [`CatalogItem`] and
/// is `Send + Sync`.
pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
    /// Returns a description of the result set produced by the catalog item.
    ///
    /// If the catalog item is not of a type that produces data (e.g., a sink or
    /// an index), it returns `None`.
    ///
    /// The description is handed back as a [`Cow`], so an implementor may
    /// return either a borrowed or an owned [`RelationDesc`].
    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>>;

    /// The [`GlobalId`] for this item.
    fn global_id(&self) -> GlobalId;
}
885
/// The type of a [`CatalogItem`].
///
/// The variant order is significant: the derived `Ord`/`PartialOrd`
/// implementations compare variants in declaration order.
#[derive(
    Debug,
    Deserialize,
    Clone,
    Copy,
    Eq,
    Hash,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize
)]
pub enum CatalogItemType {
    /// A table.
    Table,
    /// A source.
    Source,
    /// A sink.
    Sink,
    /// A view.
    View,
    /// A materialized view.
    MaterializedView,
    /// An index.
    Index,
    /// A type.
    Type,
    /// A function.
    Func,
    /// A secret.
    Secret,
    /// A connection.
    Connection,
    /// A continual task.
    ContinualTask,
}
923
924impl CatalogItemType {
925    /// Reports whether the given type of item conflicts with items of type
926    /// `CatalogItemType::Type`.
927    ///
928    /// In PostgreSQL, even though types live in a separate namespace from other
929    /// schema objects, creating a table, view, or materialized view creates a
930    /// type named after that relation. This prevents creating a type with the
931    /// same name as a relational object, even though types and relational
932    /// objects live in separate namespaces. (Indexes are even weirder; while
933    /// they don't get a type with the same name, they get an entry in
934    /// `pg_class` that prevents *record* types of the same name as the index,
935    /// but not other types of types, like enums.)
936    ///
937    /// We don't presently construct types that mirror relational objects,
938    /// though we likely will need to in the future for full PostgreSQL
939    /// compatibility (see database-issues#7142). For now, we use this method to
940    /// prevent creating types and relational objects that have the same name, so
941    /// that it is a backwards compatible change in the future to introduce a
942    /// type named after each relational object in the system.
943    pub fn conflicts_with_type(&self) -> bool {
944        match self {
945            CatalogItemType::Table => true,
946            CatalogItemType::Source => true,
947            CatalogItemType::View => true,
948            CatalogItemType::MaterializedView => true,
949            CatalogItemType::Index => true,
950            CatalogItemType::Type => true,
951            CatalogItemType::Sink => false,
952            CatalogItemType::Func => false,
953            CatalogItemType::Secret => false,
954            CatalogItemType::Connection => false,
955            CatalogItemType::ContinualTask => true,
956        }
957    }
958}
959
960impl fmt::Display for CatalogItemType {
961    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
962        match self {
963            CatalogItemType::Table => f.write_str("table"),
964            CatalogItemType::Source => f.write_str("source"),
965            CatalogItemType::Sink => f.write_str("sink"),
966            CatalogItemType::View => f.write_str("view"),
967            CatalogItemType::MaterializedView => f.write_str("materialized view"),
968            CatalogItemType::Index => f.write_str("index"),
969            CatalogItemType::Type => f.write_str("type"),
970            CatalogItemType::Func => f.write_str("func"),
971            CatalogItemType::Secret => f.write_str("secret"),
972            CatalogItemType::Connection => f.write_str("connection"),
973            CatalogItemType::ContinualTask => f.write_str("continual task"),
974        }
975    }
976}
977
978impl From<CatalogItemType> for ObjectType {
979    fn from(value: CatalogItemType) -> Self {
980        match value {
981            CatalogItemType::Table => ObjectType::Table,
982            CatalogItemType::Source => ObjectType::Source,
983            CatalogItemType::Sink => ObjectType::Sink,
984            CatalogItemType::View => ObjectType::View,
985            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
986            CatalogItemType::Index => ObjectType::Index,
987            CatalogItemType::Type => ObjectType::Type,
988            CatalogItemType::Func => ObjectType::Func,
989            CatalogItemType::Secret => ObjectType::Secret,
990            CatalogItemType::Connection => ObjectType::Connection,
991            CatalogItemType::ContinualTask => ObjectType::ContinualTask,
992        }
993    }
994}
995
996impl From<CatalogItemType> for mz_audit_log::ObjectType {
997    fn from(value: CatalogItemType) -> Self {
998        match value {
999            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
1000            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
1001            CatalogItemType::View => mz_audit_log::ObjectType::View,
1002            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
1003            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
1004            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
1005            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
1006            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
1007            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
1008            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
1009            CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
1010        }
1011    }
1012}
1013
/// Details about a type in the catalog.
///
/// The parameter `T` selects how the contained [`CatalogType`] refers to other
/// types (see [`TypeReference`]).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypeDetails<T: TypeReference> {
    /// The ID of the type with this type as the array element, if available.
    pub array_id: Option<CatalogItemId>,
    /// The description of this type.
    pub typ: CatalogType<T>,
    /// Additional metadata about the type in PostgreSQL, if relevant.
    pub pg_metadata: Option<CatalogTypePgMetadata>,
}
1024
/// Additional PostgreSQL metadata about a type.
///
/// Both fields are PostgreSQL function OIDs stored as plain `u32` values.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypePgMetadata {
    /// The OID of the `typinput` function in PostgreSQL.
    pub typinput_oid: u32,
    /// The OID of the `typreceive` function in PostgreSQL.
    pub typreceive_oid: u32,
}
1033
/// Represents a way of referring to a type in the catalog.
///
/// Implementors are marker types (see [`NameReference`] and [`IdReference`])
/// that pick the concrete reference representation via
/// [`TypeReference::Reference`].
pub trait TypeReference {
    /// The actual type used to reference a `CatalogType`.
    type Reference: Clone + Debug + Eq + PartialEq;
}
1039
/// Reference to a type by its name.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NameReference;

// Names are represented as `&'static str`.
impl TypeReference for NameReference {
    type Reference = &'static str;
}
1047
/// Reference to a type by its catalog item ID.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IdReference;

// IDs are represented as `CatalogItemId`.
impl TypeReference for IdReference {
    type Reference = CatalogItemId;
}
1055
/// A type stored in the catalog.
///
/// The variants correspond one-to-one with [`mz_repr::SqlScalarType`], but with type
/// modifiers removed and with embedded types replaced with references to other
/// types in the catalog.
///
/// The parameter `T` determines the representation of those references: each
/// `*_reference` field has type `T::Reference` (see [`TypeReference`]).
// NOTE(review): variant docs are suppressed, presumably because the variants
// mirror `SqlScalarType` — confirm.
#[allow(missing_docs)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogType<T: TypeReference> {
    AclItem,
    Array {
        element_reference: T::Reference,
    },
    Bool,
    Bytes,
    Char,
    Date,
    Float32,
    Float64,
    Int16,
    Int32,
    Int64,
    UInt16,
    UInt32,
    UInt64,
    MzTimestamp,
    Interval,
    Jsonb,
    List {
        element_reference: T::Reference,
        element_modifiers: Vec<i64>,
    },
    Map {
        key_reference: T::Reference,
        key_modifiers: Vec<i64>,
        value_reference: T::Reference,
        value_modifiers: Vec<i64>,
    },
    Numeric,
    Oid,
    PgLegacyChar,
    PgLegacyName,
    Pseudo,
    Range {
        element_reference: T::Reference,
    },
    Record {
        fields: Vec<CatalogRecordField<T>>,
    },
    RegClass,
    RegProc,
    RegType,
    String,
    Time,
    Timestamp,
    TimestampTz,
    Uuid,
    VarChar,
    Int2Vector,
    MzAclItem,
}
1116
1117impl CatalogType<IdReference> {
1118    /// Returns the relation description for the type, if the type is a record
1119    /// type.
1120    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1121        match &self {
1122            CatalogType::Record { fields } => {
1123                let mut desc = RelationDesc::builder();
1124                for f in fields {
1125                    let name = f.name.clone();
1126                    let ty = query::scalar_type_from_catalog(
1127                        catalog,
1128                        f.type_reference,
1129                        &f.type_modifiers,
1130                    )?;
1131                    // TODO: support plumbing `NOT NULL` constraints through
1132                    // `CREATE TYPE`.
1133                    let ty = ty.nullable(true);
1134                    desc = desc.with_column(name, ty);
1135                }
1136                Ok(Some(desc.finish()))
1137            }
1138            _ => Ok(None),
1139        }
1140    }
1141}
1142
/// A description of a field in a [`CatalogType::Record`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogRecordField<T: TypeReference> {
    /// The name of the field.
    pub name: ColumnName,
    /// A reference to the type of the field, in the representation chosen by
    /// `T` (see [`TypeReference`]).
    pub type_reference: T::Reference,
    /// Modifiers to apply to the type.
    pub type_modifiers: Vec<i64>,
}
1153
#[derive(Clone, Debug, Eq, PartialEq)]
/// Mirrored from [PostgreSQL's `typcategory`][typcategory].
///
/// Note that Materialize also uses a number of pseudotypes when planning, but
/// we have yet to need to integrate them with `TypeCategory`.
///
/// [typcategory]:
/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
pub enum TypeCategory {
    /// Array type.
    Array,
    /// Bit string type.
    BitString,
    /// Boolean type.
    Boolean,
    /// Composite type.
    Composite,
    /// Date/time type.
    DateTime,
    /// Enum type.
    Enum,
    /// Geometric type.
    Geometric,
    /// List type. Materialize specific.
    List,
    /// Network address type.
    NetworkAddress,
    /// Numeric type.
    Numeric,
    /// Pseudo type.
    Pseudo,
    /// Range type.
    Range,
    /// String type.
    String,
    /// Timespan type.
    Timespan,
    /// User-defined type.
    UserDefined,
    /// Unknown type.
    Unknown,
}
1196
1197impl fmt::Display for TypeCategory {
1198    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1199        f.write_str(match self {
1200            TypeCategory::Array => "array",
1201            TypeCategory::BitString => "bit-string",
1202            TypeCategory::Boolean => "boolean",
1203            TypeCategory::Composite => "composite",
1204            TypeCategory::DateTime => "date-time",
1205            TypeCategory::Enum => "enum",
1206            TypeCategory::Geometric => "geometric",
1207            TypeCategory::List => "list",
1208            TypeCategory::NetworkAddress => "network-address",
1209            TypeCategory::Numeric => "numeric",
1210            TypeCategory::Pseudo => "pseudo",
1211            TypeCategory::Range => "range",
1212            TypeCategory::String => "string",
1213            TypeCategory::Timespan => "timespan",
1214            TypeCategory::UserDefined => "user-defined",
1215            TypeCategory::Unknown => "unknown",
1216        })
1217    }
1218}
1219
/// Identifies an environment.
///
/// Outside of tests, an environment ID can be constructed only from a string of
/// the following form:
///
/// ```text
/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
/// ```
///
/// The fields have the following formats:
///
/// * The cloud provider consists of one or more alphanumeric characters.
/// * The cloud provider region consists of one or more alphanumeric or hyphen
///   characters.
/// * The organization ID is a UUID in its canonical text format.
/// * The ordinal is a decimal number with between one and eight digits.
///
/// There is no way to construct an environment ID from parts, to ensure that
/// the `Display` representation is parseable according to the above rules.
// NOTE(benesch): ideally we'd have accepted the components of the environment
// ID using separate command-line arguments, or at least a string format that
// used a field separator that did not appear in the fields. Alas. We can't
// easily change it now, as it's used as the e.g. default sink progress topic.
#[derive(Debug, Clone, PartialEq)]
pub struct EnvironmentId {
    // The `<CLOUD PROVIDER>` component.
    cloud_provider: CloudProvider,
    // The `<CLOUD PROVIDER REGION>` component.
    cloud_provider_region: String,
    // The `<ORGANIZATION ID>` component.
    organization_id: Uuid,
    // The `<ORDINAL>` component.
    ordinal: u64,
}
1250
1251impl EnvironmentId {
1252    /// Creates a dummy `EnvironmentId` for use in tests.
1253    pub fn for_tests() -> EnvironmentId {
1254        EnvironmentId {
1255            cloud_provider: CloudProvider::Local,
1256            cloud_provider_region: "az1".into(),
1257            organization_id: Uuid::new_v4(),
1258            ordinal: 0,
1259        }
1260    }
1261
1262    /// Returns the cloud provider associated with this environment ID.
1263    pub fn cloud_provider(&self) -> &CloudProvider {
1264        &self.cloud_provider
1265    }
1266
1267    /// Returns the cloud provider region associated with this environment ID.
1268    pub fn cloud_provider_region(&self) -> &str {
1269        &self.cloud_provider_region
1270    }
1271
1272    /// Returns the name of the region associted with this environment ID.
1273    ///
1274    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1275    /// [`EnvironmentId::cloud_provider_region`].
1276    pub fn region(&self) -> String {
1277        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1278    }
1279
1280    /// Returns the organization ID associated with this environment ID.
1281    pub fn organization_id(&self) -> Uuid {
1282        self.organization_id
1283    }
1284
1285    /// Returns the ordinal associated with this environment ID.
1286    pub fn ordinal(&self) -> u64 {
1287        self.ordinal
1288    }
1289}
1290
1291// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1292// populated using this key. Consequently, any changes to that trait
1293// implementation will also have to be reflected in the existing feature
1294// targeting config in LaunchDarkly, otherwise environments might receive
1295// different configs upon restart.
1296impl fmt::Display for EnvironmentId {
1297    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1298        write!(
1299            f,
1300            "{}-{}-{}-{}",
1301            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1302        )
1303    }
1304}
1305
1306impl FromStr for EnvironmentId {
1307    type Err = InvalidEnvironmentIdError;
1308
1309    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1310        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1311            Regex::new(
1312                "^(?P<cloud_provider>[[:alnum:]]+)-\
1313                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1314                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1315                  (?P<ordinal>\\d{1,8})$"
1316            ).unwrap()
1317        });
1318        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1319        Ok(EnvironmentId {
1320            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1321            cloud_provider_region: captures["cloud_provider_region"].into(),
1322            organization_id: captures["organization_id"]
1323                .parse()
1324                .map_err(|_| InvalidEnvironmentIdError)?,
1325            ordinal: captures["ordinal"]
1326                .parse()
1327                .map_err(|_| InvalidEnvironmentIdError)?,
1328        })
1329    }
1330}
1331
/// The error type for [`EnvironmentId::from_str`].
///
/// It carries no detail about which part of the input was rejected.
#[derive(Debug, Clone, PartialEq)]
pub struct InvalidEnvironmentIdError;

impl fmt::Display for InvalidEnvironmentIdError {
    /// Writes a fixed message; formatter width and padding are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "invalid environment ID")
    }
}

// Uses the default `Error` method implementations.
impl Error for InvalidEnvironmentIdError {}
1343
1344impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1345    fn from(_: InvalidCloudProviderError) -> Self {
1346        InvalidEnvironmentIdError
1347    }
1348}
1349
/// An error returned by the catalog.
///
/// Unless noted otherwise, a `String` payload is the name that is
/// interpolated into the error's `Display` message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogError {
    /// Unknown database.
    UnknownDatabase(String),
    /// Database already exists.
    DatabaseAlreadyExists(String),
    /// Unknown schema.
    UnknownSchema(String),
    /// Schema already exists.
    SchemaAlreadyExists(String),
    /// Unknown role.
    UnknownRole(String),
    /// Role already exists.
    RoleAlreadyExists(String),
    /// Network Policy already exists.
    NetworkPolicyAlreadyExists(String),
    /// Unknown cluster.
    UnknownCluster(String),
    /// Unexpected builtin cluster.
    UnexpectedBuiltinCluster(String),
    /// Unexpected builtin cluster type.
    UnexpectedBuiltinClusterType(String),
    /// Cluster already exists.
    ClusterAlreadyExists(String),
    /// Unknown cluster replica.
    UnknownClusterReplica(String),
    /// Unknown cluster replica size.
    UnknownClusterReplicaSize(String),
    /// Duplicate replica. The fields are the replica name and the cluster
    /// name, in that order.
    DuplicateReplica(String, String),
    /// Unknown item.
    UnknownItem(String),
    /// Item already exists.
    ItemAlreadyExists(CatalogItemId, String),
    /// Unknown function.
    UnknownFunction {
        /// The identifier of the function we couldn't find
        name: String,
        /// A suggested alternative to the named function.
        alternative: Option<String>,
    },
    /// Unknown type.
    UnknownType {
        /// The identifier of the type we couldn't find.
        name: String,
    },
    /// Unknown connection.
    UnknownConnection(String),
    /// Unknown network policy.
    UnknownNetworkPolicy(String),
    /// Expected the catalog item to have the given type, but it did not.
    UnexpectedType {
        /// The item's name.
        name: String,
        /// The actual type of the item.
        actual_type: CatalogItemType,
        /// The expected type of the item.
        expected_type: CatalogItemType,
    },
    /// Ran out of unique IDs.
    IdExhaustion,
    /// Ran out of unique OIDs.
    OidExhaustion,
    /// Timeline already exists.
    TimelineAlreadyExists(String),
    /// Id Allocator already exists.
    IdAllocatorAlreadyExists(String),
    /// Config already exists.
    ConfigAlreadyExists(String),
    /// Builtin migrations failed.
    FailedBuiltinSchemaMigration(String),
    /// StorageCollectionMetadata already exists.
    StorageCollectionMetadataAlreadyExists(GlobalId),
}
1425
1426impl fmt::Display for CatalogError {
1427    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1428        match self {
1429            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1430            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1431            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1432            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1433            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1434            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1435            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1436            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1437            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1438            Self::NetworkPolicyAlreadyExists(name) => {
1439                write!(f, "network policy '{name}' already exists")
1440            }
1441            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1442            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1443            Self::UnexpectedBuiltinCluster(name) => {
1444                write!(f, "Unexpected builtin cluster '{}'", name)
1445            }
1446            Self::UnexpectedBuiltinClusterType(name) => {
1447                write!(f, "Unexpected builtin cluster type'{}'", name)
1448            }
1449            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1450            Self::UnknownClusterReplica(name) => {
1451                write!(f, "unknown cluster replica '{}'", name)
1452            }
1453            Self::UnknownClusterReplicaSize(name) => {
1454                write!(f, "unknown cluster replica size '{}'", name)
1455            }
1456            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1457                f,
1458                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1459            ),
1460            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1461            Self::ItemAlreadyExists(_gid, name) => {
1462                write!(f, "catalog item '{name}' already exists")
1463            }
1464            Self::UnexpectedType {
1465                name,
1466                actual_type,
1467                expected_type,
1468            } => {
1469                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1470            }
1471            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1472            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1473            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1474            Self::IdAllocatorAlreadyExists(name) => {
1475                write!(f, "ID allocator '{name}' already exists")
1476            }
1477            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1478            Self::FailedBuiltinSchemaMigration(objects) => {
1479                write!(f, "failed to migrate schema of builtin objects: {objects}")
1480            }
1481            Self::StorageCollectionMetadataAlreadyExists(key) => {
1482                write!(f, "storage metadata for '{key}' already exists")
1483            }
1484        }
1485    }
1486}
1487
1488impl CatalogError {
1489    /// Returns any applicable hints for [`CatalogError`].
1490    pub fn hint(&self) -> Option<String> {
1491        match self {
1492            CatalogError::UnknownFunction { alternative, .. } => {
1493                match alternative {
1494                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1495                    Some(alt) => Some(format!("Try using {alt}")),
1496                }
1497            }
1498            _ => None,
1499        }
1500    }
1501}
1502
// Uses the default `Error` method implementations; the message comes from the
// `Display` impl.
impl Error for CatalogError {}
1504
// Enum variant docs would be useless here.
#[allow(missing_docs)]
#[derive(
    Debug,
    Clone,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
    Copy,
    Deserialize,
    Serialize
)]
/// The types of objects stored in the catalog.
///
/// Besides every [`CatalogItemType`], this also covers objects that are not
/// catalog items: roles, clusters, cluster replicas, databases, schemas, and
/// network policies.
pub enum ObjectType {
    Table,
    View,
    MaterializedView,
    Source,
    Sink,
    Index,
    Type,
    Role,
    Cluster,
    ClusterReplica,
    Secret,
    Connection,
    Database,
    Schema,
    Func,
    ContinualTask,
    NetworkPolicy,
}
1539
1540impl ObjectType {
1541    /// Reports if the object type can be treated as a relation.
1542    pub fn is_relation(&self) -> bool {
1543        match self {
1544            ObjectType::Table
1545            | ObjectType::View
1546            | ObjectType::MaterializedView
1547            | ObjectType::Source
1548            | ObjectType::ContinualTask => true,
1549            ObjectType::Sink
1550            | ObjectType::Index
1551            | ObjectType::Type
1552            | ObjectType::Secret
1553            | ObjectType::Connection
1554            | ObjectType::Func
1555            | ObjectType::Database
1556            | ObjectType::Schema
1557            | ObjectType::Cluster
1558            | ObjectType::ClusterReplica
1559            | ObjectType::Role
1560            | ObjectType::NetworkPolicy => false,
1561        }
1562    }
1563}
1564
1565impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1566    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1567        match value {
1568            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1569            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1570            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1571            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1572            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1573            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1574            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1575            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1576            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1577            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1578            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1579            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1580            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1581            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1582            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1583            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1584            mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask,
1585            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1586        }
1587    }
1588}
1589
1590impl From<CommentObjectId> for ObjectType {
1591    fn from(value: CommentObjectId) -> ObjectType {
1592        match value {
1593            CommentObjectId::Table(_) => ObjectType::Table,
1594            CommentObjectId::View(_) => ObjectType::View,
1595            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1596            CommentObjectId::Source(_) => ObjectType::Source,
1597            CommentObjectId::Sink(_) => ObjectType::Sink,
1598            CommentObjectId::Index(_) => ObjectType::Index,
1599            CommentObjectId::Func(_) => ObjectType::Func,
1600            CommentObjectId::Connection(_) => ObjectType::Connection,
1601            CommentObjectId::Type(_) => ObjectType::Type,
1602            CommentObjectId::Secret(_) => ObjectType::Secret,
1603            CommentObjectId::Role(_) => ObjectType::Role,
1604            CommentObjectId::Database(_) => ObjectType::Database,
1605            CommentObjectId::Schema(_) => ObjectType::Schema,
1606            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1607            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1608            CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1609            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1610        }
1611    }
1612}
1613
1614impl Display for ObjectType {
1615    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1616        f.write_str(match self {
1617            ObjectType::Table => "TABLE",
1618            ObjectType::View => "VIEW",
1619            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1620            ObjectType::Source => "SOURCE",
1621            ObjectType::Sink => "SINK",
1622            ObjectType::Index => "INDEX",
1623            ObjectType::Type => "TYPE",
1624            ObjectType::Role => "ROLE",
1625            ObjectType::Cluster => "CLUSTER",
1626            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1627            ObjectType::Secret => "SECRET",
1628            ObjectType::Connection => "CONNECTION",
1629            ObjectType::Database => "DATABASE",
1630            ObjectType::Schema => "SCHEMA",
1631            ObjectType::Func => "FUNCTION",
1632            ObjectType::ContinualTask => "CONTINUAL TASK",
1633            ObjectType::NetworkPolicy => "NETWORK POLICY",
1634        })
1635    }
1636}
1637
#[derive(
    Debug,
    Clone,
    PartialOrd,
    Ord,
    PartialEq,
    Eq,
    Hash,
    Copy,
    Deserialize,
    Serialize
)]
/// The types of objects in the system.
///
/// Extends [`ObjectType`] with a variant that stands for the system as a whole rather than
/// for any single catalog object.
pub enum SystemObjectType {
    /// Catalog object type.
    Object(ObjectType),
    /// Entire system.
    System,
}
1657
1658impl SystemObjectType {
1659    /// Reports if the object type can be treated as a relation.
1660    pub fn is_relation(&self) -> bool {
1661        match self {
1662            SystemObjectType::Object(object_type) => object_type.is_relation(),
1663            SystemObjectType::System => false,
1664        }
1665    }
1666}
1667
1668impl Display for SystemObjectType {
1669    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1670        match self {
1671            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1672            SystemObjectType::System => f.write_str("SYSTEM"),
1673        }
1674    }
1675}
1676
/// Enum used to format object names in error messages.
///
/// Describes either a catalog object (its type plus, optionally, its name) or the system as
/// a whole.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorMessageObjectDescription {
    /// The name of a specific object.
    Object {
        /// Type of object.
        object_type: ObjectType,
        /// Name of object, or `None` when only the type of the object is known.
        object_name: Option<String>,
    },
    /// The name of the entire system.
    System,
}
1690
1691impl ErrorMessageObjectDescription {
1692    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1693    pub fn from_id(
1694        object_id: &ObjectId,
1695        catalog: &dyn SessionCatalog,
1696    ) -> ErrorMessageObjectDescription {
1697        let object_name = match object_id {
1698            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1699            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1700                .get_cluster_replica(*cluster_id, *replica_id)
1701                .name()
1702                .to_string(),
1703            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1704            ObjectId::Schema((database_spec, schema_spec)) => {
1705                let name = catalog.get_schema(database_spec, schema_spec).name();
1706                catalog.resolve_full_schema_name(name).to_string()
1707            }
1708            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1709            ObjectId::Item(id) => {
1710                let name = catalog.get_item(id).name();
1711                catalog.resolve_full_name(name).to_string()
1712            }
1713            ObjectId::NetworkPolicy(network_policy_id) => catalog
1714                .get_network_policy(network_policy_id)
1715                .name()
1716                .to_string(),
1717        };
1718        ErrorMessageObjectDescription::Object {
1719            object_type: catalog.get_object_type(object_id),
1720            object_name: Some(object_name),
1721        }
1722    }
1723
1724    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1725    pub fn from_sys_id(
1726        object_id: &SystemObjectId,
1727        catalog: &dyn SessionCatalog,
1728    ) -> ErrorMessageObjectDescription {
1729        match object_id {
1730            SystemObjectId::Object(object_id) => {
1731                ErrorMessageObjectDescription::from_id(object_id, catalog)
1732            }
1733            SystemObjectId::System => ErrorMessageObjectDescription::System,
1734        }
1735    }
1736
1737    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1738    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1739        match object_type {
1740            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1741                object_type,
1742                object_name: None,
1743            },
1744            SystemObjectType::System => ErrorMessageObjectDescription::System,
1745        }
1746    }
1747}
1748
1749impl Display for ErrorMessageObjectDescription {
1750    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1751        match self {
1752            ErrorMessageObjectDescription::Object {
1753                object_type,
1754                object_name,
1755            } => {
1756                let object_name = object_name
1757                    .as_ref()
1758                    .map(|object_name| format!(" {}", object_name.quoted()))
1759                    .unwrap_or_else(|| "".to_string());
1760                write!(f, "{object_type}{object_name}")
1761            }
1762            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1763        }
1764    }
1765}
1766
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
// These attributes are needed because the key of a map must be a string. We also
// get the added benefit of flattening this struct in its serialized form.
#[serde(into = "BTreeMap<String, RoleId>")]
#[serde(try_from = "BTreeMap<String, RoleId>")]
/// Represents the grantee and a grantor of a role membership.
///
/// (De)serialization goes through a `BTreeMap<String, RoleId>`, as requested by the serde
/// attributes on this type.
pub struct RoleMembership {
    /// Key is the role that some role is a member of, value is the grantor role ID.
    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
    // migration.
    pub map: BTreeMap<RoleId, RoleId>,
}
1781
1782impl RoleMembership {
1783    /// Creates a new [`RoleMembership`].
1784    pub fn new() -> RoleMembership {
1785        RoleMembership {
1786            map: BTreeMap::new(),
1787        }
1788    }
1789}
1790
1791impl From<RoleMembership> for BTreeMap<String, RoleId> {
1792    fn from(value: RoleMembership) -> Self {
1793        value
1794            .map
1795            .into_iter()
1796            .map(|(k, v)| (k.to_string(), v))
1797            .collect()
1798    }
1799}
1800
1801impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1802    type Error = anyhow::Error;
1803
1804    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1805        Ok(RoleMembership {
1806            map: value
1807                .into_iter()
1808                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1809                .collect::<Result<_, anyhow::Error>>()?,
1810        })
1811    }
1812}
1813
/// Specification for objects that will be affected by a default privilege.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeObject {
    /// The role id that created the object.
    pub role_id: RoleId,
    /// The database that the object is created in if `Some`, otherwise all databases.
    pub database_id: Option<DatabaseId>,
    /// The schema that the object is created in if `Some`, otherwise all schemas.
    pub schema_id: Option<SchemaId>,
    /// The type of object.
    pub object_type: ObjectType,
}
1826
1827impl DefaultPrivilegeObject {
1828    /// Creates a new [`DefaultPrivilegeObject`].
1829    pub fn new(
1830        role_id: RoleId,
1831        database_id: Option<DatabaseId>,
1832        schema_id: Option<SchemaId>,
1833        object_type: ObjectType,
1834    ) -> DefaultPrivilegeObject {
1835        DefaultPrivilegeObject {
1836            role_id,
1837            database_id,
1838            schema_id,
1839            object_type,
1840        }
1841    }
1842}
1843
1844impl std::fmt::Display for DefaultPrivilegeObject {
1845    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1846        // TODO: Don't just wrap Debug.
1847        write!(f, "{self:?}")
1848    }
1849}
1850
/// Specification for the privileges that will be granted from default privileges.
///
/// Holds a grantee and the privileges it receives, but no grantor.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct DefaultPrivilegeAclItem {
    /// The role that will receive the privileges.
    pub grantee: RoleId,
    /// The specific privileges granted.
    pub acl_mode: AclMode,
}
1859
1860impl DefaultPrivilegeAclItem {
1861    /// Creates a new [`DefaultPrivilegeAclItem`].
1862    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1863        DefaultPrivilegeAclItem { grantee, acl_mode }
1864    }
1865
1866    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1867    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1868        MzAclItem {
1869            grantee: self.grantee,
1870            grantor,
1871            acl_mode: self.acl_mode,
1872        }
1873    }
1874}
1875
/// Which builtins to return in `BUILTINS::iter`.
///
/// All calls to `BUILTINS::iter` within the lifetime of an environmentd process
/// must provide an equal `BuiltinsConfig`. The configuration is not allowed to
/// change dynamically.
#[derive(Debug, Clone)]
pub struct BuiltinsConfig {
    /// If true, include system builtin continual tasks.
    pub include_continual_tasks: bool,
}
1886
#[cfg(test)]
mod tests {
    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};

    /// Organization ID embedded in every well-formed input below.
    const ORGANIZATION_ID: &str = "1497a3b7-a455-4fc4-8752-b44a94b5f90a";

    #[mz_ore::test]
    fn test_environment_id() {
        // Inputs that must parse, each paired with the value it must parse to.
        let accepted = [
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                EnvironmentId {
                    cloud_provider: CloudProvider::Local,
                    cloud_provider_region: "az1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 452,
                },
            ),
            (
                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                EnvironmentId {
                    cloud_provider: CloudProvider::Aws,
                    cloud_provider_region: "us-east-1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                },
            ),
            (
                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                EnvironmentId {
                    cloud_provider: CloudProvider::Gcp,
                    cloud_provider_region: "us-central1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                },
            ),
            (
                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                EnvironmentId {
                    cloud_provider: CloudProvider::Azure,
                    cloud_provider_region: "australiaeast".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                },
            ),
            (
                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                EnvironmentId {
                    cloud_provider: CloudProvider::Generic,
                    cloud_provider_region: "moon-station-11-darkside".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                },
            ),
        ];
        for (input, expected) in accepted {
            let actual = input.parse::<EnvironmentId>();
            assert_eq!(Ok(expected), actual, "input = {}", input);
            // A successfully parsed ID must format back to the exact input string.
            if let Ok(actual) = actual {
                assert_eq!(input, actual.to_string(), "input = {}", input);
            }
        }

        // Inputs that must be rejected.
        let rejected = [
            // Empty input.
            "",
            // Nine-digit ordinal (presumably longer than the parser allows; confirm against
            // the parsing rules).
            "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
            // No region segment between the provider and the organization ID.
            "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
            // Organization ID with one of its hyphens missing.
            "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
        ];
        for input in rejected {
            let actual = input.parse::<EnvironmentId>();
            assert_eq!(Err(InvalidEnvironmentIdError), actual, "input = {}", input);
        }
    }
}