Skip to main content

mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::num::NonZeroU32;
20use std::str::FromStr;
21use std::sync::LazyLock;
22use std::time::Instant;
23
24use chrono::{DateTime, Utc};
25use mz_auth::password::Password;
26use mz_build_info::BuildInfo;
27use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
28use mz_controller_types::{ClusterId, ReplicaId};
29use mz_expr::MirScalarExpr;
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_ore::str::StrExt;
32use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
33use mz_repr::explain::ExprHumanizer;
34use mz_repr::network_policy_id::NetworkPolicyId;
35use mz_repr::role_id::RoleId;
36use mz_repr::{
37    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
38};
39use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
40use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
41use mz_storage_types::connections::{Connection, ConnectionContext};
42use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
43use proptest_derive::Arbitrary;
44use regex::Regex;
45use serde::{Deserialize, Serialize};
46use uuid::Uuid;
47
48use crate::func::Func;
49use crate::names::{
50    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
51    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
52    SchemaSpecifier, SystemObjectId,
53};
54use crate::plan::statement::StatementDesc;
55use crate::plan::statement::ddl::PlannedRoleAttributes;
56use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
57use crate::session::vars::{OwnedVarInput, SystemVars};
58
59/// A catalog keeps track of SQL objects and session state available to the
60/// planner.
61///
62/// The `sql` crate is agnostic to any particular catalog implementation. This
63/// trait describes the required interface.
64///
65/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
66/// catalog contains databases, databases contain schemas, and schemas contain
67/// catalog items, like sources, sinks, view, and indexes.
68///
69/// There are two classes of operations provided by a catalog:
70///
71///   * Resolution operations, like [`resolve_item`]. These fill in missing name
72///     components based upon connection defaults, e.g., resolving the partial
73///     name `view42` to the fully-specified name `materialize.public.view42`.
74///
75///   * Lookup operations, like [`SessionCatalog::get_item`]. These retrieve
76///     metadata about a catalog entity based on a fully-specified name that is
77///     known to be valid (i.e., because the name was successfully resolved, or
78///     was constructed based on the output of a prior lookup operation). These
79///     functions panic if called with invalid input.
80///
81///   * Session management, such as managing variables' states and adding
82///     notices to the session.
83///
84/// [`get_databases`]: SessionCatalog::get_databases
85/// [`get_item`]: SessionCatalog::get_item
86/// [`resolve_item`]: SessionCatalog::resolve_item
87pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
88    /// Returns the id of the role that is issuing the query.
89    fn active_role_id(&self) -> &RoleId;
90
91    /// Returns the database to use if one is not explicitly specified.
92    fn active_database_name(&self) -> Option<&str> {
93        self.active_database()
94            .map(|id| self.get_database(id))
95            .map(|db| db.name())
96    }
97
98    /// Returns the database to use if one is not explicitly specified.
99    fn active_database(&self) -> Option<&DatabaseId>;
100
101    /// Returns the cluster to use if one is not explicitly specified.
102    fn active_cluster(&self) -> &str;
103
104    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
105    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];
106
107    /// Returns the descriptor of the named prepared statement on the session, or
108    /// None if the prepared statement does not exist.
109    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;
110
111    /// Retrieves a reference to the specified portal's descriptor.
112    ///
113    /// If there is no such portal, returns `None`.
114    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc>;
115
116    /// Resolves the named database.
117    ///
118    /// If `database_name` exists in the catalog, it returns a reference to the
119    /// resolved database; otherwise it returns an error.
120    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;
121
122    /// Gets a database by its ID.
123    ///
124    /// Panics if `id` does not specify a valid database.
125    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;
126
127    /// Gets all databases.
128    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;
129
130    /// Resolves a partially-specified schema name.
131    ///
132    /// If the schema exists in the catalog, it returns a reference to the
133    /// resolved schema; otherwise it returns an error.
134    fn resolve_schema(
135        &self,
136        database_name: Option<&str>,
137        schema_name: &str,
138    ) -> Result<&dyn CatalogSchema, CatalogError>;
139
140    /// Resolves a schema name within a specified database.
141    ///
142    /// If the schema exists in the database, it returns a reference to the
143    /// resolved schema; otherwise it returns an error.
144    fn resolve_schema_in_database(
145        &self,
146        database_spec: &ResolvedDatabaseSpecifier,
147        schema_name: &str,
148    ) -> Result<&dyn CatalogSchema, CatalogError>;
149
150    /// Gets a schema by its ID.
151    ///
152    /// Panics if `id` does not specify a valid schema.
153    fn get_schema(
154        &self,
155        database_spec: &ResolvedDatabaseSpecifier,
156        schema_spec: &SchemaSpecifier,
157    ) -> &dyn CatalogSchema;
158
159    /// Gets all schemas.
160    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;
161
162    /// Gets the mz_internal schema id.
163    fn get_mz_internal_schema_id(&self) -> SchemaId;
164
165    /// Gets the mz_unsafe schema id.
166    fn get_mz_unsafe_schema_id(&self) -> SchemaId;
167
168    /// Returns true if `schema` is an internal system schema, false otherwise
169    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;
170
171    /// Resolves the named role.
172    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;
173
174    /// Resolves the named network policy.
175    fn resolve_network_policy(
176        &self,
177        network_policy_name: &str,
178    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;
179
180    /// Gets a role by its ID.
181    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;
182
183    /// Gets a role by its ID.
184    ///
185    /// Panics if `id` does not specify a valid role.
186    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;
187
188    /// Gets all roles.
189    fn get_roles(&self) -> Vec<&dyn CatalogRole>;
190
191    /// Gets the id of the `mz_system` role.
192    fn mz_system_role_id(&self) -> RoleId;
193
194    /// Collects all role IDs that `id` is transitively a member of.
195    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;
196
197    /// Resolves the named cluster.
198    /// Gets a network_policy by its ID.
199    ///
200    /// Panics if `id` does not specify a valid role.
201    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;
202
203    /// Gets all roles.
204    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;
205
206    ///
207    /// If the provided name is `None`, resolves the currently active cluster.
208    fn resolve_cluster<'a, 'b>(
209        &'a self,
210        cluster_name: Option<&'b str>,
211    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;
212
213    /// Resolves the named cluster replica.
214    fn resolve_cluster_replica<'a, 'b>(
215        &'a self,
216        cluster_replica_name: &'b QualifiedReplica,
217    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;
218
219    /// Resolves a partially-specified item name, that is NOT a function or
220    /// type. (For resolving functions or types, please use
221    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
222    ///
223    /// If the partial name has a database component, it searches only the
224    /// specified database; otherwise, it searches the active database. If the
225    /// partial name has a schema component, it searches only the specified
226    /// schema; otherwise, it searches a default set of schemas within the
227    /// selected database. It returns an error if none of the searched schemas
228    /// contain an item whose name matches the item component of the partial
229    /// name.
230    ///
231    /// Note that it is not an error if the named item appears in more than one
232    /// of the search schemas. The catalog implementation must choose one.
233    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
234
235    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
236    /// functions within the catalog.
237    fn resolve_function(
238        &self,
239        item_name: &PartialItemName,
240    ) -> Result<&dyn CatalogItem, CatalogError>;
241
242    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
243    /// types within the catalog.
244    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
245
246    /// Resolves `name` to a type or item, preferring the type if both exist.
247    fn resolve_item_or_type(
248        &self,
249        name: &PartialItemName,
250    ) -> Result<&dyn CatalogItem, CatalogError> {
251        if let Ok(ty) = self.resolve_type(name) {
252            return Ok(ty);
253        }
254        self.resolve_item(name)
255    }
256
257    /// Gets a type named `name` from exactly one of the system schemas.
258    ///
259    /// # Panics
260    /// - If `name` is not an entry in any system schema
261    /// - If more than one system schema has an entry named `name`.
262    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;
263
264    /// Gets an item by its ID.
265    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;
266
267    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
268    /// exist.
269    ///
270    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
271    fn try_get_item_by_global_id<'a>(
272        &'a self,
273        id: &GlobalId,
274    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;
275
276    /// Gets an item by its ID.
277    ///
278    /// Panics if `id` does not specify a valid item.
279    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;
280
281    /// Gets an item by a [`GlobalId`].
282    ///
283    /// Panics if `id` does not specify a valid item.
284    ///
285    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
286    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;
287
288    /// Gets all items.
289    fn get_items(&self) -> Vec<&dyn CatalogItem>;
290
291    /// Looks up an item by its name.
292    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
293
294    /// Looks up a type by its name.
295    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
296
297    /// Gets a cluster by ID.
298    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster<'_>;
299
300    /// Gets all clusters.
301    fn get_clusters(&self) -> Vec<&dyn CatalogCluster<'_>>;
302
303    /// Gets a cluster replica by ID.
304    fn get_cluster_replica(
305        &self,
306        cluster_id: ClusterId,
307        replica_id: ReplicaId,
308    ) -> &dyn CatalogClusterReplica<'_>;
309
310    /// Gets all cluster replicas.
311    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
312
313    /// Gets all system privileges.
314    fn get_system_privileges(&self) -> &PrivilegeMap;
315
316    /// Gets all default privileges.
317    fn get_default_privileges(
318        &self,
319    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;
320
321    /// Finds a name like `name` that is not already in use.
322    ///
323    /// If `name` itself is available, it is returned unchanged.
324    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;
325
326    /// Returns a fully qualified human readable name from fully qualified non-human readable name
327    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;
328
329    /// Returns a fully qualified human readable schema name from fully qualified non-human
330    /// readable schema name
331    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;
332
333    /// Returns the [`CatalogItemId`] for from a [`GlobalId`].
334    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;
335
336    /// Returns the [`GlobalId`] for the specificed Catalog Item, at the specified version.
337    fn resolve_global_id(
338        &self,
339        item_id: &CatalogItemId,
340        version: RelationVersionSelector,
341    ) -> GlobalId;
342
343    /// Returns the configuration of the catalog.
344    fn config(&self) -> &CatalogConfig;
345
346    /// Returns the number of milliseconds since the system epoch. For normal use
347    /// this means the Unix epoch. This can safely be mocked in tests and start
348    /// at 0.
349    fn now(&self) -> EpochMillis;
350
351    /// Returns the set of supported AWS PrivateLink availability zone ids.
352    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;
353
354    /// Returns system vars
355    fn system_vars(&self) -> &SystemVars;
356
357    /// Returns mutable system vars
358    ///
359    /// Clients should use this this method carefully, as changes to the backing
360    /// state here are not guarateed to be persisted. The motivating use case
361    /// for this method was ensuring that features are temporary turned on so
362    /// catalog rehydration does not break due to unsupported SQL syntax.
363    fn system_vars_mut(&mut self) -> &mut SystemVars;
364
365    /// Returns the [`RoleId`] of the owner of an object by its ID.
366    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;
367
368    /// Returns the [`PrivilegeMap`] of the object.
369    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;
370
371    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
372    ///
373    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
374    /// earlier in the list than the roots. This is particularly userful for the order to drop
375    /// objects.
376    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;
377
378    /// Returns all the IDs of all objects that depend on `id`, including `id` themselves.
379    ///
380    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
381    /// earlier in the list than `id`. This is particularly userful for the order to drop
382    /// objects.
383    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;
384
385    /// Returns all possible privileges associated with an object type.
386    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;
387
388    /// Returns the object type of `object_id`.
389    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;
390
391    /// Returns the system object type of `id`.
392    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;
393
394    /// Returns the minimal qualification required to unambiguously specify
395    /// `qualified_name`.
396    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;
397
398    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
399    /// successfully executes.
400    fn add_notice(&self, notice: PlanNotice);
401
402    /// Returns the associated comments for the given `id`
403    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;
404
405    /// Reports whether the specified cluster size is a modern "cc" size rather
406    /// than a legacy T-shirt size.
407    fn is_cluster_size_cc(&self, size: &str) -> bool;
408}
409
410/// Configuration associated with a catalog.
411#[derive(Debug, Clone)]
412pub struct CatalogConfig {
413    /// Returns the time at which the catalog booted.
414    pub start_time: DateTime<Utc>,
415    /// Returns the instant at which the catalog booted.
416    pub start_instant: Instant,
417    /// A random integer associated with this instance of the catalog.
418    ///
419    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
420    /// topics. Perhaps we can remove this when database-issues#977 is complete.
421    pub nonce: u64,
422    /// A persistent ID associated with the environment.
423    pub environment_id: EnvironmentId,
424    /// A transient UUID associated with this process.
425    pub session_id: Uuid,
426    /// Information about this build of Materialize.
427    pub build_info: &'static BuildInfo,
428    /// Function that returns a wall clock now time; can safely be mocked to return
429    /// 0.
430    pub now: NowFn,
431    /// Context for source and sink connections.
432    pub connection_context: ConnectionContext,
433    /// Which system builtins to include. Not allowed to change dynamically.
434    pub builtins_cfg: BuiltinsConfig,
435    /// Helm chart version
436    pub helm_chart_version: Option<String>,
437}
438
439/// A database in a [`SessionCatalog`].
440pub trait CatalogDatabase {
441    /// Returns a fully-specified name of the database.
442    fn name(&self) -> &str;
443
444    /// Returns a stable ID for the database.
445    fn id(&self) -> DatabaseId;
446
447    /// Returns whether the database contains schemas.
448    fn has_schemas(&self) -> bool;
449
450    /// Returns the schemas of the database as a map from schema name to
451    /// schema ID.
452    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;
453
454    /// Returns the schemas of the database.
455    fn schemas(&self) -> Vec<&dyn CatalogSchema>;
456
457    /// Returns the ID of the owning role.
458    fn owner_id(&self) -> RoleId;
459
460    /// Returns the privileges associated with the database.
461    fn privileges(&self) -> &PrivilegeMap;
462}
463
464/// A schema in a [`SessionCatalog`].
465pub trait CatalogSchema {
466    /// Returns a fully-specified id of the database
467    fn database(&self) -> &ResolvedDatabaseSpecifier;
468
469    /// Returns a fully-specified name of the schema.
470    fn name(&self) -> &QualifiedSchemaName;
471
472    /// Returns a stable ID for the schema.
473    fn id(&self) -> &SchemaSpecifier;
474
475    /// Lists the `CatalogItem`s for the schema.
476    fn has_items(&self) -> bool;
477
478    /// Returns the IDs of the items in the schema.
479    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;
480
481    /// Returns the ID of the owning role.
482    fn owner_id(&self) -> RoleId;
483
484    /// Returns the privileges associated with the schema.
485    fn privileges(&self) -> &PrivilegeMap;
486}
487
488/// Parameters used to modify password
489#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
490pub struct PasswordConfig {
491    /// The Password.
492    pub password: Password,
493    /// a non default iteration count for hashing the password.
494    pub scram_iterations: NonZeroU32,
495}
496
497/// A modification of a role password in the catalog
498#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
499pub enum PasswordAction {
500    /// Set a new password.
501    Set(PasswordConfig),
502    /// Remove the existing password.
503    Clear,
504    /// Leave the existing password unchanged.
505    NoChange,
506}
507
508/// A raw representation of attributes belonging to a [`CatalogRole`] that we might
509/// get as input from the user. This includes the password.
510/// This struct explicitly does not implement `Serialize` or `Deserialize` to avoid
511/// accidentally serializing passwords.
512#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
513pub struct RoleAttributesRaw {
514    /// Indicates whether the role has inheritance of privileges.
515    pub inherit: bool,
516    /// The raw password of the role. This is for self managed auth, not cloud.
517    pub password: Option<Password>,
518    /// Hash iterations used to securely store passwords. This is for self-managed auth
519    pub scram_iterations: Option<NonZeroU32>,
520    /// Whether or not this user is a superuser.
521    pub superuser: Option<bool>,
522    /// Whether this role is login
523    pub login: Option<bool>,
524    // Force use of constructor.
525    _private: (),
526}
527
528/// Attributes belonging to a [`CatalogRole`].
529#[derive(
530    Debug,
531    Clone,
532    Eq,
533    Serialize,
534    Deserialize,
535    PartialEq,
536    Ord,
537    PartialOrd,
538    Arbitrary
539)]
540pub struct RoleAttributes {
541    /// Indicates whether the role has inheritance of privileges.
542    pub inherit: bool,
543    /// Whether or not this user is a superuser.
544    pub superuser: Option<bool>,
545    /// Whether this role is login
546    pub login: Option<bool>,
547    // Force use of constructor.
548    _private: (),
549}
550
551impl RoleAttributesRaw {
552    /// Creates a new [`RoleAttributesRaw`] with default attributes.
553    pub const fn new() -> RoleAttributesRaw {
554        RoleAttributesRaw {
555            inherit: true,
556            password: None,
557            scram_iterations: None,
558            superuser: None,
559            login: None,
560            _private: (),
561        }
562    }
563
564    /// Adds all attributes excluding password.
565    pub const fn with_all(mut self) -> RoleAttributesRaw {
566        self.inherit = true;
567        self.superuser = Some(true);
568        self.login = Some(true);
569        self
570    }
571}
572
573impl RoleAttributes {
574    /// Creates a new [`RoleAttributes`] with default attributes.
575    pub const fn new() -> RoleAttributes {
576        RoleAttributes {
577            inherit: true,
578            superuser: None,
579            login: None,
580            _private: (),
581        }
582    }
583
584    /// Adds all attributes except password.
585    pub const fn with_all(mut self) -> RoleAttributes {
586        self.inherit = true;
587        self.superuser = Some(true);
588        self.login = Some(true);
589        self
590    }
591
592    /// Returns whether or not the role has inheritence of privileges.
593    pub const fn is_inherit(&self) -> bool {
594        self.inherit
595    }
596}
597
598impl From<RoleAttributesRaw> for RoleAttributes {
599    fn from(
600        RoleAttributesRaw {
601            inherit,
602            superuser,
603            login,
604            ..
605        }: RoleAttributesRaw,
606    ) -> RoleAttributes {
607        RoleAttributes {
608            inherit,
609            superuser,
610            login,
611            _private: (),
612        }
613    }
614}
615
616impl From<RoleAttributes> for RoleAttributesRaw {
617    fn from(
618        RoleAttributes {
619            inherit,
620            superuser,
621            login,
622            ..
623        }: RoleAttributes,
624    ) -> RoleAttributesRaw {
625        RoleAttributesRaw {
626            inherit,
627            password: None,
628            scram_iterations: None,
629            superuser,
630            login,
631            _private: (),
632        }
633    }
634}
635
636impl From<PlannedRoleAttributes> for RoleAttributesRaw {
637    fn from(
638        PlannedRoleAttributes {
639            inherit,
640            password,
641            scram_iterations,
642            superuser,
643            login,
644            ..
645        }: PlannedRoleAttributes,
646    ) -> RoleAttributesRaw {
647        let default_attributes = RoleAttributesRaw::new();
648        RoleAttributesRaw {
649            inherit: inherit.unwrap_or(default_attributes.inherit),
650            password,
651            scram_iterations,
652            superuser,
653            login,
654            _private: (),
655        }
656    }
657}
658
659/// Default variable values for a [`CatalogRole`].
660#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
661pub struct RoleVars {
662    /// Map of variable names to their value.
663    pub map: BTreeMap<String, OwnedVarInput>,
664}
665
666/// A role in a [`SessionCatalog`].
667pub trait CatalogRole {
668    /// Returns a fully-specified name of the role.
669    fn name(&self) -> &str;
670
671    /// Returns a stable ID for the role.
672    fn id(&self) -> RoleId;
673
674    /// Returns all role IDs that this role is an immediate a member of, and the grantor of that
675    /// membership.
676    ///
677    /// Key is the role that some role is a member of, value is the grantor role ID.
678    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;
679
680    /// Returns the attributes associated with this role.
681    fn attributes(&self) -> &RoleAttributes;
682
683    /// Returns all variables that this role has a default value stored for.
684    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
685}
686
687/// A network policy in a [`SessionCatalog`].
688pub trait CatalogNetworkPolicy {
689    /// Returns a fully-specified name of the NetworkPolicy.
690    fn name(&self) -> &str;
691
692    /// Returns a stable ID for the NetworkPolicy.
693    fn id(&self) -> NetworkPolicyId;
694
695    /// Returns the ID of the owning NetworkPolicy.
696    fn owner_id(&self) -> RoleId;
697
698    /// Returns the privileges associated with the NetworkPolicy.
699    fn privileges(&self) -> &PrivilegeMap;
700}
701
702/// A cluster in a [`SessionCatalog`].
703pub trait CatalogCluster<'a> {
704    /// Returns a fully-specified name of the cluster.
705    fn name(&self) -> &str;
706
707    /// Returns a stable ID for the cluster.
708    fn id(&self) -> ClusterId;
709
710    /// Returns the objects that are bound to this cluster.
711    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;
712
713    /// Returns the replicas of the cluster as a map from replica name to
714    /// replica ID.
715    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;
716
717    /// Returns the replicas of the cluster.
718    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
719
720    /// Returns the replica belonging to the cluster with replica ID `id`.
721    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_>;
722
723    /// Returns the ID of the owning role.
724    fn owner_id(&self) -> RoleId;
725
726    /// Returns the privileges associated with the cluster.
727    fn privileges(&self) -> &PrivilegeMap;
728
729    /// Returns true if this cluster is a managed cluster.
730    fn is_managed(&self) -> bool;
731
732    /// Returns the size of the cluster, if the cluster is a managed cluster.
733    fn managed_size(&self) -> Option<&str>;
734
735    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
736    fn schedule(&self) -> Option<&ClusterSchedule>;
737
738    /// Try to convert this cluster into a [`CreateClusterPlan`].
739    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
740    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
741}
742
743/// A cluster replica in a [`SessionCatalog`]
744pub trait CatalogClusterReplica<'a>: Debug {
745    /// Returns the name of the cluster replica.
746    fn name(&self) -> &str;
747
748    /// Returns a stable ID for the cluster that the replica belongs to.
749    fn cluster_id(&self) -> ClusterId;
750
751    /// Returns a stable ID for the replica.
752    fn replica_id(&self) -> ReplicaId;
753
754    /// Returns the ID of the owning role.
755    fn owner_id(&self) -> RoleId;
756
757    /// Returns whether or not the replica is internal
758    fn internal(&self) -> bool;
759}
760
761/// An item in a [`SessionCatalog`].
762///
763/// Note that "item" has a very specific meaning in the context of a SQL
764/// catalog, and refers to the various entities that belong to a schema.
765pub trait CatalogItem {
766    /// Returns the fully qualified name of the catalog item.
767    fn name(&self) -> &QualifiedItemName;
768
769    /// Returns the [`CatalogItemId`] for the item.
770    fn id(&self) -> CatalogItemId;
771
772    /// Returns the [`GlobalId`]s associated with this item.
773    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;
774
775    /// Returns the catalog item's OID.
776    fn oid(&self) -> u32;
777
778    /// Returns the resolved function.
779    ///
780    /// If the catalog item is not of a type that produces functions (i.e.,
781    /// anything other than a function), it returns an error.
782    fn func(&self) -> Result<&'static Func, CatalogError>;
783
784    /// Returns the resolved source connection.
785    ///
786    /// If the catalog item is not of a type that contains a `SourceDesc`
787    /// (i.e., anything other than sources), it returns an error.
788    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;
789
790    /// Returns the resolved connection.
791    ///
792    /// If the catalog item is not a connection, it returns an error.
793    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;
794
795    /// Returns the type of the catalog item.
796    fn item_type(&self) -> CatalogItemType;
797
798    /// A normalized SQL statement that describes how to create the catalog
799    /// item.
800    fn create_sql(&self) -> &str;
801
802    /// Returns the IDs of the catalog items upon which this catalog item
803    /// directly references.
804    fn references(&self) -> &ResolvedIds;
805
806    /// Returns the IDs of the catalog items upon which this catalog item
807    /// depends.
808    fn uses(&self) -> BTreeSet<CatalogItemId>;
809
810    /// Returns the IDs of the catalog items that directly reference this catalog item.
811    fn referenced_by(&self) -> &[CatalogItemId];
812
813    /// Returns the IDs of the catalog items that depend upon this catalog item.
814    fn used_by(&self) -> &[CatalogItemId];
815
816    /// Reports whether this catalog entry is a subsource and, if it is, the
817    /// ingestion it is an export of, as well as the item it exports.
818    fn subsource_details(
819        &self,
820    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;
821
822    /// Reports whether this catalog entry is a source export and, if it is, the
823    /// ingestion it is an export of, as well as the item it exports.
824    fn source_export_details(
825        &self,
826    ) -> Option<(
827        CatalogItemId,
828        &UnresolvedItemName,
829        &SourceExportDetails,
830        &SourceExportDataConfig<ReferencedConnection>,
831    )>;
832
833    /// Reports whether this catalog item is a progress source.
834    fn is_progress_source(&self) -> bool;
835
836    /// If this catalog item is a source, it return the IDs of its progress collection.
837    fn progress_id(&self) -> Option<CatalogItemId>;
838
839    /// Returns the index details associated with the catalog item, if the
840    /// catalog item is an index.
841    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;
842
843    /// Returns the column defaults associated with the catalog item, if the
844    /// catalog item is a table that accepts writes.
845    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;
846
847    /// The item this catalog item replaces, if any.
848    fn replacement_target(&self) -> Option<CatalogItemId>;
849
850    /// Returns the type information associated with the catalog item, if the
851    /// catalog item is a type.
852    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;
853
854    /// Returns the ID of the owning role.
855    fn owner_id(&self) -> RoleId;
856
857    /// Returns the privileges associated with the item.
858    fn privileges(&self) -> &PrivilegeMap;
859
860    /// Returns the cluster the item belongs to.
861    fn cluster_id(&self) -> Option<ClusterId>;
862
863    /// Returns the [`CatalogCollectionItem`] for a specific version of this
864    /// [`CatalogItem`].
865    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;
866
867    /// The latest version of this item, if it's version-able.
868    fn latest_version(&self) -> Option<RelationVersion>;
869}
870
871/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
872/// refers to.
873pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
874    /// Returns a description of the result set produced by the catalog item.
875    ///
876    /// If the catalog item is not of a type that produces data (e.g., a sink or
877    /// an index), it returns `None`.
878    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>>;
879
880    /// The [`GlobalId`] for this item.
881    fn global_id(&self) -> GlobalId;
882}
883
884/// The type of a [`CatalogItem`].
885#[derive(
886    Debug,
887    Deserialize,
888    Clone,
889    Copy,
890    Eq,
891    Hash,
892    Ord,
893    PartialEq,
894    PartialOrd,
895    Serialize
896)]
897pub enum CatalogItemType {
898    /// A table.
899    Table,
900    /// A source.
901    Source,
902    /// A sink.
903    Sink,
904    /// A view.
905    View,
906    /// A materialized view.
907    MaterializedView,
908    /// An index.
909    Index,
910    /// A type.
911    Type,
912    /// A func.
913    Func,
914    /// A secret.
915    Secret,
916    /// A connection.
917    Connection,
918    /// A continual task.
919    ContinualTask,
920}
921
922impl CatalogItemType {
923    /// Reports whether the given type of item conflicts with items of type
924    /// `CatalogItemType::Type`.
925    ///
926    /// In PostgreSQL, even though types live in a separate namespace from other
927    /// schema objects, creating a table, view, or materialized view creates a
928    /// type named after that relation. This prevents creating a type with the
929    /// same name as a relational object, even though types and relational
930    /// objects live in separate namespaces. (Indexes are even weirder; while
931    /// they don't get a type with the same name, they get an entry in
932    /// `pg_class` that prevents *record* types of the same name as the index,
933    /// but not other types of types, like enums.)
934    ///
935    /// We don't presently construct types that mirror relational objects,
936    /// though we likely will need to in the future for full PostgreSQL
937    /// compatibility (see database-issues#7142). For now, we use this method to
938    /// prevent creating types and relational objects that have the same name, so
939    /// that it is a backwards compatible change in the future to introduce a
940    /// type named after each relational object in the system.
941    pub fn conflicts_with_type(&self) -> bool {
942        match self {
943            CatalogItemType::Table => true,
944            CatalogItemType::Source => true,
945            CatalogItemType::View => true,
946            CatalogItemType::MaterializedView => true,
947            CatalogItemType::Index => true,
948            CatalogItemType::Type => true,
949            CatalogItemType::Sink => false,
950            CatalogItemType::Func => false,
951            CatalogItemType::Secret => false,
952            CatalogItemType::Connection => false,
953            CatalogItemType::ContinualTask => true,
954        }
955    }
956}
957
958impl fmt::Display for CatalogItemType {
959    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
960        match self {
961            CatalogItemType::Table => f.write_str("table"),
962            CatalogItemType::Source => f.write_str("source"),
963            CatalogItemType::Sink => f.write_str("sink"),
964            CatalogItemType::View => f.write_str("view"),
965            CatalogItemType::MaterializedView => f.write_str("materialized view"),
966            CatalogItemType::Index => f.write_str("index"),
967            CatalogItemType::Type => f.write_str("type"),
968            CatalogItemType::Func => f.write_str("func"),
969            CatalogItemType::Secret => f.write_str("secret"),
970            CatalogItemType::Connection => f.write_str("connection"),
971            CatalogItemType::ContinualTask => f.write_str("continual task"),
972        }
973    }
974}
975
976impl From<CatalogItemType> for ObjectType {
977    fn from(value: CatalogItemType) -> Self {
978        match value {
979            CatalogItemType::Table => ObjectType::Table,
980            CatalogItemType::Source => ObjectType::Source,
981            CatalogItemType::Sink => ObjectType::Sink,
982            CatalogItemType::View => ObjectType::View,
983            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
984            CatalogItemType::Index => ObjectType::Index,
985            CatalogItemType::Type => ObjectType::Type,
986            CatalogItemType::Func => ObjectType::Func,
987            CatalogItemType::Secret => ObjectType::Secret,
988            CatalogItemType::Connection => ObjectType::Connection,
989            CatalogItemType::ContinualTask => ObjectType::ContinualTask,
990        }
991    }
992}
993
994impl From<CatalogItemType> for mz_audit_log::ObjectType {
995    fn from(value: CatalogItemType) -> Self {
996        match value {
997            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
998            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
999            CatalogItemType::View => mz_audit_log::ObjectType::View,
1000            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
1001            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
1002            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
1003            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
1004            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
1005            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
1006            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
1007            CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
1008        }
1009    }
1010}
1011
1012/// Details about a type in the catalog.
1013#[derive(Clone, Debug, Eq, PartialEq)]
1014pub struct CatalogTypeDetails<T: TypeReference> {
1015    /// The ID of the type with this type as the array element, if available.
1016    pub array_id: Option<CatalogItemId>,
1017    /// The description of this type.
1018    pub typ: CatalogType<T>,
1019    /// Additional metadata about the type in PostgreSQL, if relevant.
1020    pub pg_metadata: Option<CatalogTypePgMetadata>,
1021}
1022
1023/// Additional PostgreSQL metadata about a type.
1024#[derive(Clone, Debug, Eq, PartialEq)]
1025pub struct CatalogTypePgMetadata {
1026    /// The OID of the `typinput` function in PostgreSQL.
1027    pub typinput_oid: u32,
1028    /// The OID of the `typreceive` function in PostgreSQL.
1029    pub typreceive_oid: u32,
1030}
1031
1032/// Represents a reference to type in the catalog
1033pub trait TypeReference {
1034    /// The actual type used to reference a `CatalogType`
1035    type Reference: Clone + Debug + Eq + PartialEq;
1036}
1037
1038/// Reference to a type by it's name
1039#[derive(Clone, Debug, Eq, PartialEq)]
1040pub struct NameReference;
1041
1042impl TypeReference for NameReference {
1043    type Reference = &'static str;
1044}
1045
1046/// Reference to a type by it's global ID
1047#[derive(Clone, Debug, Eq, PartialEq)]
1048pub struct IdReference;
1049
1050impl TypeReference for IdReference {
1051    type Reference = CatalogItemId;
1052}
1053
1054/// A type stored in the catalog.
1055///
1056/// The variants correspond one-to-one with [`mz_repr::SqlScalarType`], but with type
1057/// modifiers removed and with embedded types replaced with references to other
1058/// types in the catalog.
1059#[allow(missing_docs)]
1060#[derive(Clone, Debug, Eq, PartialEq)]
1061pub enum CatalogType<T: TypeReference> {
1062    AclItem,
1063    Array {
1064        element_reference: T::Reference,
1065    },
1066    Bool,
1067    Bytes,
1068    Char,
1069    Date,
1070    Float32,
1071    Float64,
1072    Int16,
1073    Int32,
1074    Int64,
1075    UInt16,
1076    UInt32,
1077    UInt64,
1078    MzTimestamp,
1079    Interval,
1080    Jsonb,
1081    List {
1082        element_reference: T::Reference,
1083        element_modifiers: Vec<i64>,
1084    },
1085    Map {
1086        key_reference: T::Reference,
1087        key_modifiers: Vec<i64>,
1088        value_reference: T::Reference,
1089        value_modifiers: Vec<i64>,
1090    },
1091    Numeric,
1092    Oid,
1093    PgLegacyChar,
1094    PgLegacyName,
1095    Pseudo,
1096    Range {
1097        element_reference: T::Reference,
1098    },
1099    Record {
1100        fields: Vec<CatalogRecordField<T>>,
1101    },
1102    RegClass,
1103    RegProc,
1104    RegType,
1105    String,
1106    Time,
1107    Timestamp,
1108    TimestampTz,
1109    Uuid,
1110    VarChar,
1111    Int2Vector,
1112    MzAclItem,
1113}
1114
1115impl CatalogType<IdReference> {
1116    /// Returns the relation description for the type, if the type is a record
1117    /// type.
1118    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1119        match &self {
1120            CatalogType::Record { fields } => {
1121                let mut desc = RelationDesc::builder();
1122                for f in fields {
1123                    let name = f.name.clone();
1124                    let ty = query::scalar_type_from_catalog(
1125                        catalog,
1126                        f.type_reference,
1127                        &f.type_modifiers,
1128                    )?;
1129                    // TODO: support plumbing `NOT NULL` constraints through
1130                    // `CREATE TYPE`.
1131                    let ty = ty.nullable(true);
1132                    desc = desc.with_column(name, ty);
1133                }
1134                Ok(Some(desc.finish()))
1135            }
1136            _ => Ok(None),
1137        }
1138    }
1139}
1140
1141/// A description of a field in a [`CatalogType::Record`].
1142#[derive(Clone, Debug, Eq, PartialEq)]
1143pub struct CatalogRecordField<T: TypeReference> {
1144    /// The name of the field.
1145    pub name: ColumnName,
1146    /// The ID of the type of the field.
1147    pub type_reference: T::Reference,
1148    /// Modifiers to apply to the type.
1149    pub type_modifiers: Vec<i64>,
1150}
1151
1152#[derive(Clone, Debug, Eq, PartialEq)]
1153/// Mirrored from [PostgreSQL's `typcategory`][typcategory].
1154///
1155/// Note that Materialize also uses a number of pseudotypes when planning, but
1156/// we have yet to need to integrate them with `TypeCategory`.
1157///
1158/// [typcategory]:
1159/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
1160pub enum TypeCategory {
1161    /// Array type.
1162    Array,
1163    /// Bit string type.
1164    BitString,
1165    /// Boolean type.
1166    Boolean,
1167    /// Composite type.
1168    Composite,
1169    /// Date/time type.
1170    DateTime,
1171    /// Enum type.
1172    Enum,
1173    /// Geometric type.
1174    Geometric,
1175    /// List type. Materialize specific.
1176    List,
1177    /// Network address type.
1178    NetworkAddress,
1179    /// Numeric type.
1180    Numeric,
1181    /// Pseudo type.
1182    Pseudo,
1183    /// Range type.
1184    Range,
1185    /// String type.
1186    String,
1187    /// Timestamp type.
1188    Timespan,
1189    /// User-defined type.
1190    UserDefined,
1191    /// Unknown type.
1192    Unknown,
1193}
1194
1195impl fmt::Display for TypeCategory {
1196    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1197        f.write_str(match self {
1198            TypeCategory::Array => "array",
1199            TypeCategory::BitString => "bit-string",
1200            TypeCategory::Boolean => "boolean",
1201            TypeCategory::Composite => "composite",
1202            TypeCategory::DateTime => "date-time",
1203            TypeCategory::Enum => "enum",
1204            TypeCategory::Geometric => "geometric",
1205            TypeCategory::List => "list",
1206            TypeCategory::NetworkAddress => "network-address",
1207            TypeCategory::Numeric => "numeric",
1208            TypeCategory::Pseudo => "pseudo",
1209            TypeCategory::Range => "range",
1210            TypeCategory::String => "string",
1211            TypeCategory::Timespan => "timespan",
1212            TypeCategory::UserDefined => "user-defined",
1213            TypeCategory::Unknown => "unknown",
1214        })
1215    }
1216}
1217
1218/// Identifies an environment.
1219///
1220/// Outside of tests, an environment ID can be constructed only from a string of
1221/// the following form:
1222///
1223/// ```text
1224/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
1225/// ```
1226///
1227/// The fields have the following formats:
1228///
1229/// * The cloud provider consists of one or more alphanumeric characters.
1230/// * The cloud provider region consists of one or more alphanumeric or hyphen
1231///   characters.
1232/// * The organization ID is a UUID in its canonical text format.
1233/// * The ordinal is a decimal number with between one and eight digits.
1234///
1235/// There is no way to construct an environment ID from parts, to ensure that
1236/// the `Display` representation is parseable according to the above rules.
1237// NOTE(benesch): ideally we'd have accepted the components of the environment
1238// ID using separate command-line arguments, or at least a string format that
1239// used a field separator that did not appear in the fields. Alas. We can't
1240// easily change it now, as it's used as the e.g. default sink progress topic.
1241#[derive(Debug, Clone, PartialEq)]
1242pub struct EnvironmentId {
1243    cloud_provider: CloudProvider,
1244    cloud_provider_region: String,
1245    organization_id: Uuid,
1246    ordinal: u64,
1247}
1248
1249impl EnvironmentId {
1250    /// Creates a dummy `EnvironmentId` for use in tests.
1251    pub fn for_tests() -> EnvironmentId {
1252        EnvironmentId {
1253            cloud_provider: CloudProvider::Local,
1254            cloud_provider_region: "az1".into(),
1255            organization_id: Uuid::new_v4(),
1256            ordinal: 0,
1257        }
1258    }
1259
1260    /// Returns the cloud provider associated with this environment ID.
1261    pub fn cloud_provider(&self) -> &CloudProvider {
1262        &self.cloud_provider
1263    }
1264
1265    /// Returns the cloud provider region associated with this environment ID.
1266    pub fn cloud_provider_region(&self) -> &str {
1267        &self.cloud_provider_region
1268    }
1269
1270    /// Returns the name of the region associted with this environment ID.
1271    ///
1272    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1273    /// [`EnvironmentId::cloud_provider_region`].
1274    pub fn region(&self) -> String {
1275        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1276    }
1277
1278    /// Returns the organization ID associated with this environment ID.
1279    pub fn organization_id(&self) -> Uuid {
1280        self.organization_id
1281    }
1282
1283    /// Returns the ordinal associated with this environment ID.
1284    pub fn ordinal(&self) -> u64 {
1285        self.ordinal
1286    }
1287}
1288
1289// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1290// populated using this key. Consequently, any changes to that trait
1291// implementation will also have to be reflected in the existing feature
1292// targeting config in LaunchDarkly, otherwise environments might receive
1293// different configs upon restart.
1294impl fmt::Display for EnvironmentId {
1295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1296        write!(
1297            f,
1298            "{}-{}-{}-{}",
1299            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1300        )
1301    }
1302}
1303
1304impl FromStr for EnvironmentId {
1305    type Err = InvalidEnvironmentIdError;
1306
1307    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1308        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1309            Regex::new(
1310                "^(?P<cloud_provider>[[:alnum:]]+)-\
1311                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1312                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1313                  (?P<ordinal>\\d{1,8})$"
1314            ).unwrap()
1315        });
1316        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1317        Ok(EnvironmentId {
1318            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1319            cloud_provider_region: captures["cloud_provider_region"].into(),
1320            organization_id: captures["organization_id"]
1321                .parse()
1322                .map_err(|_| InvalidEnvironmentIdError)?,
1323            ordinal: captures["ordinal"]
1324                .parse()
1325                .map_err(|_| InvalidEnvironmentIdError)?,
1326        })
1327    }
1328}
1329
1330/// The error type for [`EnvironmentId::from_str`].
1331#[derive(Debug, Clone, PartialEq)]
1332pub struct InvalidEnvironmentIdError;
1333
1334impl fmt::Display for InvalidEnvironmentIdError {
1335    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1336        f.write_str("invalid environment ID")
1337    }
1338}
1339
1340impl Error for InvalidEnvironmentIdError {}
1341
1342impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1343    fn from(_: InvalidCloudProviderError) -> Self {
1344        InvalidEnvironmentIdError
1345    }
1346}
1347
1348/// An error returned by the catalog.
1349#[derive(Clone, Debug, Eq, PartialEq)]
1350pub enum CatalogError {
1351    /// Unknown database.
1352    UnknownDatabase(String),
1353    /// Database already exists.
1354    DatabaseAlreadyExists(String),
1355    /// Unknown schema.
1356    UnknownSchema(String),
1357    /// Schema already exists.
1358    SchemaAlreadyExists(String),
1359    /// Unknown role.
1360    UnknownRole(String),
1361    /// Role already exists.
1362    RoleAlreadyExists(String),
1363    /// Network Policy already exists.
1364    NetworkPolicyAlreadyExists(String),
1365    /// Unknown cluster.
1366    UnknownCluster(String),
1367    /// Unexpected builtin cluster.
1368    UnexpectedBuiltinCluster(String),
1369    /// Unexpected builtin cluster.
1370    UnexpectedBuiltinClusterType(String),
1371    /// Cluster already exists.
1372    ClusterAlreadyExists(String),
1373    /// Unknown cluster replica.
1374    UnknownClusterReplica(String),
1375    /// Unknown cluster replica size.
1376    UnknownClusterReplicaSize(String),
1377    /// Duplicate Replica. #[error("cannot create multiple replicas named '{0}' on cluster '{1}'")]
1378    DuplicateReplica(String, String),
1379    /// Unknown item.
1380    UnknownItem(String),
1381    /// Item already exists.
1382    ItemAlreadyExists(CatalogItemId, String),
1383    /// Unknown function.
1384    UnknownFunction {
1385        /// The identifier of the function we couldn't find
1386        name: String,
1387        /// A suggested alternative to the named function.
1388        alternative: Option<String>,
1389    },
1390    /// Unknown type.
1391    UnknownType {
1392        /// The identifier of the type we couldn't find.
1393        name: String,
1394    },
1395    /// Unknown connection.
1396    UnknownConnection(String),
1397    /// Unknown network policy.
1398    UnknownNetworkPolicy(String),
1399    /// Expected the catalog item to have the given type, but it did not.
1400    UnexpectedType {
1401        /// The item's name.
1402        name: String,
1403        /// The actual type of the item.
1404        actual_type: CatalogItemType,
1405        /// The expected type of the item.
1406        expected_type: CatalogItemType,
1407    },
1408    /// Ran out of unique IDs.
1409    IdExhaustion,
1410    /// Ran out of unique OIDs.
1411    OidExhaustion,
1412    /// Timeline already exists.
1413    TimelineAlreadyExists(String),
1414    /// Id Allocator already exists.
1415    IdAllocatorAlreadyExists(String),
1416    /// Config already exists.
1417    ConfigAlreadyExists(String),
1418    /// Builtin migrations failed.
1419    FailedBuiltinSchemaMigration(String),
1420    /// StorageCollectionMetadata already exists.
1421    StorageCollectionMetadataAlreadyExists(GlobalId),
1422}
1423
1424impl fmt::Display for CatalogError {
1425    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1426        match self {
1427            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1428            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1429            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1430            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1431            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1432            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1433            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1434            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1435            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1436            Self::NetworkPolicyAlreadyExists(name) => {
1437                write!(f, "network policy '{name}' already exists")
1438            }
1439            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1440            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1441            Self::UnexpectedBuiltinCluster(name) => {
1442                write!(f, "Unexpected builtin cluster '{}'", name)
1443            }
1444            Self::UnexpectedBuiltinClusterType(name) => {
1445                write!(f, "Unexpected builtin cluster type'{}'", name)
1446            }
1447            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1448            Self::UnknownClusterReplica(name) => {
1449                write!(f, "unknown cluster replica '{}'", name)
1450            }
1451            Self::UnknownClusterReplicaSize(name) => {
1452                write!(f, "unknown cluster replica size '{}'", name)
1453            }
1454            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1455                f,
1456                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1457            ),
1458            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1459            Self::ItemAlreadyExists(_gid, name) => {
1460                write!(f, "catalog item '{name}' already exists")
1461            }
1462            Self::UnexpectedType {
1463                name,
1464                actual_type,
1465                expected_type,
1466            } => {
1467                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1468            }
1469            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1470            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1471            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1472            Self::IdAllocatorAlreadyExists(name) => {
1473                write!(f, "ID allocator '{name}' already exists")
1474            }
1475            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1476            Self::FailedBuiltinSchemaMigration(objects) => {
1477                write!(f, "failed to migrate schema of builtin objects: {objects}")
1478            }
1479            Self::StorageCollectionMetadataAlreadyExists(key) => {
1480                write!(f, "storage metadata for '{key}' already exists")
1481            }
1482        }
1483    }
1484}
1485
1486impl CatalogError {
1487    /// Returns any applicable hints for [`CatalogError`].
1488    pub fn hint(&self) -> Option<String> {
1489        match self {
1490            CatalogError::UnknownFunction { alternative, .. } => {
1491                match alternative {
1492                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1493                    Some(alt) => Some(format!("Try using {alt}")),
1494                }
1495            }
1496            _ => None,
1497        }
1498    }
1499}
1500
1501impl Error for CatalogError {}
1502
1503// Enum variant docs would be useless here.
1504#[allow(missing_docs)]
1505#[derive(
1506    Debug,
1507    Clone,
1508    PartialOrd,
1509    Ord,
1510    PartialEq,
1511    Eq,
1512    Hash,
1513    Copy,
1514    Deserialize,
1515    Serialize
1516)]
1517/// The types of objects stored in the catalog.
1518pub enum ObjectType {
1519    Table,
1520    View,
1521    MaterializedView,
1522    Source,
1523    Sink,
1524    Index,
1525    Type,
1526    Role,
1527    Cluster,
1528    ClusterReplica,
1529    Secret,
1530    Connection,
1531    Database,
1532    Schema,
1533    Func,
1534    ContinualTask,
1535    NetworkPolicy,
1536}
1537
1538impl ObjectType {
1539    /// Reports if the object type can be treated as a relation.
1540    pub fn is_relation(&self) -> bool {
1541        match self {
1542            ObjectType::Table
1543            | ObjectType::View
1544            | ObjectType::MaterializedView
1545            | ObjectType::Source
1546            | ObjectType::ContinualTask => true,
1547            ObjectType::Sink
1548            | ObjectType::Index
1549            | ObjectType::Type
1550            | ObjectType::Secret
1551            | ObjectType::Connection
1552            | ObjectType::Func
1553            | ObjectType::Database
1554            | ObjectType::Schema
1555            | ObjectType::Cluster
1556            | ObjectType::ClusterReplica
1557            | ObjectType::Role
1558            | ObjectType::NetworkPolicy => false,
1559        }
1560    }
1561}
1562
1563impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1564    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1565        match value {
1566            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1567            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1568            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1569            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1570            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1571            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1572            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1573            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1574            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1575            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1576            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1577            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1578            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1579            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1580            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1581            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1582            mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask,
1583            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1584        }
1585    }
1586}
1587
1588impl From<CommentObjectId> for ObjectType {
1589    fn from(value: CommentObjectId) -> ObjectType {
1590        match value {
1591            CommentObjectId::Table(_) => ObjectType::Table,
1592            CommentObjectId::View(_) => ObjectType::View,
1593            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1594            CommentObjectId::Source(_) => ObjectType::Source,
1595            CommentObjectId::Sink(_) => ObjectType::Sink,
1596            CommentObjectId::Index(_) => ObjectType::Index,
1597            CommentObjectId::Func(_) => ObjectType::Func,
1598            CommentObjectId::Connection(_) => ObjectType::Connection,
1599            CommentObjectId::Type(_) => ObjectType::Type,
1600            CommentObjectId::Secret(_) => ObjectType::Secret,
1601            CommentObjectId::Role(_) => ObjectType::Role,
1602            CommentObjectId::Database(_) => ObjectType::Database,
1603            CommentObjectId::Schema(_) => ObjectType::Schema,
1604            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1605            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1606            CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1607            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1608        }
1609    }
1610}
1611
1612impl Display for ObjectType {
1613    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1614        f.write_str(match self {
1615            ObjectType::Table => "TABLE",
1616            ObjectType::View => "VIEW",
1617            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1618            ObjectType::Source => "SOURCE",
1619            ObjectType::Sink => "SINK",
1620            ObjectType::Index => "INDEX",
1621            ObjectType::Type => "TYPE",
1622            ObjectType::Role => "ROLE",
1623            ObjectType::Cluster => "CLUSTER",
1624            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1625            ObjectType::Secret => "SECRET",
1626            ObjectType::Connection => "CONNECTION",
1627            ObjectType::Database => "DATABASE",
1628            ObjectType::Schema => "SCHEMA",
1629            ObjectType::Func => "FUNCTION",
1630            ObjectType::ContinualTask => "CONTINUAL TASK",
1631            ObjectType::NetworkPolicy => "NETWORK POLICY",
1632        })
1633    }
1634}
1635
1636#[derive(
1637    Debug,
1638    Clone,
1639    PartialOrd,
1640    Ord,
1641    PartialEq,
1642    Eq,
1643    Hash,
1644    Copy,
1645    Deserialize,
1646    Serialize
1647)]
1648/// The types of objects in the system.
1649pub enum SystemObjectType {
1650    /// Catalog object type.
1651    Object(ObjectType),
1652    /// Entire system.
1653    System,
1654}
1655
1656impl SystemObjectType {
1657    /// Reports if the object type can be treated as a relation.
1658    pub fn is_relation(&self) -> bool {
1659        match self {
1660            SystemObjectType::Object(object_type) => object_type.is_relation(),
1661            SystemObjectType::System => false,
1662        }
1663    }
1664}
1665
1666impl Display for SystemObjectType {
1667    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1668        match self {
1669            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1670            SystemObjectType::System => f.write_str("SYSTEM"),
1671        }
1672    }
1673}
1674
1675/// Enum used to format object names in error messages.
1676#[derive(Debug, Clone, PartialEq, Eq)]
1677pub enum ErrorMessageObjectDescription {
1678    /// The name of a specific object.
1679    Object {
1680        /// Type of object.
1681        object_type: ObjectType,
1682        /// Name of object.
1683        object_name: Option<String>,
1684    },
1685    /// The name of the entire system.
1686    System,
1687}
1688
1689impl ErrorMessageObjectDescription {
1690    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1691    pub fn from_id(
1692        object_id: &ObjectId,
1693        catalog: &dyn SessionCatalog,
1694    ) -> ErrorMessageObjectDescription {
1695        let object_name = match object_id {
1696            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1697            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1698                .get_cluster_replica(*cluster_id, *replica_id)
1699                .name()
1700                .to_string(),
1701            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1702            ObjectId::Schema((database_spec, schema_spec)) => {
1703                let name = catalog.get_schema(database_spec, schema_spec).name();
1704                catalog.resolve_full_schema_name(name).to_string()
1705            }
1706            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1707            ObjectId::Item(id) => {
1708                let name = catalog.get_item(id).name();
1709                catalog.resolve_full_name(name).to_string()
1710            }
1711            ObjectId::NetworkPolicy(network_policy_id) => catalog
1712                .get_network_policy(network_policy_id)
1713                .name()
1714                .to_string(),
1715        };
1716        ErrorMessageObjectDescription::Object {
1717            object_type: catalog.get_object_type(object_id),
1718            object_name: Some(object_name),
1719        }
1720    }
1721
1722    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1723    pub fn from_sys_id(
1724        object_id: &SystemObjectId,
1725        catalog: &dyn SessionCatalog,
1726    ) -> ErrorMessageObjectDescription {
1727        match object_id {
1728            SystemObjectId::Object(object_id) => {
1729                ErrorMessageObjectDescription::from_id(object_id, catalog)
1730            }
1731            SystemObjectId::System => ErrorMessageObjectDescription::System,
1732        }
1733    }
1734
1735    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1736    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1737        match object_type {
1738            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1739                object_type,
1740                object_name: None,
1741            },
1742            SystemObjectType::System => ErrorMessageObjectDescription::System,
1743        }
1744    }
1745}
1746
1747impl Display for ErrorMessageObjectDescription {
1748    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1749        match self {
1750            ErrorMessageObjectDescription::Object {
1751                object_type,
1752                object_name,
1753            } => {
1754                let object_name = object_name
1755                    .as_ref()
1756                    .map(|object_name| format!(" {}", object_name.quoted()))
1757                    .unwrap_or_else(|| "".to_string());
1758                write!(f, "{object_type}{object_name}")
1759            }
1760            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1761        }
1762    }
1763}
1764
1765#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
1766// These attributes are needed because the key of a map must be a string. We also
1767// get the added benefit of flattening this struct in it's serialized form.
1768#[serde(into = "BTreeMap<String, RoleId>")]
1769#[serde(try_from = "BTreeMap<String, RoleId>")]
1770/// Represents the grantee and a grantor of a role membership.
1771pub struct RoleMembership {
1772    /// Key is the role that some role is a member of, value is the grantor role ID.
1773    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
1774    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
1775    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
1776    // migration.
1777    pub map: BTreeMap<RoleId, RoleId>,
1778}
1779
1780impl RoleMembership {
1781    /// Creates a new [`RoleMembership`].
1782    pub fn new() -> RoleMembership {
1783        RoleMembership {
1784            map: BTreeMap::new(),
1785        }
1786    }
1787}
1788
1789impl From<RoleMembership> for BTreeMap<String, RoleId> {
1790    fn from(value: RoleMembership) -> Self {
1791        value
1792            .map
1793            .into_iter()
1794            .map(|(k, v)| (k.to_string(), v))
1795            .collect()
1796    }
1797}
1798
1799impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1800    type Error = anyhow::Error;
1801
1802    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1803        Ok(RoleMembership {
1804            map: value
1805                .into_iter()
1806                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1807                .collect::<Result<_, anyhow::Error>>()?,
1808        })
1809    }
1810}
1811
1812/// Specification for objects that will be affected by a default privilege.
1813#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1814pub struct DefaultPrivilegeObject {
1815    /// The role id that created the object.
1816    pub role_id: RoleId,
1817    /// The database that the object is created in if Some, otherwise all databases.
1818    pub database_id: Option<DatabaseId>,
1819    /// The schema that the object is created in if Some, otherwise all databases.
1820    pub schema_id: Option<SchemaId>,
1821    /// The type of object.
1822    pub object_type: ObjectType,
1823}
1824
1825impl DefaultPrivilegeObject {
1826    /// Creates a new [`DefaultPrivilegeObject`].
1827    pub fn new(
1828        role_id: RoleId,
1829        database_id: Option<DatabaseId>,
1830        schema_id: Option<SchemaId>,
1831        object_type: ObjectType,
1832    ) -> DefaultPrivilegeObject {
1833        DefaultPrivilegeObject {
1834            role_id,
1835            database_id,
1836            schema_id,
1837            object_type,
1838        }
1839    }
1840}
1841
1842impl std::fmt::Display for DefaultPrivilegeObject {
1843    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1844        // TODO: Don't just wrap Debug.
1845        write!(f, "{self:?}")
1846    }
1847}
1848
1849/// Specification for the privileges that will be granted from default privileges.
1850#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1851pub struct DefaultPrivilegeAclItem {
1852    /// The role that will receive the privileges.
1853    pub grantee: RoleId,
1854    /// The specific privileges granted.
1855    pub acl_mode: AclMode,
1856}
1857
1858impl DefaultPrivilegeAclItem {
1859    /// Creates a new [`DefaultPrivilegeAclItem`].
1860    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1861        DefaultPrivilegeAclItem { grantee, acl_mode }
1862    }
1863
1864    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1865    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1866        MzAclItem {
1867            grantee: self.grantee,
1868            grantor,
1869            acl_mode: self.acl_mode,
1870        }
1871    }
1872}
1873
1874/// Which builtins to return in `BUILTINS::iter`.
1875///
1876/// All calls to `BUILTINS::iter` within the lifetime of an environmentd process
1877/// must provide an equal `BuiltinsConfig`. It is not allowed to change
1878/// dynamically.
1879#[derive(Debug, Clone)]
1880pub struct BuiltinsConfig {
1881    /// If true, include system builtin continual tasks.
1882    pub include_continual_tasks: bool,
1883}
1884
1885#[cfg(test)]
1886mod tests {
1887    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};
1888
1889    #[mz_ore::test]
1890    fn test_environment_id() {
1891        for (input, expected) in [
1892            (
1893                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
1894                Ok(EnvironmentId {
1895                    cloud_provider: CloudProvider::Local,
1896                    cloud_provider_region: "az1".into(),
1897                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1898                    ordinal: 452,
1899                }),
1900            ),
1901            (
1902                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1903                Ok(EnvironmentId {
1904                    cloud_provider: CloudProvider::Aws,
1905                    cloud_provider_region: "us-east-1".into(),
1906                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1907                    ordinal: 0,
1908                }),
1909            ),
1910            (
1911                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1912                Ok(EnvironmentId {
1913                    cloud_provider: CloudProvider::Gcp,
1914                    cloud_provider_region: "us-central1".into(),
1915                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1916                    ordinal: 0,
1917                }),
1918            ),
1919            (
1920                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1921                Ok(EnvironmentId {
1922                    cloud_provider: CloudProvider::Azure,
1923                    cloud_provider_region: "australiaeast".into(),
1924                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1925                    ordinal: 0,
1926                }),
1927            ),
1928            (
1929                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
1930                Ok(EnvironmentId {
1931                    cloud_provider: CloudProvider::Generic,
1932                    cloud_provider_region: "moon-station-11-darkside".into(),
1933                    organization_id: "1497a3b7-a455-4fc4-8752-b44a94b5f90a".parse().unwrap(),
1934                    ordinal: 0,
1935                }),
1936            ),
1937            ("", Err(InvalidEnvironmentIdError)),
1938            (
1939                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
1940                Err(InvalidEnvironmentIdError),
1941            ),
1942            (
1943                "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
1944                Err(InvalidEnvironmentIdError),
1945            ),
1946            (
1947                "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
1948                Err(InvalidEnvironmentIdError),
1949            ),
1950        ] {
1951            let actual = input.parse();
1952            assert_eq!(expected, actual, "input = {}", input);
1953            if let Ok(actual) = actual {
1954                assert_eq!(input, actual.to_string(), "input = {}", input);
1955            }
1956        }
1957    }
1958}