mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::num::NonZeroU32;
20use std::str::FromStr;
21use std::sync::LazyLock;
22use std::time::{Duration, Instant};
23
24use chrono::{DateTime, Utc};
25use mz_auth::password::Password;
26use mz_build_info::BuildInfo;
27use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
28use mz_controller_types::{ClusterId, ReplicaId};
29use mz_expr::MirScalarExpr;
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_ore::str::StrExt;
32use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
33use mz_repr::explain::ExprHumanizer;
34use mz_repr::network_policy_id::NetworkPolicyId;
35use mz_repr::role_id::RoleId;
36use mz_repr::{
37    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
38};
39use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
40use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
41use mz_storage_types::connections::{Connection, ConnectionContext};
42use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
43use proptest_derive::Arbitrary;
44use regex::Regex;
45use serde::{Deserialize, Serialize};
46use uuid::Uuid;
47
48use crate::func::Func;
49use crate::names::{
50    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
51    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
52    SchemaSpecifier, SystemObjectId,
53};
54use crate::plan::statement::StatementDesc;
55use crate::plan::statement::ddl::PlannedRoleAttributes;
56use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
57use crate::session::vars::{OwnedVarInput, SystemVars};
58
59/// A catalog keeps track of SQL objects and session state available to the
60/// planner.
61///
62/// The `sql` crate is agnostic to any particular catalog implementation. This
63/// trait describes the required interface.
64///
65/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
66/// catalog contains databases, databases contain schemas, and schemas contain
67/// catalog items, like sources, sinks, view, and indexes.
68///
69/// There are two classes of operations provided by a catalog:
70///
71///   * Resolution operations, like [`resolve_item`]. These fill in missing name
72///     components based upon connection defaults, e.g., resolving the partial
73///     name `view42` to the fully-specified name `materialize.public.view42`.
74///
75///   * Lookup operations, like [`SessionCatalog::get_item`]. These retrieve
76///     metadata about a catalog entity based on a fully-specified name that is
77///     known to be valid (i.e., because the name was successfully resolved, or
78///     was constructed based on the output of a prior lookup operation). These
79///     functions panic if called with invalid input.
80///
81///   * Session management, such as managing variables' states and adding
82///     notices to the session.
83///
84/// [`get_databases`]: SessionCatalog::get_databases
85/// [`get_item`]: SessionCatalog::get_item
86/// [`resolve_item`]: SessionCatalog::resolve_item
87pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
88    /// Returns the id of the role that is issuing the query.
89    fn active_role_id(&self) -> &RoleId;
90
91    /// Returns the database to use if one is not explicitly specified.
92    fn active_database_name(&self) -> Option<&str> {
93        self.active_database()
94            .map(|id| self.get_database(id))
95            .map(|db| db.name())
96    }
97
98    /// Returns the database to use if one is not explicitly specified.
99    fn active_database(&self) -> Option<&DatabaseId>;
100
101    /// Returns the cluster to use if one is not explicitly specified.
102    fn active_cluster(&self) -> &str;
103
104    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
105    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];
106
107    /// Returns the descriptor of the named prepared statement on the session, or
108    /// None if the prepared statement does not exist.
109    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;
110
111    /// Retrieves a reference to the specified portal's descriptor.
112    ///
113    /// If there is no such portal, returns `None`.
114    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc>;
115
116    /// Resolves the named database.
117    ///
118    /// If `database_name` exists in the catalog, it returns a reference to the
119    /// resolved database; otherwise it returns an error.
120    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;
121
122    /// Gets a database by its ID.
123    ///
124    /// Panics if `id` does not specify a valid database.
125    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;
126
127    /// Gets all databases.
128    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;
129
130    /// Resolves a partially-specified schema name.
131    ///
132    /// If the schema exists in the catalog, it returns a reference to the
133    /// resolved schema; otherwise it returns an error.
134    fn resolve_schema(
135        &self,
136        database_name: Option<&str>,
137        schema_name: &str,
138    ) -> Result<&dyn CatalogSchema, CatalogError>;
139
140    /// Resolves a schema name within a specified database.
141    ///
142    /// If the schema exists in the database, it returns a reference to the
143    /// resolved schema; otherwise it returns an error.
144    fn resolve_schema_in_database(
145        &self,
146        database_spec: &ResolvedDatabaseSpecifier,
147        schema_name: &str,
148    ) -> Result<&dyn CatalogSchema, CatalogError>;
149
150    /// Gets a schema by its ID.
151    ///
152    /// Panics if `id` does not specify a valid schema.
153    fn get_schema(
154        &self,
155        database_spec: &ResolvedDatabaseSpecifier,
156        schema_spec: &SchemaSpecifier,
157    ) -> &dyn CatalogSchema;
158
159    /// Gets all schemas.
160    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;
161
162    /// Gets the mz_internal schema id.
163    fn get_mz_internal_schema_id(&self) -> SchemaId;
164
165    /// Gets the mz_unsafe schema id.
166    fn get_mz_unsafe_schema_id(&self) -> SchemaId;
167
168    /// Returns true if `schema` is an internal system schema, false otherwise
169    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;
170
171    /// Resolves the named role.
172    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;
173
174    /// Resolves the named network policy.
175    fn resolve_network_policy(
176        &self,
177        network_policy_name: &str,
178    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;
179
180    /// Gets a role by its ID.
181    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;
182
183    /// Gets a role by its ID.
184    ///
185    /// Panics if `id` does not specify a valid role.
186    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;
187
188    /// Gets all roles.
189    fn get_roles(&self) -> Vec<&dyn CatalogRole>;
190
191    /// Gets the id of the `mz_system` role.
192    fn mz_system_role_id(&self) -> RoleId;
193
194    /// Collects all role IDs that `id` is transitively a member of.
195    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;
196
197    /// Resolves the named cluster.
198    /// Gets a network_policy by its ID.
199    ///
200    /// Panics if `id` does not specify a valid role.
201    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;
202
203    /// Gets all roles.
204    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;
205
206    ///
207    /// If the provided name is `None`, resolves the currently active cluster.
208    fn resolve_cluster<'a, 'b>(
209        &'a self,
210        cluster_name: Option<&'b str>,
211    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;
212
213    /// Resolves the named cluster replica.
214    fn resolve_cluster_replica<'a, 'b>(
215        &'a self,
216        cluster_replica_name: &'b QualifiedReplica,
217    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;
218
219    /// Resolves a partially-specified item name, that is NOT a function or
220    /// type. (For resolving functions or types, please use
221    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
222    ///
223    /// If the partial name has a database component, it searches only the
224    /// specified database; otherwise, it searches the active database. If the
225    /// partial name has a schema component, it searches only the specified
226    /// schema; otherwise, it searches a default set of schemas within the
227    /// selected database. It returns an error if none of the searched schemas
228    /// contain an item whose name matches the item component of the partial
229    /// name.
230    ///
231    /// Note that it is not an error if the named item appears in more than one
232    /// of the search schemas. The catalog implementation must choose one.
233    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
234
235    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
236    /// functions within the catalog.
237    fn resolve_function(
238        &self,
239        item_name: &PartialItemName,
240    ) -> Result<&dyn CatalogItem, CatalogError>;
241
242    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
243    /// types within the catalog.
244    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
245
246    /// Resolves `name` to a type or item, preferring the type if both exist.
247    fn resolve_item_or_type(
248        &self,
249        name: &PartialItemName,
250    ) -> Result<&dyn CatalogItem, CatalogError> {
251        if let Ok(ty) = self.resolve_type(name) {
252            return Ok(ty);
253        }
254        self.resolve_item(name)
255    }
256
257    /// Gets a type named `name` from exactly one of the system schemas.
258    ///
259    /// # Panics
260    /// - If `name` is not an entry in any system schema
261    /// - If more than one system schema has an entry named `name`.
262    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;
263
264    /// Gets an item by its ID.
265    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;
266
267    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
268    /// exist.
269    ///
270    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
271    fn try_get_item_by_global_id<'a>(
272        &'a self,
273        id: &GlobalId,
274    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;
275
276    /// Gets an item by its ID.
277    ///
278    /// Panics if `id` does not specify a valid item.
279    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;
280
281    /// Gets an item by a [`GlobalId`].
282    ///
283    /// Panics if `id` does not specify a valid item.
284    ///
285    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
286    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;
287
288    /// Gets all items.
289    fn get_items(&self) -> Vec<&dyn CatalogItem>;
290
291    /// Looks up an item by its name.
292    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
293
294    /// Looks up a type by its name.
295    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
296
297    /// Gets a cluster by ID.
298    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster<'_>;
299
300    /// Gets all clusters.
301    fn get_clusters(&self) -> Vec<&dyn CatalogCluster<'_>>;
302
303    /// Gets a cluster replica by ID.
304    fn get_cluster_replica(
305        &self,
306        cluster_id: ClusterId,
307        replica_id: ReplicaId,
308    ) -> &dyn CatalogClusterReplica<'_>;
309
310    /// Gets all cluster replicas.
311    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
312
313    /// Gets all system privileges.
314    fn get_system_privileges(&self) -> &PrivilegeMap;
315
316    /// Gets all default privileges.
317    fn get_default_privileges(
318        &self,
319    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;
320
321    /// Finds a name like `name` that is not already in use.
322    ///
323    /// If `name` itself is available, it is returned unchanged.
324    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;
325
326    /// Returns a fully qualified human readable name from fully qualified non-human readable name
327    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;
328
329    /// Returns a fully qualified human readable schema name from fully qualified non-human
330    /// readable schema name
331    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;
332
333    /// Returns the [`CatalogItemId`] for from a [`GlobalId`].
334    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;
335
336    /// Returns the [`GlobalId`] for the specificed Catalog Item, at the specified version.
337    fn resolve_global_id(
338        &self,
339        item_id: &CatalogItemId,
340        version: RelationVersionSelector,
341    ) -> GlobalId;
342
343    /// Returns the configuration of the catalog.
344    fn config(&self) -> &CatalogConfig;
345
346    /// Returns the number of milliseconds since the system epoch. For normal use
347    /// this means the Unix epoch. This can safely be mocked in tests and start
348    /// at 0.
349    fn now(&self) -> EpochMillis;
350
351    /// Returns the set of supported AWS PrivateLink availability zone ids.
352    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;
353
354    /// Returns system vars
355    fn system_vars(&self) -> &SystemVars;
356
357    /// Returns mutable system vars
358    ///
359    /// Clients should use this this method carefully, as changes to the backing
360    /// state here are not guarateed to be persisted. The motivating use case
361    /// for this method was ensuring that features are temporary turned on so
362    /// catalog rehydration does not break due to unsupported SQL syntax.
363    fn system_vars_mut(&mut self) -> &mut SystemVars;
364
365    /// Returns the [`RoleId`] of the owner of an object by its ID.
366    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;
367
368    /// Returns the [`PrivilegeMap`] of the object.
369    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;
370
371    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
372    ///
373    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
374    /// earlier in the list than the roots. This is particularly userful for the order to drop
375    /// objects.
376    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;
377
378    /// Returns all the IDs of all objects that depend on `id`, including `id` themselves.
379    ///
380    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
381    /// earlier in the list than `id`. This is particularly userful for the order to drop
382    /// objects.
383    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;
384
385    /// Returns all possible privileges associated with an object type.
386    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;
387
388    /// Returns the object type of `object_id`.
389    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;
390
391    /// Returns the system object type of `id`.
392    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;
393
394    /// Returns the minimal qualification required to unambiguously specify
395    /// `qualified_name`.
396    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;
397
398    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
399    /// successfully executes.
400    fn add_notice(&self, notice: PlanNotice);
401
402    /// Returns the associated comments for the given `id`
403    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;
404
405    /// Reports whether the specified cluster size is a modern "cc" size rather
406    /// than a legacy T-shirt size.
407    fn is_cluster_size_cc(&self, size: &str) -> bool;
408}
409
410/// Configuration associated with a catalog.
411#[derive(Debug, Clone)]
412pub struct CatalogConfig {
413    /// Returns the time at which the catalog booted.
414    pub start_time: DateTime<Utc>,
415    /// Returns the instant at which the catalog booted.
416    pub start_instant: Instant,
417    /// A random integer associated with this instance of the catalog.
418    ///
419    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
420    /// topics. Perhaps we can remove this when database-issues#977 is complete.
421    pub nonce: u64,
422    /// A persistent ID associated with the environment.
423    pub environment_id: EnvironmentId,
424    /// A transient UUID associated with this process.
425    pub session_id: Uuid,
426    /// Information about this build of Materialize.
427    pub build_info: &'static BuildInfo,
428    /// Default timestamp interval.
429    pub timestamp_interval: Duration,
430    /// Function that returns a wall clock now time; can safely be mocked to return
431    /// 0.
432    pub now: NowFn,
433    /// Context for source and sink connections.
434    pub connection_context: ConnectionContext,
435    /// Which system builtins to include. Not allowed to change dynamically.
436    pub builtins_cfg: BuiltinsConfig,
437    /// Helm chart version
438    pub helm_chart_version: Option<String>,
439}
440
441/// A database in a [`SessionCatalog`].
442pub trait CatalogDatabase {
443    /// Returns a fully-specified name of the database.
444    fn name(&self) -> &str;
445
446    /// Returns a stable ID for the database.
447    fn id(&self) -> DatabaseId;
448
449    /// Returns whether the database contains schemas.
450    fn has_schemas(&self) -> bool;
451
452    /// Returns the schemas of the database as a map from schema name to
453    /// schema ID.
454    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;
455
456    /// Returns the schemas of the database.
457    fn schemas(&self) -> Vec<&dyn CatalogSchema>;
458
459    /// Returns the ID of the owning role.
460    fn owner_id(&self) -> RoleId;
461
462    /// Returns the privileges associated with the database.
463    fn privileges(&self) -> &PrivilegeMap;
464}
465
466/// A schema in a [`SessionCatalog`].
467pub trait CatalogSchema {
468    /// Returns a fully-specified id of the database
469    fn database(&self) -> &ResolvedDatabaseSpecifier;
470
471    /// Returns a fully-specified name of the schema.
472    fn name(&self) -> &QualifiedSchemaName;
473
474    /// Returns a stable ID for the schema.
475    fn id(&self) -> &SchemaSpecifier;
476
477    /// Lists the `CatalogItem`s for the schema.
478    fn has_items(&self) -> bool;
479
480    /// Returns the IDs of the items in the schema.
481    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;
482
483    /// Returns the ID of the owning role.
484    fn owner_id(&self) -> RoleId;
485
486    /// Returns the privileges associated with the schema.
487    fn privileges(&self) -> &PrivilegeMap;
488}
489
490/// Parameters used to modify password
491#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
492pub struct PasswordConfig {
493    /// The Password.
494    pub password: Password,
495    /// a non default iteration count for hashing the password.
496    pub scram_iterations: NonZeroU32,
497}
498
499/// A modification of a role password in the catalog
500#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
501pub enum PasswordAction {
502    /// Set a new password.
503    Set(PasswordConfig),
504    /// Remove the existing password.
505    Clear,
506    /// Leave the existing password unchanged.
507    NoChange,
508}
509
510/// A raw representation of attributes belonging to a [`CatalogRole`] that we might
511/// get as input from the user. This includes the password.
512/// This struct explicitly does not implement `Serialize` or `Deserialize` to avoid
513/// accidentally serializing passwords.
514#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
515pub struct RoleAttributesRaw {
516    /// Indicates whether the role has inheritance of privileges.
517    pub inherit: bool,
518    /// The raw password of the role. This is for self managed auth, not cloud.
519    pub password: Option<Password>,
520    /// Hash iterations used to securely store passwords. This is for self-managed auth
521    pub scram_iterations: Option<NonZeroU32>,
522    /// Whether or not this user is a superuser.
523    pub superuser: Option<bool>,
524    /// Whether this role is login
525    pub login: Option<bool>,
526    // Force use of constructor.
527    _private: (),
528}
529
530/// Attributes belonging to a [`CatalogRole`].
531#[derive(Debug, Clone, Eq, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Arbitrary)]
532pub struct RoleAttributes {
533    /// Indicates whether the role has inheritance of privileges.
534    pub inherit: bool,
535    /// Whether or not this user is a superuser.
536    pub superuser: Option<bool>,
537    /// Whether this role is login
538    pub login: Option<bool>,
539    // Force use of constructor.
540    _private: (),
541}
542
543impl RoleAttributesRaw {
544    /// Creates a new [`RoleAttributesRaw`] with default attributes.
545    pub const fn new() -> RoleAttributesRaw {
546        RoleAttributesRaw {
547            inherit: true,
548            password: None,
549            scram_iterations: None,
550            superuser: None,
551            login: None,
552            _private: (),
553        }
554    }
555
556    /// Adds all attributes excluding password.
557    pub const fn with_all(mut self) -> RoleAttributesRaw {
558        self.inherit = true;
559        self.superuser = Some(true);
560        self.login = Some(true);
561        self
562    }
563}
564
565impl RoleAttributes {
566    /// Creates a new [`RoleAttributes`] with default attributes.
567    pub const fn new() -> RoleAttributes {
568        RoleAttributes {
569            inherit: true,
570            superuser: None,
571            login: None,
572            _private: (),
573        }
574    }
575
576    /// Adds all attributes except password.
577    pub const fn with_all(mut self) -> RoleAttributes {
578        self.inherit = true;
579        self.superuser = Some(true);
580        self.login = Some(true);
581        self
582    }
583
584    /// Returns whether or not the role has inheritence of privileges.
585    pub const fn is_inherit(&self) -> bool {
586        self.inherit
587    }
588}
589
590impl From<RoleAttributesRaw> for RoleAttributes {
591    fn from(
592        RoleAttributesRaw {
593            inherit,
594            superuser,
595            login,
596            ..
597        }: RoleAttributesRaw,
598    ) -> RoleAttributes {
599        RoleAttributes {
600            inherit,
601            superuser,
602            login,
603            _private: (),
604        }
605    }
606}
607
608impl From<RoleAttributes> for RoleAttributesRaw {
609    fn from(
610        RoleAttributes {
611            inherit,
612            superuser,
613            login,
614            ..
615        }: RoleAttributes,
616    ) -> RoleAttributesRaw {
617        RoleAttributesRaw {
618            inherit,
619            password: None,
620            scram_iterations: None,
621            superuser,
622            login,
623            _private: (),
624        }
625    }
626}
627
628impl From<PlannedRoleAttributes> for RoleAttributesRaw {
629    fn from(
630        PlannedRoleAttributes {
631            inherit,
632            password,
633            scram_iterations,
634            superuser,
635            login,
636            ..
637        }: PlannedRoleAttributes,
638    ) -> RoleAttributesRaw {
639        let default_attributes = RoleAttributesRaw::new();
640        RoleAttributesRaw {
641            inherit: inherit.unwrap_or(default_attributes.inherit),
642            password,
643            scram_iterations,
644            superuser,
645            login,
646            _private: (),
647        }
648    }
649}
650
651/// Default variable values for a [`CatalogRole`].
652#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
653pub struct RoleVars {
654    /// Map of variable names to their value.
655    pub map: BTreeMap<String, OwnedVarInput>,
656}
657
658/// A role in a [`SessionCatalog`].
659pub trait CatalogRole {
660    /// Returns a fully-specified name of the role.
661    fn name(&self) -> &str;
662
663    /// Returns a stable ID for the role.
664    fn id(&self) -> RoleId;
665
666    /// Returns all role IDs that this role is an immediate a member of, and the grantor of that
667    /// membership.
668    ///
669    /// Key is the role that some role is a member of, value is the grantor role ID.
670    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;
671
672    /// Returns the attributes associated with this role.
673    fn attributes(&self) -> &RoleAttributes;
674
675    /// Returns all variables that this role has a default value stored for.
676    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
677}
678
679/// A network policy in a [`SessionCatalog`].
680pub trait CatalogNetworkPolicy {
681    /// Returns a fully-specified name of the NetworkPolicy.
682    fn name(&self) -> &str;
683
684    /// Returns a stable ID for the NetworkPolicy.
685    fn id(&self) -> NetworkPolicyId;
686
687    /// Returns the ID of the owning NetworkPolicy.
688    fn owner_id(&self) -> RoleId;
689
690    /// Returns the privileges associated with the NetworkPolicy.
691    fn privileges(&self) -> &PrivilegeMap;
692}
693
694/// A cluster in a [`SessionCatalog`].
695pub trait CatalogCluster<'a> {
696    /// Returns a fully-specified name of the cluster.
697    fn name(&self) -> &str;
698
699    /// Returns a stable ID for the cluster.
700    fn id(&self) -> ClusterId;
701
702    /// Returns the objects that are bound to this cluster.
703    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;
704
705    /// Returns the replicas of the cluster as a map from replica name to
706    /// replica ID.
707    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;
708
709    /// Returns the replicas of the cluster.
710    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
711
712    /// Returns the replica belonging to the cluster with replica ID `id`.
713    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_>;
714
715    /// Returns the ID of the owning role.
716    fn owner_id(&self) -> RoleId;
717
718    /// Returns the privileges associated with the cluster.
719    fn privileges(&self) -> &PrivilegeMap;
720
721    /// Returns true if this cluster is a managed cluster.
722    fn is_managed(&self) -> bool;
723
724    /// Returns the size of the cluster, if the cluster is a managed cluster.
725    fn managed_size(&self) -> Option<&str>;
726
727    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
728    fn schedule(&self) -> Option<&ClusterSchedule>;
729
730    /// Try to convert this cluster into a [`CreateClusterPlan`].
731    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
732    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
733}
734
735/// A cluster replica in a [`SessionCatalog`]
736pub trait CatalogClusterReplica<'a>: Debug {
737    /// Returns the name of the cluster replica.
738    fn name(&self) -> &str;
739
740    /// Returns a stable ID for the cluster that the replica belongs to.
741    fn cluster_id(&self) -> ClusterId;
742
743    /// Returns a stable ID for the replica.
744    fn replica_id(&self) -> ReplicaId;
745
746    /// Returns the ID of the owning role.
747    fn owner_id(&self) -> RoleId;
748
749    /// Returns whether or not the replica is internal
750    fn internal(&self) -> bool;
751}
752
753/// An item in a [`SessionCatalog`].
754///
755/// Note that "item" has a very specific meaning in the context of a SQL
756/// catalog, and refers to the various entities that belong to a schema.
757pub trait CatalogItem {
758    /// Returns the fully qualified name of the catalog item.
759    fn name(&self) -> &QualifiedItemName;
760
761    /// Returns the [`CatalogItemId`] for the item.
762    fn id(&self) -> CatalogItemId;
763
764    /// Returns the [`GlobalId`]s associated with this item.
765    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;
766
767    /// Returns the catalog item's OID.
768    fn oid(&self) -> u32;
769
770    /// Returns the resolved function.
771    ///
772    /// If the catalog item is not of a type that produces functions (i.e.,
773    /// anything other than a function), it returns an error.
774    fn func(&self) -> Result<&'static Func, CatalogError>;
775
776    /// Returns the resolved source connection.
777    ///
778    /// If the catalog item is not of a type that contains a `SourceDesc`
779    /// (i.e., anything other than sources), it returns an error.
780    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;
781
782    /// Returns the resolved connection.
783    ///
784    /// If the catalog item is not a connection, it returns an error.
785    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;
786
787    /// Returns the type of the catalog item.
788    fn item_type(&self) -> CatalogItemType;
789
790    /// A normalized SQL statement that describes how to create the catalog
791    /// item.
792    fn create_sql(&self) -> &str;
793
794    /// Returns the IDs of the catalog items upon which this catalog item
795    /// directly references.
796    fn references(&self) -> &ResolvedIds;
797
798    /// Returns the IDs of the catalog items upon which this catalog item
799    /// depends.
800    fn uses(&self) -> BTreeSet<CatalogItemId>;
801
802    /// Returns the IDs of the catalog items that directly reference this catalog item.
803    fn referenced_by(&self) -> &[CatalogItemId];
804
805    /// Returns the IDs of the catalog items that depend upon this catalog item.
806    fn used_by(&self) -> &[CatalogItemId];
807
808    /// Reports whether this catalog entry is a subsource and, if it is, the
809    /// ingestion it is an export of, as well as the item it exports.
810    fn subsource_details(
811        &self,
812    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;
813
814    /// Reports whether this catalog entry is a source export and, if it is, the
815    /// ingestion it is an export of, as well as the item it exports.
816    fn source_export_details(
817        &self,
818    ) -> Option<(
819        CatalogItemId,
820        &UnresolvedItemName,
821        &SourceExportDetails,
822        &SourceExportDataConfig<ReferencedConnection>,
823    )>;
824
825    /// Reports whether this catalog item is a progress source.
826    fn is_progress_source(&self) -> bool;
827
828    /// If this catalog item is a source, it return the IDs of its progress collection.
829    fn progress_id(&self) -> Option<CatalogItemId>;
830
831    /// Returns the index details associated with the catalog item, if the
832    /// catalog item is an index.
833    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;
834
835    /// Returns the column defaults associated with the catalog item, if the
836    /// catalog item is a table that accepts writes.
837    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;
838
839    /// Returns the type information associated with the catalog item, if the
840    /// catalog item is a type.
841    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;
842
843    /// Returns the ID of the owning role.
844    fn owner_id(&self) -> RoleId;
845
846    /// Returns the privileges associated with the item.
847    fn privileges(&self) -> &PrivilegeMap;
848
849    /// Returns the cluster the item belongs to.
850    fn cluster_id(&self) -> Option<ClusterId>;
851
852    /// Returns the [`CatalogCollectionItem`] for a specific version of this
853    /// [`CatalogItem`].
854    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;
855
856    /// The latest version of this item, if it's version-able.
857    fn latest_version(&self) -> Option<RelationVersion>;
858}
859
860/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
861/// refers to.
862pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
863    /// Returns a description of the result set produced by the catalog item.
864    ///
865    /// If the catalog item is not of a type that produces data (i.e., a sink or
866    /// an index), it returns an error.
867    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, CatalogError>;
868
869    /// The [`GlobalId`] for this item.
870    fn global_id(&self) -> GlobalId;
871}
872
873/// The type of a [`CatalogItem`].
874#[derive(Debug, Deserialize, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
875pub enum CatalogItemType {
876    /// A table.
877    Table,
878    /// A source.
879    Source,
880    /// A sink.
881    Sink,
882    /// A view.
883    View,
884    /// A materialized view.
885    MaterializedView,
886    /// An index.
887    Index,
888    /// A type.
889    Type,
890    /// A func.
891    Func,
892    /// A secret.
893    Secret,
894    /// A connection.
895    Connection,
896    /// A continual task.
897    ContinualTask,
898}
899
900impl CatalogItemType {
901    /// Reports whether the given type of item conflicts with items of type
902    /// `CatalogItemType::Type`.
903    ///
904    /// In PostgreSQL, even though types live in a separate namespace from other
905    /// schema objects, creating a table, view, or materialized view creates a
906    /// type named after that relation. This prevents creating a type with the
907    /// same name as a relational object, even though types and relational
908    /// objects live in separate namespaces. (Indexes are even weirder; while
909    /// they don't get a type with the same name, they get an entry in
910    /// `pg_class` that prevents *record* types of the same name as the index,
911    /// but not other types of types, like enums.)
912    ///
913    /// We don't presently construct types that mirror relational objects,
914    /// though we likely will need to in the future for full PostgreSQL
915    /// compatibility (see database-issues#7142). For now, we use this method to
916    /// prevent creating types and relational objects that have the same name, so
917    /// that it is a backwards compatible change in the future to introduce a
918    /// type named after each relational object in the system.
919    pub fn conflicts_with_type(&self) -> bool {
920        match self {
921            CatalogItemType::Table => true,
922            CatalogItemType::Source => true,
923            CatalogItemType::View => true,
924            CatalogItemType::MaterializedView => true,
925            CatalogItemType::Index => true,
926            CatalogItemType::Type => true,
927            CatalogItemType::Sink => false,
928            CatalogItemType::Func => false,
929            CatalogItemType::Secret => false,
930            CatalogItemType::Connection => false,
931            CatalogItemType::ContinualTask => true,
932        }
933    }
934}
935
936impl fmt::Display for CatalogItemType {
937    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
938        match self {
939            CatalogItemType::Table => f.write_str("table"),
940            CatalogItemType::Source => f.write_str("source"),
941            CatalogItemType::Sink => f.write_str("sink"),
942            CatalogItemType::View => f.write_str("view"),
943            CatalogItemType::MaterializedView => f.write_str("materialized view"),
944            CatalogItemType::Index => f.write_str("index"),
945            CatalogItemType::Type => f.write_str("type"),
946            CatalogItemType::Func => f.write_str("func"),
947            CatalogItemType::Secret => f.write_str("secret"),
948            CatalogItemType::Connection => f.write_str("connection"),
949            CatalogItemType::ContinualTask => f.write_str("continual task"),
950        }
951    }
952}
953
954impl From<CatalogItemType> for ObjectType {
955    fn from(value: CatalogItemType) -> Self {
956        match value {
957            CatalogItemType::Table => ObjectType::Table,
958            CatalogItemType::Source => ObjectType::Source,
959            CatalogItemType::Sink => ObjectType::Sink,
960            CatalogItemType::View => ObjectType::View,
961            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
962            CatalogItemType::Index => ObjectType::Index,
963            CatalogItemType::Type => ObjectType::Type,
964            CatalogItemType::Func => ObjectType::Func,
965            CatalogItemType::Secret => ObjectType::Secret,
966            CatalogItemType::Connection => ObjectType::Connection,
967            CatalogItemType::ContinualTask => ObjectType::ContinualTask,
968        }
969    }
970}
971
972impl From<CatalogItemType> for mz_audit_log::ObjectType {
973    fn from(value: CatalogItemType) -> Self {
974        match value {
975            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
976            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
977            CatalogItemType::View => mz_audit_log::ObjectType::View,
978            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
979            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
980            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
981            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
982            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
983            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
984            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
985            CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
986        }
987    }
988}
989
990/// Details about a type in the catalog.
991#[derive(Clone, Debug, Eq, PartialEq)]
992pub struct CatalogTypeDetails<T: TypeReference> {
993    /// The ID of the type with this type as the array element, if available.
994    pub array_id: Option<CatalogItemId>,
995    /// The description of this type.
996    pub typ: CatalogType<T>,
997    /// Additional metadata about the type in PostgreSQL, if relevant.
998    pub pg_metadata: Option<CatalogTypePgMetadata>,
999}
1000
1001/// Additional PostgreSQL metadata about a type.
1002#[derive(Clone, Debug, Eq, PartialEq)]
1003pub struct CatalogTypePgMetadata {
1004    /// The OID of the `typinput` function in PostgreSQL.
1005    pub typinput_oid: u32,
1006    /// The OID of the `typreceive` function in PostgreSQL.
1007    pub typreceive_oid: u32,
1008}
1009
1010/// Represents a reference to type in the catalog
1011pub trait TypeReference {
1012    /// The actual type used to reference a `CatalogType`
1013    type Reference: Clone + Debug + Eq + PartialEq;
1014}
1015
1016/// Reference to a type by it's name
1017#[derive(Clone, Debug, Eq, PartialEq)]
1018pub struct NameReference;
1019
1020impl TypeReference for NameReference {
1021    type Reference = &'static str;
1022}
1023
1024/// Reference to a type by it's global ID
1025#[derive(Clone, Debug, Eq, PartialEq)]
1026pub struct IdReference;
1027
1028impl TypeReference for IdReference {
1029    type Reference = CatalogItemId;
1030}
1031
1032/// A type stored in the catalog.
1033///
1034/// The variants correspond one-to-one with [`mz_repr::SqlScalarType`], but with type
1035/// modifiers removed and with embedded types replaced with references to other
1036/// types in the catalog.
1037#[allow(missing_docs)]
1038#[derive(Clone, Debug, Eq, PartialEq)]
1039pub enum CatalogType<T: TypeReference> {
1040    AclItem,
1041    Array {
1042        element_reference: T::Reference,
1043    },
1044    Bool,
1045    Bytes,
1046    Char,
1047    Date,
1048    Float32,
1049    Float64,
1050    Int16,
1051    Int32,
1052    Int64,
1053    UInt16,
1054    UInt32,
1055    UInt64,
1056    MzTimestamp,
1057    Interval,
1058    Jsonb,
1059    List {
1060        element_reference: T::Reference,
1061        element_modifiers: Vec<i64>,
1062    },
1063    Map {
1064        key_reference: T::Reference,
1065        key_modifiers: Vec<i64>,
1066        value_reference: T::Reference,
1067        value_modifiers: Vec<i64>,
1068    },
1069    Numeric,
1070    Oid,
1071    PgLegacyChar,
1072    PgLegacyName,
1073    Pseudo,
1074    Range {
1075        element_reference: T::Reference,
1076    },
1077    Record {
1078        fields: Vec<CatalogRecordField<T>>,
1079    },
1080    RegClass,
1081    RegProc,
1082    RegType,
1083    String,
1084    Time,
1085    Timestamp,
1086    TimestampTz,
1087    Uuid,
1088    VarChar,
1089    Int2Vector,
1090    MzAclItem,
1091}
1092
1093impl CatalogType<IdReference> {
1094    /// Returns the relation description for the type, if the type is a record
1095    /// type.
1096    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1097        match &self {
1098            CatalogType::Record { fields } => {
1099                let mut desc = RelationDesc::builder();
1100                for f in fields {
1101                    let name = f.name.clone();
1102                    let ty = query::scalar_type_from_catalog(
1103                        catalog,
1104                        f.type_reference,
1105                        &f.type_modifiers,
1106                    )?;
1107                    // TODO: support plumbing `NOT NULL` constraints through
1108                    // `CREATE TYPE`.
1109                    let ty = ty.nullable(true);
1110                    desc = desc.with_column(name, ty);
1111                }
1112                Ok(Some(desc.finish()))
1113            }
1114            _ => Ok(None),
1115        }
1116    }
1117}
1118
1119/// A description of a field in a [`CatalogType::Record`].
1120#[derive(Clone, Debug, Eq, PartialEq)]
1121pub struct CatalogRecordField<T: TypeReference> {
1122    /// The name of the field.
1123    pub name: ColumnName,
1124    /// The ID of the type of the field.
1125    pub type_reference: T::Reference,
1126    /// Modifiers to apply to the type.
1127    pub type_modifiers: Vec<i64>,
1128}
1129
1130#[derive(Clone, Debug, Eq, PartialEq)]
1131/// Mirrored from [PostgreSQL's `typcategory`][typcategory].
1132///
1133/// Note that Materialize also uses a number of pseudotypes when planning, but
1134/// we have yet to need to integrate them with `TypeCategory`.
1135///
1136/// [typcategory]:
1137/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
1138pub enum TypeCategory {
1139    /// Array type.
1140    Array,
1141    /// Bit string type.
1142    BitString,
1143    /// Boolean type.
1144    Boolean,
1145    /// Composite type.
1146    Composite,
1147    /// Date/time type.
1148    DateTime,
1149    /// Enum type.
1150    Enum,
1151    /// Geometric type.
1152    Geometric,
1153    /// List type. Materialize specific.
1154    List,
1155    /// Network address type.
1156    NetworkAddress,
1157    /// Numeric type.
1158    Numeric,
1159    /// Pseudo type.
1160    Pseudo,
1161    /// Range type.
1162    Range,
1163    /// String type.
1164    String,
1165    /// Timestamp type.
1166    Timespan,
1167    /// User-defined type.
1168    UserDefined,
1169    /// Unknown type.
1170    Unknown,
1171}
1172
1173impl fmt::Display for TypeCategory {
1174    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1175        f.write_str(match self {
1176            TypeCategory::Array => "array",
1177            TypeCategory::BitString => "bit-string",
1178            TypeCategory::Boolean => "boolean",
1179            TypeCategory::Composite => "composite",
1180            TypeCategory::DateTime => "date-time",
1181            TypeCategory::Enum => "enum",
1182            TypeCategory::Geometric => "geometric",
1183            TypeCategory::List => "list",
1184            TypeCategory::NetworkAddress => "network-address",
1185            TypeCategory::Numeric => "numeric",
1186            TypeCategory::Pseudo => "pseudo",
1187            TypeCategory::Range => "range",
1188            TypeCategory::String => "string",
1189            TypeCategory::Timespan => "timespan",
1190            TypeCategory::UserDefined => "user-defined",
1191            TypeCategory::Unknown => "unknown",
1192        })
1193    }
1194}
1195
1196/// Identifies an environment.
1197///
1198/// Outside of tests, an environment ID can be constructed only from a string of
1199/// the following form:
1200///
1201/// ```text
1202/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
1203/// ```
1204///
1205/// The fields have the following formats:
1206///
1207/// * The cloud provider consists of one or more alphanumeric characters.
1208/// * The cloud provider region consists of one or more alphanumeric or hyphen
1209///   characters.
1210/// * The organization ID is a UUID in its canonical text format.
1211/// * The ordinal is a decimal number with between one and eight digits.
1212///
1213/// There is no way to construct an environment ID from parts, to ensure that
1214/// the `Display` representation is parseable according to the above rules.
1215// NOTE(benesch): ideally we'd have accepted the components of the environment
1216// ID using separate command-line arguments, or at least a string format that
1217// used a field separator that did not appear in the fields. Alas. We can't
1218// easily change it now, as it's used as the e.g. default sink progress topic.
1219#[derive(Debug, Clone, PartialEq)]
1220pub struct EnvironmentId {
1221    cloud_provider: CloudProvider,
1222    cloud_provider_region: String,
1223    organization_id: Uuid,
1224    ordinal: u64,
1225}
1226
1227impl EnvironmentId {
1228    /// Creates a dummy `EnvironmentId` for use in tests.
1229    pub fn for_tests() -> EnvironmentId {
1230        EnvironmentId {
1231            cloud_provider: CloudProvider::Local,
1232            cloud_provider_region: "az1".into(),
1233            organization_id: Uuid::new_v4(),
1234            ordinal: 0,
1235        }
1236    }
1237
1238    /// Returns the cloud provider associated with this environment ID.
1239    pub fn cloud_provider(&self) -> &CloudProvider {
1240        &self.cloud_provider
1241    }
1242
1243    /// Returns the cloud provider region associated with this environment ID.
1244    pub fn cloud_provider_region(&self) -> &str {
1245        &self.cloud_provider_region
1246    }
1247
1248    /// Returns the name of the region associted with this environment ID.
1249    ///
1250    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1251    /// [`EnvironmentId::cloud_provider_region`].
1252    pub fn region(&self) -> String {
1253        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1254    }
1255
1256    /// Returns the organization ID associated with this environment ID.
1257    pub fn organization_id(&self) -> Uuid {
1258        self.organization_id
1259    }
1260
1261    /// Returns the ordinal associated with this environment ID.
1262    pub fn ordinal(&self) -> u64 {
1263        self.ordinal
1264    }
1265}
1266
1267// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1268// populated using this key. Consequently, any changes to that trait
1269// implementation will also have to be reflected in the existing feature
1270// targeting config in LaunchDarkly, otherwise environments might receive
1271// different configs upon restart.
1272impl fmt::Display for EnvironmentId {
1273    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1274        write!(
1275            f,
1276            "{}-{}-{}-{}",
1277            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1278        )
1279    }
1280}
1281
1282impl FromStr for EnvironmentId {
1283    type Err = InvalidEnvironmentIdError;
1284
1285    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1286        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1287            Regex::new(
1288                "^(?P<cloud_provider>[[:alnum:]]+)-\
1289                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1290                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1291                  (?P<ordinal>\\d{1,8})$"
1292            ).unwrap()
1293        });
1294        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1295        Ok(EnvironmentId {
1296            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1297            cloud_provider_region: captures["cloud_provider_region"].into(),
1298            organization_id: captures["organization_id"]
1299                .parse()
1300                .map_err(|_| InvalidEnvironmentIdError)?,
1301            ordinal: captures["ordinal"]
1302                .parse()
1303                .map_err(|_| InvalidEnvironmentIdError)?,
1304        })
1305    }
1306}
1307
1308/// The error type for [`EnvironmentId::from_str`].
1309#[derive(Debug, Clone, PartialEq)]
1310pub struct InvalidEnvironmentIdError;
1311
1312impl fmt::Display for InvalidEnvironmentIdError {
1313    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1314        f.write_str("invalid environment ID")
1315    }
1316}
1317
1318impl Error for InvalidEnvironmentIdError {}
1319
1320impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1321    fn from(_: InvalidCloudProviderError) -> Self {
1322        InvalidEnvironmentIdError
1323    }
1324}
1325
1326/// An error returned by the catalog.
1327#[derive(Clone, Debug, Eq, PartialEq)]
1328pub enum CatalogError {
1329    /// Unknown database.
1330    UnknownDatabase(String),
1331    /// Database already exists.
1332    DatabaseAlreadyExists(String),
1333    /// Unknown schema.
1334    UnknownSchema(String),
1335    /// Schema already exists.
1336    SchemaAlreadyExists(String),
1337    /// Unknown role.
1338    UnknownRole(String),
1339    /// Role already exists.
1340    RoleAlreadyExists(String),
1341    /// Network Policy already exists.
1342    NetworkPolicyAlreadyExists(String),
1343    /// Unknown cluster.
1344    UnknownCluster(String),
1345    /// Unexpected builtin cluster.
1346    UnexpectedBuiltinCluster(String),
1347    /// Unexpected builtin cluster.
1348    UnexpectedBuiltinClusterType(String),
1349    /// Cluster already exists.
1350    ClusterAlreadyExists(String),
1351    /// Unknown cluster replica.
1352    UnknownClusterReplica(String),
1353    /// Unknown cluster replica size.
1354    UnknownClusterReplicaSize(String),
1355    /// Duplicate Replica. #[error("cannot create multiple replicas named '{0}' on cluster '{1}'")]
1356    DuplicateReplica(String, String),
1357    /// Unknown item.
1358    UnknownItem(String),
1359    /// Item already exists.
1360    ItemAlreadyExists(CatalogItemId, String),
1361    /// Unknown function.
1362    UnknownFunction {
1363        /// The identifier of the function we couldn't find
1364        name: String,
1365        /// A suggested alternative to the named function.
1366        alternative: Option<String>,
1367    },
1368    /// Unknown type.
1369    UnknownType {
1370        /// The identifier of the type we couldn't find.
1371        name: String,
1372    },
1373    /// Unknown connection.
1374    UnknownConnection(String),
1375    /// Unknown network policy.
1376    UnknownNetworkPolicy(String),
1377    /// Expected the catalog item to have the given type, but it did not.
1378    UnexpectedType {
1379        /// The item's name.
1380        name: String,
1381        /// The actual type of the item.
1382        actual_type: CatalogItemType,
1383        /// The expected type of the item.
1384        expected_type: CatalogItemType,
1385    },
1386    /// Invalid attempt to depend on a non-dependable item.
1387    InvalidDependency {
1388        /// The invalid item's name.
1389        name: String,
1390        /// The invalid item's type.
1391        typ: CatalogItemType,
1392    },
1393    /// Ran out of unique IDs.
1394    IdExhaustion,
1395    /// Ran out of unique OIDs.
1396    OidExhaustion,
1397    /// Timeline already exists.
1398    TimelineAlreadyExists(String),
1399    /// Id Allocator already exists.
1400    IdAllocatorAlreadyExists(String),
1401    /// Config already exists.
1402    ConfigAlreadyExists(String),
1403    /// Builtin migrations failed.
1404    FailedBuiltinSchemaMigration(String),
1405    /// StorageCollectionMetadata already exists.
1406    StorageCollectionMetadataAlreadyExists(GlobalId),
1407}
1408
1409impl fmt::Display for CatalogError {
1410    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1411        match self {
1412            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1413            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1414            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1415            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1416            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1417            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1418            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1419            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1420            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1421            Self::NetworkPolicyAlreadyExists(name) => {
1422                write!(f, "network policy '{name}' already exists")
1423            }
1424            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1425            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1426            Self::UnexpectedBuiltinCluster(name) => {
1427                write!(f, "Unexpected builtin cluster '{}'", name)
1428            }
1429            Self::UnexpectedBuiltinClusterType(name) => {
1430                write!(f, "Unexpected builtin cluster type'{}'", name)
1431            }
1432            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1433            Self::UnknownClusterReplica(name) => {
1434                write!(f, "unknown cluster replica '{}'", name)
1435            }
1436            Self::UnknownClusterReplicaSize(name) => {
1437                write!(f, "unknown cluster replica size '{}'", name)
1438            }
1439            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1440                f,
1441                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1442            ),
1443            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1444            Self::ItemAlreadyExists(_gid, name) => {
1445                write!(f, "catalog item '{name}' already exists")
1446            }
1447            Self::UnexpectedType {
1448                name,
1449                actual_type,
1450                expected_type,
1451            } => {
1452                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1453            }
1454            Self::InvalidDependency { name, typ } => write!(
1455                f,
1456                "catalog item '{}' is {} {} and so cannot be depended upon",
1457                name,
1458                if matches!(typ, CatalogItemType::Index) {
1459                    "an"
1460                } else {
1461                    "a"
1462                },
1463                typ,
1464            ),
1465            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1466            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1467            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1468            Self::IdAllocatorAlreadyExists(name) => {
1469                write!(f, "ID allocator '{name}' already exists")
1470            }
1471            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1472            Self::FailedBuiltinSchemaMigration(objects) => {
1473                write!(f, "failed to migrate schema of builtin objects: {objects}")
1474            }
1475            Self::StorageCollectionMetadataAlreadyExists(key) => {
1476                write!(f, "storage metadata for '{key}' already exists")
1477            }
1478        }
1479    }
1480}
1481
1482impl CatalogError {
1483    /// Returns any applicable hints for [`CatalogError`].
1484    pub fn hint(&self) -> Option<String> {
1485        match self {
1486            CatalogError::UnknownFunction { alternative, .. } => {
1487                match alternative {
1488                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1489                    Some(alt) => Some(format!("Try using {alt}")),
1490                }
1491            }
1492            _ => None,
1493        }
1494    }
1495}
1496
1497impl Error for CatalogError {}
1498
1499// Enum variant docs would be useless here.
1500#[allow(missing_docs)]
1501#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
1502/// The types of objects stored in the catalog.
1503pub enum ObjectType {
1504    Table,
1505    View,
1506    MaterializedView,
1507    Source,
1508    Sink,
1509    Index,
1510    Type,
1511    Role,
1512    Cluster,
1513    ClusterReplica,
1514    Secret,
1515    Connection,
1516    Database,
1517    Schema,
1518    Func,
1519    ContinualTask,
1520    NetworkPolicy,
1521}
1522
1523impl ObjectType {
1524    /// Reports if the object type can be treated as a relation.
1525    pub fn is_relation(&self) -> bool {
1526        match self {
1527            ObjectType::Table
1528            | ObjectType::View
1529            | ObjectType::MaterializedView
1530            | ObjectType::Source
1531            | ObjectType::ContinualTask => true,
1532            ObjectType::Sink
1533            | ObjectType::Index
1534            | ObjectType::Type
1535            | ObjectType::Secret
1536            | ObjectType::Connection
1537            | ObjectType::Func
1538            | ObjectType::Database
1539            | ObjectType::Schema
1540            | ObjectType::Cluster
1541            | ObjectType::ClusterReplica
1542            | ObjectType::Role
1543            | ObjectType::NetworkPolicy => false,
1544        }
1545    }
1546}
1547
1548impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1549    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1550        match value {
1551            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1552            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1553            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1554            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1555            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1556            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1557            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1558            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1559            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1560            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1561            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1562            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1563            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1564            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1565            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1566            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1567            mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask,
1568            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1569        }
1570    }
1571}
1572
1573impl From<CommentObjectId> for ObjectType {
1574    fn from(value: CommentObjectId) -> ObjectType {
1575        match value {
1576            CommentObjectId::Table(_) => ObjectType::Table,
1577            CommentObjectId::View(_) => ObjectType::View,
1578            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1579            CommentObjectId::Source(_) => ObjectType::Source,
1580            CommentObjectId::Sink(_) => ObjectType::Sink,
1581            CommentObjectId::Index(_) => ObjectType::Index,
1582            CommentObjectId::Func(_) => ObjectType::Func,
1583            CommentObjectId::Connection(_) => ObjectType::Connection,
1584            CommentObjectId::Type(_) => ObjectType::Type,
1585            CommentObjectId::Secret(_) => ObjectType::Secret,
1586            CommentObjectId::Role(_) => ObjectType::Role,
1587            CommentObjectId::Database(_) => ObjectType::Database,
1588            CommentObjectId::Schema(_) => ObjectType::Schema,
1589            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1590            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1591            CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1592            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1593        }
1594    }
1595}
1596
1597impl Display for ObjectType {
1598    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1599        f.write_str(match self {
1600            ObjectType::Table => "TABLE",
1601            ObjectType::View => "VIEW",
1602            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1603            ObjectType::Source => "SOURCE",
1604            ObjectType::Sink => "SINK",
1605            ObjectType::Index => "INDEX",
1606            ObjectType::Type => "TYPE",
1607            ObjectType::Role => "ROLE",
1608            ObjectType::Cluster => "CLUSTER",
1609            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1610            ObjectType::Secret => "SECRET",
1611            ObjectType::Connection => "CONNECTION",
1612            ObjectType::Database => "DATABASE",
1613            ObjectType::Schema => "SCHEMA",
1614            ObjectType::Func => "FUNCTION",
1615            ObjectType::ContinualTask => "CONTINUAL TASK",
1616            ObjectType::NetworkPolicy => "NETWORK POLICY",
1617        })
1618    }
1619}
1620
1621#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
1622/// The types of objects in the system.
1623pub enum SystemObjectType {
1624    /// Catalog object type.
1625    Object(ObjectType),
1626    /// Entire system.
1627    System,
1628}
1629
1630impl SystemObjectType {
1631    /// Reports if the object type can be treated as a relation.
1632    pub fn is_relation(&self) -> bool {
1633        match self {
1634            SystemObjectType::Object(object_type) => object_type.is_relation(),
1635            SystemObjectType::System => false,
1636        }
1637    }
1638}
1639
1640impl Display for SystemObjectType {
1641    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1642        match self {
1643            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1644            SystemObjectType::System => f.write_str("SYSTEM"),
1645        }
1646    }
1647}
1648
1649/// Enum used to format object names in error messages.
1650#[derive(Debug, Clone, PartialEq, Eq)]
1651pub enum ErrorMessageObjectDescription {
1652    /// The name of a specific object.
1653    Object {
1654        /// Type of object.
1655        object_type: ObjectType,
1656        /// Name of object.
1657        object_name: Option<String>,
1658    },
1659    /// The name of the entire system.
1660    System,
1661}
1662
1663impl ErrorMessageObjectDescription {
1664    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1665    pub fn from_id(
1666        object_id: &ObjectId,
1667        catalog: &dyn SessionCatalog,
1668    ) -> ErrorMessageObjectDescription {
1669        let object_name = match object_id {
1670            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1671            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1672                .get_cluster_replica(*cluster_id, *replica_id)
1673                .name()
1674                .to_string(),
1675            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1676            ObjectId::Schema((database_spec, schema_spec)) => {
1677                let name = catalog.get_schema(database_spec, schema_spec).name();
1678                catalog.resolve_full_schema_name(name).to_string()
1679            }
1680            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1681            ObjectId::Item(id) => {
1682                let name = catalog.get_item(id).name();
1683                catalog.resolve_full_name(name).to_string()
1684            }
1685            ObjectId::NetworkPolicy(network_policy_id) => catalog
1686                .get_network_policy(network_policy_id)
1687                .name()
1688                .to_string(),
1689        };
1690        ErrorMessageObjectDescription::Object {
1691            object_type: catalog.get_object_type(object_id),
1692            object_name: Some(object_name),
1693        }
1694    }
1695
1696    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1697    pub fn from_sys_id(
1698        object_id: &SystemObjectId,
1699        catalog: &dyn SessionCatalog,
1700    ) -> ErrorMessageObjectDescription {
1701        match object_id {
1702            SystemObjectId::Object(object_id) => {
1703                ErrorMessageObjectDescription::from_id(object_id, catalog)
1704            }
1705            SystemObjectId::System => ErrorMessageObjectDescription::System,
1706        }
1707    }
1708
1709    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1710    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1711        match object_type {
1712            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1713                object_type,
1714                object_name: None,
1715            },
1716            SystemObjectType::System => ErrorMessageObjectDescription::System,
1717        }
1718    }
1719}
1720
1721impl Display for ErrorMessageObjectDescription {
1722    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1723        match self {
1724            ErrorMessageObjectDescription::Object {
1725                object_type,
1726                object_name,
1727            } => {
1728                let object_name = object_name
1729                    .as_ref()
1730                    .map(|object_name| format!(" {}", object_name.quoted()))
1731                    .unwrap_or_else(|| "".to_string());
1732                write!(f, "{object_type}{object_name}")
1733            }
1734            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1735        }
1736    }
1737}
1738
1739#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
1740// These attributes are needed because the key of a map must be a string. We also
1741// get the added benefit of flattening this struct in it's serialized form.
1742#[serde(into = "BTreeMap<String, RoleId>")]
1743#[serde(try_from = "BTreeMap<String, RoleId>")]
1744/// Represents the grantee and a grantor of a role membership.
1745pub struct RoleMembership {
1746    /// Key is the role that some role is a member of, value is the grantor role ID.
1747    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
1748    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
1749    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
1750    // migration.
1751    pub map: BTreeMap<RoleId, RoleId>,
1752}
1753
1754impl RoleMembership {
1755    /// Creates a new [`RoleMembership`].
1756    pub fn new() -> RoleMembership {
1757        RoleMembership {
1758            map: BTreeMap::new(),
1759        }
1760    }
1761}
1762
1763impl From<RoleMembership> for BTreeMap<String, RoleId> {
1764    fn from(value: RoleMembership) -> Self {
1765        value
1766            .map
1767            .into_iter()
1768            .map(|(k, v)| (k.to_string(), v))
1769            .collect()
1770    }
1771}
1772
1773impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1774    type Error = anyhow::Error;
1775
1776    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1777        Ok(RoleMembership {
1778            map: value
1779                .into_iter()
1780                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1781                .collect::<Result<_, anyhow::Error>>()?,
1782        })
1783    }
1784}
1785
1786/// Specification for objects that will be affected by a default privilege.
1787#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1788pub struct DefaultPrivilegeObject {
1789    /// The role id that created the object.
1790    pub role_id: RoleId,
1791    /// The database that the object is created in if Some, otherwise all databases.
1792    pub database_id: Option<DatabaseId>,
1793    /// The schema that the object is created in if Some, otherwise all databases.
1794    pub schema_id: Option<SchemaId>,
1795    /// The type of object.
1796    pub object_type: ObjectType,
1797}
1798
1799impl DefaultPrivilegeObject {
1800    /// Creates a new [`DefaultPrivilegeObject`].
1801    pub fn new(
1802        role_id: RoleId,
1803        database_id: Option<DatabaseId>,
1804        schema_id: Option<SchemaId>,
1805        object_type: ObjectType,
1806    ) -> DefaultPrivilegeObject {
1807        DefaultPrivilegeObject {
1808            role_id,
1809            database_id,
1810            schema_id,
1811            object_type,
1812        }
1813    }
1814}
1815
1816impl std::fmt::Display for DefaultPrivilegeObject {
1817    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1818        // TODO: Don't just wrap Debug.
1819        write!(f, "{self:?}")
1820    }
1821}
1822
1823/// Specification for the privileges that will be granted from default privileges.
1824#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1825pub struct DefaultPrivilegeAclItem {
1826    /// The role that will receive the privileges.
1827    pub grantee: RoleId,
1828    /// The specific privileges granted.
1829    pub acl_mode: AclMode,
1830}
1831
1832impl DefaultPrivilegeAclItem {
1833    /// Creates a new [`DefaultPrivilegeAclItem`].
1834    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1835        DefaultPrivilegeAclItem { grantee, acl_mode }
1836    }
1837
1838    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1839    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1840        MzAclItem {
1841            grantee: self.grantee,
1842            grantor,
1843            acl_mode: self.acl_mode,
1844        }
1845    }
1846}
1847
1848/// Which builtins to return in `BUILTINS::iter`.
1849///
1850/// All calls to `BUILTINS::iter` within the lifetime of an environmentd process
1851/// must provide an equal `BuiltinsConfig`. It is not allowed to change
1852/// dynamically.
1853#[derive(Debug, Clone)]
1854pub struct BuiltinsConfig {
1855    /// If true, include system builtin continual tasks.
1856    pub include_continual_tasks: bool,
1857}
1858
1859#[cfg(test)]
1860mod tests {
1861    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};
1862
1863    #[mz_ore::test]
1864    fn test_environment_id() {
1865        for (input, expected) in [
1866            (
1867                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
1868                Ok(EnvironmentId {
1869                    cloud_provider: CloudProvider::Local,
1870                    cloud_provider_region: "az1".into(),
1871                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1872                    ordinal: 452,
1873                }),
1874            ),
1875            (
1876                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1877                Ok(EnvironmentId {
1878                    cloud_provider: CloudProvider::Aws,
1879                    cloud_provider_region: "us-east-1".into(),
1880                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1881                    ordinal: 0,
1882                }),
1883            ),
1884            (
1885                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1886                Ok(EnvironmentId {
1887                    cloud_provider: CloudProvider::Gcp,
1888                    cloud_provider_region: "us-central1".into(),
1889                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1890                    ordinal: 0,
1891                }),
1892            ),
1893            (
1894                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1895                Ok(EnvironmentId {
1896                    cloud_provider: CloudProvider::Azure,
1897                    cloud_provider_region: "australiaeast".into(),
1898                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1899                    ordinal: 0,
1900                }),
1901            ),
1902            (
1903                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1904                Ok(EnvironmentId {
1905                    cloud_provider: CloudProvider::Generic,
1906                    cloud_provider_region: "moon-station-11-darkside".into(),
1907                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1908                    ordinal: 0,
1909                }),
1910            ),
1911            ("", Err(InvalidEnvironmentIdError)),
1912            (
1913                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
1914                Err(InvalidEnvironmentIdError),
1915            ),
1916            (
1917                "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
1918                Err(InvalidEnvironmentIdError),
1919            ),
1920            (
1921                "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
1922                Err(InvalidEnvironmentIdError),
1923            ),
1924        ] {
1925            let actual = input.parse();
1926            assert_eq!(expected, actual, "input = {}", input);
1927            if let Ok(actual) = actual {
1928                assert_eq!(input, actual.to_string(), "input = {}", input);
1929            }
1930        }
1931    }
1932}