mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::str::FromStr;
20use std::sync::LazyLock;
21use std::time::{Duration, Instant};
22
23use chrono::{DateTime, Utc};
24use mz_auth::password::Password;
25use mz_build_info::BuildInfo;
26use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::MirScalarExpr;
29use mz_ore::now::{EpochMillis, NowFn};
30use mz_ore::str::StrExt;
31use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
32use mz_repr::explain::ExprHumanizer;
33use mz_repr::network_policy_id::NetworkPolicyId;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
37};
38use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
39use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
40use mz_storage_types::connections::{Connection, ConnectionContext};
41use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
42use proptest_derive::Arbitrary;
43use regex::Regex;
44use serde::{Deserialize, Serialize};
45use uuid::Uuid;
46
47use crate::func::Func;
48use crate::names::{
49    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
50    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
51    SchemaSpecifier, SystemObjectId,
52};
53use crate::plan::statement::StatementDesc;
54use crate::plan::statement::ddl::PlannedRoleAttributes;
55use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
56use crate::session::vars::{OwnedVarInput, SystemVars};
57
58/// A catalog keeps track of SQL objects and session state available to the
59/// planner.
60///
61/// The `sql` crate is agnostic to any particular catalog implementation. This
62/// trait describes the required interface.
63///
64/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
65/// catalog contains databases, databases contain schemas, and schemas contain
66/// catalog items, like sources, sinks, view, and indexes.
67///
68/// There are two classes of operations provided by a catalog:
69///
70///   * Resolution operations, like [`resolve_item`]. These fill in missing name
71///     components based upon connection defaults, e.g., resolving the partial
72///     name `view42` to the fully-specified name `materialize.public.view42`.
73///
74///   * Lookup operations, like [`SessionCatalog::get_item`]. These retrieve
75///     metadata about a catalog entity based on a fully-specified name that is
76///     known to be valid (i.e., because the name was successfully resolved, or
77///     was constructed based on the output of a prior lookup operation). These
78///     functions panic if called with invalid input.
79///
80///   * Session management, such as managing variables' states and adding
81///     notices to the session.
82///
83/// [`get_databases`]: SessionCatalog::get_databases
84/// [`get_item`]: SessionCatalog::get_item
85/// [`resolve_item`]: SessionCatalog::resolve_item
86pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
87    /// Returns the id of the role that is issuing the query.
88    fn active_role_id(&self) -> &RoleId;
89
90    /// Returns the database to use if one is not explicitly specified.
91    fn active_database_name(&self) -> Option<&str> {
92        self.active_database()
93            .map(|id| self.get_database(id))
94            .map(|db| db.name())
95    }
96
97    /// Returns the database to use if one is not explicitly specified.
98    fn active_database(&self) -> Option<&DatabaseId>;
99
100    /// Returns the cluster to use if one is not explicitly specified.
101    fn active_cluster(&self) -> &str;
102
103    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
104    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];
105
106    /// Returns the descriptor of the named prepared statement on the session, or
107    /// None if the prepared statement does not exist.
108    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;
109
110    /// Retrieves a reference to the specified portal's descriptor.
111    ///
112    /// If there is no such portal, returns `None`.
113    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc>;
114
115    /// Resolves the named database.
116    ///
117    /// If `database_name` exists in the catalog, it returns a reference to the
118    /// resolved database; otherwise it returns an error.
119    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;
120
121    /// Gets a database by its ID.
122    ///
123    /// Panics if `id` does not specify a valid database.
124    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;
125
126    /// Gets all databases.
127    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;
128
129    /// Resolves a partially-specified schema name.
130    ///
131    /// If the schema exists in the catalog, it returns a reference to the
132    /// resolved schema; otherwise it returns an error.
133    fn resolve_schema(
134        &self,
135        database_name: Option<&str>,
136        schema_name: &str,
137    ) -> Result<&dyn CatalogSchema, CatalogError>;
138
139    /// Resolves a schema name within a specified database.
140    ///
141    /// If the schema exists in the database, it returns a reference to the
142    /// resolved schema; otherwise it returns an error.
143    fn resolve_schema_in_database(
144        &self,
145        database_spec: &ResolvedDatabaseSpecifier,
146        schema_name: &str,
147    ) -> Result<&dyn CatalogSchema, CatalogError>;
148
149    /// Gets a schema by its ID.
150    ///
151    /// Panics if `id` does not specify a valid schema.
152    fn get_schema(
153        &self,
154        database_spec: &ResolvedDatabaseSpecifier,
155        schema_spec: &SchemaSpecifier,
156    ) -> &dyn CatalogSchema;
157
158    /// Gets all schemas.
159    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;
160
161    /// Gets the mz_internal schema id.
162    fn get_mz_internal_schema_id(&self) -> SchemaId;
163
164    /// Gets the mz_unsafe schema id.
165    fn get_mz_unsafe_schema_id(&self) -> SchemaId;
166
167    /// Returns true if `schema` is an internal system schema, false otherwise
168    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;
169
170    /// Resolves the named role.
171    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;
172
173    /// Resolves the named network policy.
174    fn resolve_network_policy(
175        &self,
176        network_policy_name: &str,
177    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;
178
179    /// Gets a role by its ID.
180    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;
181
182    /// Gets a role by its ID.
183    ///
184    /// Panics if `id` does not specify a valid role.
185    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;
186
187    /// Gets all roles.
188    fn get_roles(&self) -> Vec<&dyn CatalogRole>;
189
190    /// Gets the id of the `mz_system` role.
191    fn mz_system_role_id(&self) -> RoleId;
192
193    /// Collects all role IDs that `id` is transitively a member of.
194    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;
195
196    /// Resolves the named cluster.
197    /// Gets a network_policy by its ID.
198    ///
199    /// Panics if `id` does not specify a valid role.
200    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;
201
202    /// Gets all roles.
203    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;
204
205    ///
206    /// If the provided name is `None`, resolves the currently active cluster.
207    fn resolve_cluster<'a, 'b>(
208        &'a self,
209        cluster_name: Option<&'b str>,
210    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;
211
212    /// Resolves the named cluster replica.
213    fn resolve_cluster_replica<'a, 'b>(
214        &'a self,
215        cluster_replica_name: &'b QualifiedReplica,
216    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;
217
218    /// Resolves a partially-specified item name, that is NOT a function or
219    /// type. (For resolving functions or types, please use
220    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
221    ///
222    /// If the partial name has a database component, it searches only the
223    /// specified database; otherwise, it searches the active database. If the
224    /// partial name has a schema component, it searches only the specified
225    /// schema; otherwise, it searches a default set of schemas within the
226    /// selected database. It returns an error if none of the searched schemas
227    /// contain an item whose name matches the item component of the partial
228    /// name.
229    ///
230    /// Note that it is not an error if the named item appears in more than one
231    /// of the search schemas. The catalog implementation must choose one.
232    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
233
234    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
235    /// functions within the catalog.
236    fn resolve_function(
237        &self,
238        item_name: &PartialItemName,
239    ) -> Result<&dyn CatalogItem, CatalogError>;
240
241    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
242    /// types within the catalog.
243    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
244
245    /// Resolves `name` to a type or item, preferring the type if both exist.
246    fn resolve_item_or_type(
247        &self,
248        name: &PartialItemName,
249    ) -> Result<&dyn CatalogItem, CatalogError> {
250        if let Ok(ty) = self.resolve_type(name) {
251            return Ok(ty);
252        }
253        self.resolve_item(name)
254    }
255
256    /// Gets a type named `name` from exactly one of the system schemas.
257    ///
258    /// # Panics
259    /// - If `name` is not an entry in any system schema
260    /// - If more than one system schema has an entry named `name`.
261    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;
262
263    /// Gets an item by its ID.
264    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;
265
266    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
267    /// exist.
268    ///
269    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
270    fn try_get_item_by_global_id<'a>(
271        &'a self,
272        id: &GlobalId,
273    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;
274
275    /// Gets an item by its ID.
276    ///
277    /// Panics if `id` does not specify a valid item.
278    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;
279
280    /// Gets an item by a [`GlobalId`].
281    ///
282    /// Panics if `id` does not specify a valid item.
283    ///
284    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
285    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;
286
287    /// Gets all items.
288    fn get_items(&self) -> Vec<&dyn CatalogItem>;
289
290    /// Looks up an item by its name.
291    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
292
293    /// Looks up a type by its name.
294    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
295
296    /// Gets a cluster by ID.
297    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster<'_>;
298
299    /// Gets all clusters.
300    fn get_clusters(&self) -> Vec<&dyn CatalogCluster<'_>>;
301
302    /// Gets a cluster replica by ID.
303    fn get_cluster_replica(
304        &self,
305        cluster_id: ClusterId,
306        replica_id: ReplicaId,
307    ) -> &dyn CatalogClusterReplica<'_>;
308
309    /// Gets all cluster replicas.
310    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
311
312    /// Gets all system privileges.
313    fn get_system_privileges(&self) -> &PrivilegeMap;
314
315    /// Gets all default privileges.
316    fn get_default_privileges(
317        &self,
318    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;
319
320    /// Finds a name like `name` that is not already in use.
321    ///
322    /// If `name` itself is available, it is returned unchanged.
323    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;
324
325    /// Returns a fully qualified human readable name from fully qualified non-human readable name
326    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;
327
328    /// Returns a fully qualified human readable schema name from fully qualified non-human
329    /// readable schema name
330    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;
331
332    /// Returns the [`CatalogItemId`] for from a [`GlobalId`].
333    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;
334
335    /// Returns the [`GlobalId`] for the specificed Catalog Item, at the specified version.
336    fn resolve_global_id(
337        &self,
338        item_id: &CatalogItemId,
339        version: RelationVersionSelector,
340    ) -> GlobalId;
341
342    /// Returns the configuration of the catalog.
343    fn config(&self) -> &CatalogConfig;
344
345    /// Returns the number of milliseconds since the system epoch. For normal use
346    /// this means the Unix epoch. This can safely be mocked in tests and start
347    /// at 0.
348    fn now(&self) -> EpochMillis;
349
350    /// Returns the set of supported AWS PrivateLink availability zone ids.
351    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;
352
353    /// Returns system vars
354    fn system_vars(&self) -> &SystemVars;
355
356    /// Returns mutable system vars
357    ///
358    /// Clients should use this this method carefully, as changes to the backing
359    /// state here are not guarateed to be persisted. The motivating use case
360    /// for this method was ensuring that features are temporary turned on so
361    /// catalog rehydration does not break due to unsupported SQL syntax.
362    fn system_vars_mut(&mut self) -> &mut SystemVars;
363
364    /// Returns the [`RoleId`] of the owner of an object by its ID.
365    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;
366
367    /// Returns the [`PrivilegeMap`] of the object.
368    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;
369
370    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
371    ///
372    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
373    /// earlier in the list than the roots. This is particularly userful for the order to drop
374    /// objects.
375    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;
376
377    /// Returns all the IDs of all objects that depend on `id`, including `id` themselves.
378    ///
379    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
380    /// earlier in the list than `id`. This is particularly userful for the order to drop
381    /// objects.
382    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;
383
384    /// Returns all possible privileges associated with an object type.
385    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;
386
387    /// Returns the object type of `object_id`.
388    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;
389
390    /// Returns the system object type of `id`.
391    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;
392
393    /// Returns the minimal qualification required to unambiguously specify
394    /// `qualified_name`.
395    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;
396
397    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
398    /// successfully executes.
399    fn add_notice(&self, notice: PlanNotice);
400
401    /// Returns the associated comments for the given `id`
402    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;
403
404    /// Reports whether the specified cluster size is a modern "cc" size rather
405    /// than a legacy T-shirt size.
406    fn is_cluster_size_cc(&self, size: &str) -> bool;
407}
408
/// Configuration associated with a catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// The time at which the catalog booted.
    pub start_time: DateTime<Utc>,
    /// The instant at which the catalog booted.
    pub start_instant: Instant,
    /// A random integer associated with this instance of the catalog.
    ///
    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
    /// topics. Perhaps we can remove this when database-issues#977 is complete.
    pub nonce: u64,
    /// A persistent ID associated with the environment.
    pub environment_id: EnvironmentId,
    /// A transient UUID associated with this process.
    pub session_id: Uuid,
    /// Information about this build of Materialize.
    pub build_info: &'static BuildInfo,
    /// Default timestamp interval.
    pub timestamp_interval: Duration,
    /// Function that returns a wall clock now time; can safely be mocked to return
    /// 0.
    pub now: NowFn,
    /// Context for source and sink connections.
    pub connection_context: ConnectionContext,
    /// Which system builtins to include. Not allowed to change dynamically.
    pub builtins_cfg: BuiltinsConfig,
    /// The Helm chart version, if one is known.
    pub helm_chart_version: Option<String>,
}
439
/// A database in a [`SessionCatalog`].
pub trait CatalogDatabase {
    /// Returns the fully-specified name of the database.
    fn name(&self) -> &str;

    /// Returns a stable ID for the database.
    fn id(&self) -> DatabaseId;

    /// Returns whether the database contains schemas.
    fn has_schemas(&self) -> bool;

    /// Returns the schemas of the database as a map from schema name to
    /// schema ID.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;

    /// Returns the schemas of the database.
    fn schemas(&self) -> Vec<&dyn CatalogSchema>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the database.
    fn privileges(&self) -> &PrivilegeMap;
}
464
/// A schema in a [`SessionCatalog`].
pub trait CatalogSchema {
    /// Returns the fully-specified ID of the schema's database.
    fn database(&self) -> &ResolvedDatabaseSpecifier;

    /// Returns the fully-specified name of the schema.
    fn name(&self) -> &QualifiedSchemaName;

    /// Returns a stable ID for the schema.
    fn id(&self) -> &SchemaSpecifier;

    /// Returns whether the schema contains any items.
    fn has_items(&self) -> bool;

    /// Returns the IDs of the items in the schema.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the schema.
    fn privileges(&self) -> &PrivilegeMap;
}
488
/// A modification of a role's password in the catalog.
#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
pub enum PasswordAction {
    /// Set a new password.
    Set(Password),
    /// Remove the existing password.
    Clear,
    /// Leave the existing password unchanged.
    NoChange,
}
499
/// A raw representation of attributes belonging to a [`CatalogRole`] that we might
/// get as input from the user. This includes the password.
///
/// This struct explicitly does not implement `Serialize` or `Deserialize` to avoid
/// accidentally serializing passwords.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
pub struct RoleAttributesRaw {
    /// Indicates whether the role has inheritance of privileges.
    pub inherit: bool,
    /// The raw password of the role. This is for self managed auth, not cloud.
    pub password: Option<Password>,
    /// Whether or not this user is a superuser.
    pub superuser: Option<bool>,
    /// Whether this role can log in.
    pub login: Option<bool>,
    // Force use of constructor.
    _private: (),
}
517
/// Attributes belonging to a [`CatalogRole`].
///
/// Unlike [`RoleAttributesRaw`], this struct carries no password field and is
/// serializable.
#[derive(Debug, Clone, Eq, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Arbitrary)]
pub struct RoleAttributes {
    /// Indicates whether the role has inheritance of privileges.
    pub inherit: bool,
    /// Whether or not this user is a superuser.
    pub superuser: Option<bool>,
    /// Whether this role can log in.
    pub login: Option<bool>,
    // Force use of constructor.
    _private: (),
}
530
531impl RoleAttributesRaw {
532    /// Creates a new [`RoleAttributesRaw`] with default attributes.
533    pub const fn new() -> RoleAttributesRaw {
534        RoleAttributesRaw {
535            inherit: true,
536            password: None,
537            superuser: None,
538            login: None,
539            _private: (),
540        }
541    }
542
543    /// Adds all attributes excluding password.
544    pub const fn with_all(mut self) -> RoleAttributesRaw {
545        self.inherit = true;
546        self.superuser = Some(true);
547        self.login = Some(true);
548        self
549    }
550}
551
552impl RoleAttributes {
553    /// Creates a new [`RoleAttributes`] with default attributes.
554    pub const fn new() -> RoleAttributes {
555        RoleAttributes {
556            inherit: true,
557            superuser: None,
558            login: None,
559            _private: (),
560        }
561    }
562
563    /// Adds all attributes except password.
564    pub const fn with_all(mut self) -> RoleAttributes {
565        self.inherit = true;
566        self.superuser = Some(true);
567        self.login = Some(true);
568        self
569    }
570
571    /// Returns whether or not the role has inheritence of privileges.
572    pub const fn is_inherit(&self) -> bool {
573        self.inherit
574    }
575}
576
577impl From<RoleAttributesRaw> for RoleAttributes {
578    fn from(
579        RoleAttributesRaw {
580            inherit,
581            superuser,
582            login,
583            ..
584        }: RoleAttributesRaw,
585    ) -> RoleAttributes {
586        RoleAttributes {
587            inherit,
588            superuser,
589            login,
590            _private: (),
591        }
592    }
593}
594
595impl From<RoleAttributes> for RoleAttributesRaw {
596    fn from(
597        RoleAttributes {
598            inherit,
599            superuser,
600            login,
601            ..
602        }: RoleAttributes,
603    ) -> RoleAttributesRaw {
604        RoleAttributesRaw {
605            inherit,
606            password: None,
607            superuser,
608            login,
609            _private: (),
610        }
611    }
612}
613
614impl From<PlannedRoleAttributes> for RoleAttributesRaw {
615    fn from(
616        PlannedRoleAttributes {
617            inherit,
618            password,
619            superuser,
620            login,
621            ..
622        }: PlannedRoleAttributes,
623    ) -> RoleAttributesRaw {
624        let default_attributes = RoleAttributesRaw::new();
625        RoleAttributesRaw {
626            inherit: inherit.unwrap_or(default_attributes.inherit),
627            password,
628            superuser,
629            login,
630            _private: (),
631        }
632    }
633}
634
/// Default variable values for a [`CatalogRole`].
#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct RoleVars {
    /// Map of variable names to their values.
    pub map: BTreeMap<String, OwnedVarInput>,
}
641
/// A role in a [`SessionCatalog`].
pub trait CatalogRole {
    /// Returns the fully-specified name of the role.
    fn name(&self) -> &str;

    /// Returns a stable ID for the role.
    fn id(&self) -> RoleId;

    /// Returns all role IDs that this role is an immediate member of, and the grantor of that
    /// membership.
    ///
    /// Key is the role that some role is a member of, value is the grantor role ID.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;

    /// Returns the attributes associated with this role.
    fn attributes(&self) -> &RoleAttributes;

    /// Returns all variables that this role has a default value stored for.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
}
662
/// A network policy in a [`SessionCatalog`].
pub trait CatalogNetworkPolicy {
    /// Returns the fully-specified name of the NetworkPolicy.
    fn name(&self) -> &str;

    /// Returns a stable ID for the NetworkPolicy.
    fn id(&self) -> NetworkPolicyId;

    /// Returns the ID of the role that owns the NetworkPolicy.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the NetworkPolicy.
    fn privileges(&self) -> &PrivilegeMap;
}
677
/// A cluster in a [`SessionCatalog`].
pub trait CatalogCluster<'a> {
    /// Returns the fully-specified name of the cluster.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster.
    fn id(&self) -> ClusterId;

    /// Returns the objects that are bound to this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;

    /// Returns the replicas of the cluster as a map from replica name to
    /// replica ID.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;

    /// Returns the replicas of the cluster.
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;

    /// Returns the replica belonging to the cluster with replica ID `id`.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the cluster.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns true if this cluster is a managed cluster.
    fn is_managed(&self) -> bool;

    /// Returns the size of the cluster, if the cluster is a managed cluster.
    fn managed_size(&self) -> Option<&str>;

    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
    fn schedule(&self) -> Option<&ClusterSchedule>;

    /// Tries to convert this cluster into a [`CreateClusterPlan`].
    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
}
718
/// A cluster replica in a [`SessionCatalog`].
pub trait CatalogClusterReplica<'a>: Debug {
    /// Returns the name of the cluster replica.
    fn name(&self) -> &str;

    /// Returns a stable ID for the cluster that the replica belongs to.
    fn cluster_id(&self) -> ClusterId;

    /// Returns a stable ID for the replica.
    fn replica_id(&self) -> ReplicaId;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns whether or not the replica is internal.
    fn internal(&self) -> bool;
}
736
/// An item in a [`SessionCatalog`].
///
/// Note that "item" has a very specific meaning in the context of a SQL
/// catalog, and refers to the various entities that belong to a schema.
pub trait CatalogItem {
    /// Returns the fully qualified name of the catalog item.
    fn name(&self) -> &QualifiedItemName;

    /// Returns the [`CatalogItemId`] for the item.
    fn id(&self) -> CatalogItemId;

    /// Returns the [`GlobalId`]s associated with this item.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;

    /// Returns the catalog item's OID.
    fn oid(&self) -> u32;

    /// Returns the resolved function.
    ///
    /// If the catalog item is not of a type that produces functions (i.e.,
    /// anything other than a function), it returns an error.
    fn func(&self) -> Result<&'static Func, CatalogError>;

    /// Returns the resolved source connection.
    ///
    /// If the catalog item is not of a type that contains a `SourceDesc`
    /// (i.e., anything other than sources), it returns an error.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;

    /// Returns the resolved connection.
    ///
    /// If the catalog item is not a connection, it returns an error.
    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;

    /// Returns the type of the catalog item.
    fn item_type(&self) -> CatalogItemType;

    /// Returns a normalized SQL statement that describes how to create the
    /// catalog item.
    fn create_sql(&self) -> &str;

    /// Returns the IDs of the catalog items that this catalog item directly
    /// references.
    fn references(&self) -> &ResolvedIds;

    /// Returns the IDs of the catalog items upon which this catalog item
    /// depends.
    fn uses(&self) -> BTreeSet<CatalogItemId>;

    /// Returns the IDs of the catalog items that directly reference this catalog item.
    fn referenced_by(&self) -> &[CatalogItemId];

    /// Returns the IDs of the catalog items that depend upon this catalog item.
    fn used_by(&self) -> &[CatalogItemId];

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )>;

    /// Reports whether this catalog item is a progress source.
    fn is_progress_source(&self) -> bool;

    /// If this catalog item is a source, returns the ID of its progress collection.
    fn progress_id(&self) -> Option<CatalogItemId>;

    /// Returns the index details associated with the catalog item, if the
    /// catalog item is an index.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;

    /// Returns the column defaults associated with the catalog item, if the
    /// catalog item is a table that accepts writes.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;

    /// Returns the type information associated with the catalog item, if the
    /// catalog item is a type.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;

    /// Returns the ID of the owning role.
    fn owner_id(&self) -> RoleId;

    /// Returns the privileges associated with the item.
    fn privileges(&self) -> &PrivilegeMap;

    /// Returns the cluster the item belongs to.
    fn cluster_id(&self) -> Option<ClusterId>;

    /// Returns the [`CatalogCollectionItem`] for a specific version of this
    /// [`CatalogItem`].
    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;

    /// Returns the latest version of this item, if it's version-able.
    fn latest_version(&self) -> Option<RelationVersion>;
}
843
/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
/// refers to.
///
/// Values of this trait can be obtained from [`CatalogItem::at_version`].
pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
    /// Returns a description of the result set produced by the catalog item.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog item is not of a type that produces
    /// data (i.e., a sink or an index).
    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, CatalogError>;

    /// Returns the [`GlobalId`] for this item.
    fn global_id(&self) -> GlobalId;
}
856
/// The type of a [`CatalogItem`].
///
/// Every variant converts into the [`ObjectType`] variant of the same name.
#[derive(Debug, Deserialize, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum CatalogItemType {
    /// A table.
    Table,
    /// A source.
    Source,
    /// A sink.
    Sink,
    /// A view.
    View,
    /// A materialized view.
    MaterializedView,
    /// An index.
    Index,
    /// A type.
    Type,
    /// A function.
    Func,
    /// A secret.
    Secret,
    /// A connection.
    Connection,
    /// A continual task.
    ContinualTask,
}
883
884impl CatalogItemType {
885    /// Reports whether the given type of item conflicts with items of type
886    /// `CatalogItemType::Type`.
887    ///
888    /// In PostgreSQL, even though types live in a separate namespace from other
889    /// schema objects, creating a table, view, or materialized view creates a
890    /// type named after that relation. This prevents creating a type with the
891    /// same name as a relational object, even though types and relational
892    /// objects live in separate namespaces. (Indexes are even weirder; while
893    /// they don't get a type with the same name, they get an entry in
894    /// `pg_class` that prevents *record* types of the same name as the index,
895    /// but not other types of types, like enums.)
896    ///
897    /// We don't presently construct types that mirror relational objects,
898    /// though we likely will need to in the future for full PostgreSQL
899    /// compatibility (see database-issues#7142). For now, we use this method to
900    /// prevent creating types and relational objects that have the same name, so
901    /// that it is a backwards compatible change in the future to introduce a
902    /// type named after each relational object in the system.
903    pub fn conflicts_with_type(&self) -> bool {
904        match self {
905            CatalogItemType::Table => true,
906            CatalogItemType::Source => true,
907            CatalogItemType::View => true,
908            CatalogItemType::MaterializedView => true,
909            CatalogItemType::Index => true,
910            CatalogItemType::Type => true,
911            CatalogItemType::Sink => false,
912            CatalogItemType::Func => false,
913            CatalogItemType::Secret => false,
914            CatalogItemType::Connection => false,
915            CatalogItemType::ContinualTask => true,
916        }
917    }
918}
919
920impl fmt::Display for CatalogItemType {
921    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
922        match self {
923            CatalogItemType::Table => f.write_str("table"),
924            CatalogItemType::Source => f.write_str("source"),
925            CatalogItemType::Sink => f.write_str("sink"),
926            CatalogItemType::View => f.write_str("view"),
927            CatalogItemType::MaterializedView => f.write_str("materialized view"),
928            CatalogItemType::Index => f.write_str("index"),
929            CatalogItemType::Type => f.write_str("type"),
930            CatalogItemType::Func => f.write_str("func"),
931            CatalogItemType::Secret => f.write_str("secret"),
932            CatalogItemType::Connection => f.write_str("connection"),
933            CatalogItemType::ContinualTask => f.write_str("continual task"),
934        }
935    }
936}
937
938impl From<CatalogItemType> for ObjectType {
939    fn from(value: CatalogItemType) -> Self {
940        match value {
941            CatalogItemType::Table => ObjectType::Table,
942            CatalogItemType::Source => ObjectType::Source,
943            CatalogItemType::Sink => ObjectType::Sink,
944            CatalogItemType::View => ObjectType::View,
945            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
946            CatalogItemType::Index => ObjectType::Index,
947            CatalogItemType::Type => ObjectType::Type,
948            CatalogItemType::Func => ObjectType::Func,
949            CatalogItemType::Secret => ObjectType::Secret,
950            CatalogItemType::Connection => ObjectType::Connection,
951            CatalogItemType::ContinualTask => ObjectType::ContinualTask,
952        }
953    }
954}
955
956impl From<CatalogItemType> for mz_audit_log::ObjectType {
957    fn from(value: CatalogItemType) -> Self {
958        match value {
959            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
960            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
961            CatalogItemType::View => mz_audit_log::ObjectType::View,
962            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
963            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
964            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
965            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
966            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
967            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
968            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
969            CatalogItemType::ContinualTask => mz_audit_log::ObjectType::ContinualTask,
970        }
971    }
972}
973
/// Details about a type in the catalog.
///
/// `T` determines how other catalog types are referenced from within the
/// embedded [`CatalogType`] (see [`TypeReference`]).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypeDetails<T: TypeReference> {
    /// The ID of the array type whose elements are of this type, if available.
    pub array_id: Option<CatalogItemId>,
    /// The description of this type.
    pub typ: CatalogType<T>,
    /// Additional metadata about the type in PostgreSQL, if relevant.
    pub pg_metadata: Option<CatalogTypePgMetadata>,
}
984
/// Additional PostgreSQL metadata about a type.
///
/// Carried by [`CatalogTypeDetails::pg_metadata`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypePgMetadata {
    /// The OID of the `typinput` function in PostgreSQL.
    pub typinput_oid: u32,
    /// The OID of the `typreceive` function in PostgreSQL.
    pub typreceive_oid: u32,
}
993
/// Represents a way of referring to a type in the catalog.
///
/// Implemented by the marker types [`NameReference`] and [`IdReference`].
pub trait TypeReference {
    /// The actual type used to reference a `CatalogType`.
    type Reference: Clone + Debug + Eq + PartialEq;
}
999
/// Reference to a type by its name.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NameReference;

// Types are referred to by a static string holding their name.
impl TypeReference for NameReference {
    type Reference = &'static str;
}
1007
/// Reference to a type by its catalog item ID.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IdReference;

// Types are referred to by the `CatalogItemId` of their catalog entry.
impl TypeReference for IdReference {
    type Reference = CatalogItemId;
}
1015
/// A type stored in the catalog.
///
/// The variants correspond one-to-one with [`mz_repr::SqlScalarType`], but with type
/// modifiers removed and with embedded types replaced with references to other
/// types in the catalog.
///
/// The type parameter `T` selects the representation of those references:
/// every `*_reference` field is of type `T::Reference` (see
/// [`TypeReference`]).
#[allow(missing_docs)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogType<T: TypeReference> {
    AclItem,
    Array {
        element_reference: T::Reference,
    },
    Bool,
    Bytes,
    Char,
    Date,
    Float32,
    Float64,
    Int16,
    Int32,
    Int64,
    UInt16,
    UInt32,
    UInt64,
    MzTimestamp,
    Interval,
    Jsonb,
    List {
        element_reference: T::Reference,
        element_modifiers: Vec<i64>,
    },
    Map {
        key_reference: T::Reference,
        key_modifiers: Vec<i64>,
        value_reference: T::Reference,
        value_modifiers: Vec<i64>,
    },
    Numeric,
    Oid,
    PgLegacyChar,
    PgLegacyName,
    Pseudo,
    Range {
        element_reference: T::Reference,
    },
    Record {
        fields: Vec<CatalogRecordField<T>>,
    },
    RegClass,
    RegProc,
    RegType,
    String,
    Time,
    Timestamp,
    TimestampTz,
    Uuid,
    VarChar,
    Int2Vector,
    MzAclItem,
}
1076
1077impl CatalogType<IdReference> {
1078    /// Returns the relation description for the type, if the type is a record
1079    /// type.
1080    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1081        match &self {
1082            CatalogType::Record { fields } => {
1083                let mut desc = RelationDesc::builder();
1084                for f in fields {
1085                    let name = f.name.clone();
1086                    let ty = query::scalar_type_from_catalog(
1087                        catalog,
1088                        f.type_reference,
1089                        &f.type_modifiers,
1090                    )?;
1091                    // TODO: support plumbing `NOT NULL` constraints through
1092                    // `CREATE TYPE`.
1093                    let ty = ty.nullable(true);
1094                    desc = desc.with_column(name, ty);
1095                }
1096                Ok(Some(desc.finish()))
1097            }
1098            _ => Ok(None),
1099        }
1100    }
1101}
1102
/// A description of a field in a [`CatalogType::Record`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogRecordField<T: TypeReference> {
    /// The name of the field.
    pub name: ColumnName,
    /// A reference to the type of the field; its representation (e.g. an ID
    /// or a name) is determined by `T`.
    pub type_reference: T::Reference,
    /// Modifiers to apply to the type.
    pub type_modifiers: Vec<i64>,
}
1113
/// The category of a type, mirrored from [PostgreSQL's `typcategory`][typcategory].
///
/// Materialize additionally uses a number of pseudotypes when planning, but so
/// far there has been no need to integrate them with `TypeCategory`.
///
/// [typcategory]:
/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TypeCategory {
    /// Array type.
    Array,
    /// Bit string type.
    BitString,
    /// Boolean type.
    Boolean,
    /// Composite type.
    Composite,
    /// Date/time type.
    DateTime,
    /// Enum type.
    Enum,
    /// Geometric type.
    Geometric,
    /// List type. Materialize specific.
    List,
    /// Network address type.
    NetworkAddress,
    /// Numeric type.
    Numeric,
    /// Pseudo type.
    Pseudo,
    /// Range type.
    Range,
    /// String type.
    String,
    /// Timespan type.
    Timespan,
    /// User-defined type.
    UserDefined,
    /// Unknown type.
    Unknown,
}

impl fmt::Display for TypeCategory {
    /// Writes the lower-case, hyphenated name of the category.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            Self::Array => "array",
            Self::BitString => "bit-string",
            Self::Boolean => "boolean",
            Self::Composite => "composite",
            Self::DateTime => "date-time",
            Self::Enum => "enum",
            Self::Geometric => "geometric",
            Self::List => "list",
            Self::NetworkAddress => "network-address",
            Self::Numeric => "numeric",
            Self::Pseudo => "pseudo",
            Self::Range => "range",
            Self::String => "string",
            Self::Timespan => "timespan",
            Self::UserDefined => "user-defined",
            Self::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
1179
/// Identifies an environment.
///
/// Outside of tests, an environment ID can be constructed only from a string of
/// the following form:
///
/// ```text
/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
/// ```
///
/// The fields have the following formats:
///
/// * The cloud provider consists of one or more alphanumeric characters.
/// * The cloud provider region consists of one or more alphanumeric or hyphen
///   characters.
/// * The organization ID is a UUID in its canonical text format.
/// * The ordinal is a decimal number with between one and eight digits.
///
/// There is no way to construct an environment ID from parts, to ensure that
/// the `Display` representation is parseable according to the above rules.
// NOTE(benesch): ideally we'd have accepted the components of the environment
// ID using separate command-line arguments, or at least a string format that
// used a field separator that did not appear in the fields. Alas. We can't
// easily change it now, as it's used as, e.g., the default sink progress topic.
#[derive(Debug, Clone, PartialEq)]
pub struct EnvironmentId {
    /// The cloud provider the environment runs in.
    cloud_provider: CloudProvider,
    /// The region of the cloud provider.
    cloud_provider_region: String,
    /// The ID of the organization the environment belongs to.
    organization_id: Uuid,
    /// The ordinal of the environment.
    ordinal: u64,
}
1210
1211impl EnvironmentId {
1212    /// Creates a dummy `EnvironmentId` for use in tests.
1213    pub fn for_tests() -> EnvironmentId {
1214        EnvironmentId {
1215            cloud_provider: CloudProvider::Local,
1216            cloud_provider_region: "az1".into(),
1217            organization_id: Uuid::new_v4(),
1218            ordinal: 0,
1219        }
1220    }
1221
1222    /// Returns the cloud provider associated with this environment ID.
1223    pub fn cloud_provider(&self) -> &CloudProvider {
1224        &self.cloud_provider
1225    }
1226
1227    /// Returns the cloud provider region associated with this environment ID.
1228    pub fn cloud_provider_region(&self) -> &str {
1229        &self.cloud_provider_region
1230    }
1231
1232    /// Returns the name of the region associted with this environment ID.
1233    ///
1234    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1235    /// [`EnvironmentId::cloud_provider_region`].
1236    pub fn region(&self) -> String {
1237        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1238    }
1239
1240    /// Returns the organization ID associated with this environment ID.
1241    pub fn organization_id(&self) -> Uuid {
1242        self.organization_id
1243    }
1244
1245    /// Returns the ordinal associated with this environment ID.
1246    pub fn ordinal(&self) -> u64 {
1247        self.ordinal
1248    }
1249}
1250
1251// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1252// populated using this key. Consequently, any changes to that trait
1253// implementation will also have to be reflected in the existing feature
1254// targeting config in LaunchDarkly, otherwise environments might receive
1255// different configs upon restart.
1256impl fmt::Display for EnvironmentId {
1257    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1258        write!(
1259            f,
1260            "{}-{}-{}-{}",
1261            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1262        )
1263    }
1264}
1265
1266impl FromStr for EnvironmentId {
1267    type Err = InvalidEnvironmentIdError;
1268
1269    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1270        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1271            Regex::new(
1272                "^(?P<cloud_provider>[[:alnum:]]+)-\
1273                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1274                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1275                  (?P<ordinal>\\d{1,8})$"
1276            ).unwrap()
1277        });
1278        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1279        Ok(EnvironmentId {
1280            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1281            cloud_provider_region: captures["cloud_provider_region"].into(),
1282            organization_id: captures["organization_id"]
1283                .parse()
1284                .map_err(|_| InvalidEnvironmentIdError)?,
1285            ordinal: captures["ordinal"]
1286                .parse()
1287                .map_err(|_| InvalidEnvironmentIdError)?,
1288        })
1289    }
1290}
1291
/// The error produced by [`EnvironmentId::from_str`] when its input is not a
/// valid environment ID.
#[derive(Debug, Clone, PartialEq)]
pub struct InvalidEnvironmentIdError;

impl fmt::Display for InvalidEnvironmentIdError {
    /// Writes a fixed message; the error carries no further detail.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid environment ID")
    }
}

impl Error for InvalidEnvironmentIdError {}
1303
1304impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1305    fn from(_: InvalidCloudProviderError) -> Self {
1306        InvalidEnvironmentIdError
1307    }
1308}
1309
/// An error returned by the catalog.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CatalogError {
    /// Unknown database.
    UnknownDatabase(String),
    /// Database already exists.
    DatabaseAlreadyExists(String),
    /// Unknown schema.
    UnknownSchema(String),
    /// Schema already exists.
    SchemaAlreadyExists(String),
    /// Unknown role.
    UnknownRole(String),
    /// Role already exists.
    RoleAlreadyExists(String),
    /// Network Policy already exists.
    NetworkPolicyAlreadyExists(String),
    /// Unknown cluster.
    UnknownCluster(String),
    /// Unexpected builtin cluster.
    UnexpectedBuiltinCluster(String),
    /// Unexpected builtin cluster type.
    UnexpectedBuiltinClusterType(String),
    /// Cluster already exists.
    ClusterAlreadyExists(String),
    /// Unknown cluster replica.
    UnknownClusterReplica(String),
    /// Unknown cluster replica size.
    UnknownClusterReplicaSize(String),
    /// Duplicate replica: a replica with the given name (first field) cannot
    /// be created more than once on the given cluster (second field).
    DuplicateReplica(String, String),
    /// Unknown item.
    UnknownItem(String),
    /// Item already exists.
    ItemAlreadyExists(CatalogItemId, String),
    /// Unknown function.
    UnknownFunction {
        /// The identifier of the function we couldn't find
        name: String,
        /// A suggested alternative to the named function.
        alternative: Option<String>,
    },
    /// Unknown type.
    UnknownType {
        /// The identifier of the type we couldn't find.
        name: String,
    },
    /// Unknown connection.
    UnknownConnection(String),
    /// Unknown network policy.
    UnknownNetworkPolicy(String),
    /// Expected the catalog item to have the given type, but it did not.
    UnexpectedType {
        /// The item's name.
        name: String,
        /// The actual type of the item.
        actual_type: CatalogItemType,
        /// The expected type of the item.
        expected_type: CatalogItemType,
    },
    /// Invalid attempt to depend on a non-dependable item.
    InvalidDependency {
        /// The invalid item's name.
        name: String,
        /// The invalid item's type.
        typ: CatalogItemType,
    },
    /// Ran out of unique IDs.
    IdExhaustion,
    /// Ran out of unique OIDs.
    OidExhaustion,
    /// Timeline already exists.
    TimelineAlreadyExists(String),
    /// Id Allocator already exists.
    IdAllocatorAlreadyExists(String),
    /// Config already exists.
    ConfigAlreadyExists(String),
    /// Builtin migrations failed.
    FailedBuiltinSchemaMigration(String),
    /// StorageCollectionMetadata already exists.
    StorageCollectionMetadataAlreadyExists(GlobalId),
}
1392
1393impl fmt::Display for CatalogError {
1394    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1395        match self {
1396            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1397            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1398            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1399            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1400            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1401            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1402            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1403            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1404            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1405            Self::NetworkPolicyAlreadyExists(name) => {
1406                write!(f, "network policy '{name}' already exists")
1407            }
1408            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1409            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1410            Self::UnexpectedBuiltinCluster(name) => {
1411                write!(f, "Unexpected builtin cluster '{}'", name)
1412            }
1413            Self::UnexpectedBuiltinClusterType(name) => {
1414                write!(f, "Unexpected builtin cluster type'{}'", name)
1415            }
1416            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1417            Self::UnknownClusterReplica(name) => {
1418                write!(f, "unknown cluster replica '{}'", name)
1419            }
1420            Self::UnknownClusterReplicaSize(name) => {
1421                write!(f, "unknown cluster replica size '{}'", name)
1422            }
1423            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1424                f,
1425                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1426            ),
1427            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1428            Self::ItemAlreadyExists(_gid, name) => {
1429                write!(f, "catalog item '{name}' already exists")
1430            }
1431            Self::UnexpectedType {
1432                name,
1433                actual_type,
1434                expected_type,
1435            } => {
1436                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1437            }
1438            Self::InvalidDependency { name, typ } => write!(
1439                f,
1440                "catalog item '{}' is {} {} and so cannot be depended upon",
1441                name,
1442                if matches!(typ, CatalogItemType::Index) {
1443                    "an"
1444                } else {
1445                    "a"
1446                },
1447                typ,
1448            ),
1449            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1450            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1451            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1452            Self::IdAllocatorAlreadyExists(name) => {
1453                write!(f, "ID allocator '{name}' already exists")
1454            }
1455            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1456            Self::FailedBuiltinSchemaMigration(objects) => {
1457                write!(f, "failed to migrate schema of builtin objects: {objects}")
1458            }
1459            Self::StorageCollectionMetadataAlreadyExists(key) => {
1460                write!(f, "storage metadata for '{key}' already exists")
1461            }
1462        }
1463    }
1464}
1465
1466impl CatalogError {
1467    /// Returns any applicable hints for [`CatalogError`].
1468    pub fn hint(&self) -> Option<String> {
1469        match self {
1470            CatalogError::UnknownFunction { alternative, .. } => {
1471                match alternative {
1472                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1473                    Some(alt) => Some(format!("Try using {alt}")),
1474                }
1475            }
1476            _ => None,
1477        }
1478    }
1479}
1480
1481impl Error for CatalogError {}
1482
// Enum variant docs would be useless here.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects stored in the catalog.
///
/// The `Display` implementation renders each variant as upper-case SQL
/// keywords, e.g. `MATERIALIZED VIEW`; note that `Func` is rendered as
/// `FUNCTION`.
pub enum ObjectType {
    Table,
    View,
    MaterializedView,
    Source,
    Sink,
    Index,
    Type,
    Role,
    Cluster,
    ClusterReplica,
    Secret,
    Connection,
    Database,
    Schema,
    Func,
    ContinualTask,
    NetworkPolicy,
}
1506
1507impl ObjectType {
1508    /// Reports if the object type can be treated as a relation.
1509    pub fn is_relation(&self) -> bool {
1510        match self {
1511            ObjectType::Table
1512            | ObjectType::View
1513            | ObjectType::MaterializedView
1514            | ObjectType::Source
1515            | ObjectType::ContinualTask => true,
1516            ObjectType::Sink
1517            | ObjectType::Index
1518            | ObjectType::Type
1519            | ObjectType::Secret
1520            | ObjectType::Connection
1521            | ObjectType::Func
1522            | ObjectType::Database
1523            | ObjectType::Schema
1524            | ObjectType::Cluster
1525            | ObjectType::ClusterReplica
1526            | ObjectType::Role
1527            | ObjectType::NetworkPolicy => false,
1528        }
1529    }
1530}
1531
1532impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1533    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1534        match value {
1535            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1536            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1537            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1538            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1539            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1540            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1541            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1542            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1543            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1544            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1545            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1546            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1547            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1548            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1549            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1550            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1551            mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask,
1552            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1553        }
1554    }
1555}
1556
1557impl From<CommentObjectId> for ObjectType {
1558    fn from(value: CommentObjectId) -> ObjectType {
1559        match value {
1560            CommentObjectId::Table(_) => ObjectType::Table,
1561            CommentObjectId::View(_) => ObjectType::View,
1562            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1563            CommentObjectId::Source(_) => ObjectType::Source,
1564            CommentObjectId::Sink(_) => ObjectType::Sink,
1565            CommentObjectId::Index(_) => ObjectType::Index,
1566            CommentObjectId::Func(_) => ObjectType::Func,
1567            CommentObjectId::Connection(_) => ObjectType::Connection,
1568            CommentObjectId::Type(_) => ObjectType::Type,
1569            CommentObjectId::Secret(_) => ObjectType::Secret,
1570            CommentObjectId::Role(_) => ObjectType::Role,
1571            CommentObjectId::Database(_) => ObjectType::Database,
1572            CommentObjectId::Schema(_) => ObjectType::Schema,
1573            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1574            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1575            CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1576            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1577        }
1578    }
1579}
1580
1581impl Display for ObjectType {
1582    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1583        f.write_str(match self {
1584            ObjectType::Table => "TABLE",
1585            ObjectType::View => "VIEW",
1586            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1587            ObjectType::Source => "SOURCE",
1588            ObjectType::Sink => "SINK",
1589            ObjectType::Index => "INDEX",
1590            ObjectType::Type => "TYPE",
1591            ObjectType::Role => "ROLE",
1592            ObjectType::Cluster => "CLUSTER",
1593            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1594            ObjectType::Secret => "SECRET",
1595            ObjectType::Connection => "CONNECTION",
1596            ObjectType::Database => "DATABASE",
1597            ObjectType::Schema => "SCHEMA",
1598            ObjectType::Func => "FUNCTION",
1599            ObjectType::ContinualTask => "CONTINUAL TASK",
1600            ObjectType::NetworkPolicy => "NETWORK POLICY",
1601        })
1602    }
1603}
1604
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq, Hash, Copy, Deserialize, Serialize)]
/// The types of objects in the system.
///
/// This is either the type of an object in the catalog or the system as a
/// whole.
pub enum SystemObjectType {
    /// Catalog object type.
    Object(ObjectType),
    /// Entire system.
    System,
}
1613
1614impl SystemObjectType {
1615    /// Reports if the object type can be treated as a relation.
1616    pub fn is_relation(&self) -> bool {
1617        match self {
1618            SystemObjectType::Object(object_type) => object_type.is_relation(),
1619            SystemObjectType::System => false,
1620        }
1621    }
1622}
1623
1624impl Display for SystemObjectType {
1625    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1626        match self {
1627            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1628            SystemObjectType::System => f.write_str("SYSTEM"),
1629        }
1630    }
1631}
1632
1633/// Enum used to format object names in error messages.
1634#[derive(Debug, Clone, PartialEq, Eq)]
1635pub enum ErrorMessageObjectDescription {
1636    /// The name of a specific object.
1637    Object {
1638        /// Type of object.
1639        object_type: ObjectType,
1640        /// Name of object.
1641        object_name: Option<String>,
1642    },
1643    /// The name of the entire system.
1644    System,
1645}
1646
1647impl ErrorMessageObjectDescription {
1648    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1649    pub fn from_id(
1650        object_id: &ObjectId,
1651        catalog: &dyn SessionCatalog,
1652    ) -> ErrorMessageObjectDescription {
1653        let object_name = match object_id {
1654            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1655            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1656                .get_cluster_replica(*cluster_id, *replica_id)
1657                .name()
1658                .to_string(),
1659            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1660            ObjectId::Schema((database_spec, schema_spec)) => {
1661                let name = catalog.get_schema(database_spec, schema_spec).name();
1662                catalog.resolve_full_schema_name(name).to_string()
1663            }
1664            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1665            ObjectId::Item(id) => {
1666                let name = catalog.get_item(id).name();
1667                catalog.resolve_full_name(name).to_string()
1668            }
1669            ObjectId::NetworkPolicy(network_policy_id) => catalog
1670                .get_network_policy(network_policy_id)
1671                .name()
1672                .to_string(),
1673        };
1674        ErrorMessageObjectDescription::Object {
1675            object_type: catalog.get_object_type(object_id),
1676            object_name: Some(object_name),
1677        }
1678    }
1679
1680    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1681    pub fn from_sys_id(
1682        object_id: &SystemObjectId,
1683        catalog: &dyn SessionCatalog,
1684    ) -> ErrorMessageObjectDescription {
1685        match object_id {
1686            SystemObjectId::Object(object_id) => {
1687                ErrorMessageObjectDescription::from_id(object_id, catalog)
1688            }
1689            SystemObjectId::System => ErrorMessageObjectDescription::System,
1690        }
1691    }
1692
1693    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1694    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1695        match object_type {
1696            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1697                object_type,
1698                object_name: None,
1699            },
1700            SystemObjectType::System => ErrorMessageObjectDescription::System,
1701        }
1702    }
1703}
1704
1705impl Display for ErrorMessageObjectDescription {
1706    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1707        match self {
1708            ErrorMessageObjectDescription::Object {
1709                object_type,
1710                object_name,
1711            } => {
1712                let object_name = object_name
1713                    .as_ref()
1714                    .map(|object_name| format!(" {}", object_name.quoted()))
1715                    .unwrap_or_else(|| "".to_string());
1716                write!(f, "{object_type}{object_name}")
1717            }
1718            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1719        }
1720    }
1721}
1722
1723#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
1724// These attributes are needed because the key of a map must be a string. We also
1725// get the added benefit of flattening this struct in it's serialized form.
1726#[serde(into = "BTreeMap<String, RoleId>")]
1727#[serde(try_from = "BTreeMap<String, RoleId>")]
1728/// Represents the grantee and a grantor of a role membership.
1729pub struct RoleMembership {
1730    /// Key is the role that some role is a member of, value is the grantor role ID.
1731    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
1732    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
1733    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
1734    // migration.
1735    pub map: BTreeMap<RoleId, RoleId>,
1736}
1737
1738impl RoleMembership {
1739    /// Creates a new [`RoleMembership`].
1740    pub fn new() -> RoleMembership {
1741        RoleMembership {
1742            map: BTreeMap::new(),
1743        }
1744    }
1745}
1746
1747impl From<RoleMembership> for BTreeMap<String, RoleId> {
1748    fn from(value: RoleMembership) -> Self {
1749        value
1750            .map
1751            .into_iter()
1752            .map(|(k, v)| (k.to_string(), v))
1753            .collect()
1754    }
1755}
1756
1757impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1758    type Error = anyhow::Error;
1759
1760    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1761        Ok(RoleMembership {
1762            map: value
1763                .into_iter()
1764                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1765                .collect::<Result<_, anyhow::Error>>()?,
1766        })
1767    }
1768}
1769
1770/// Specification for objects that will be affected by a default privilege.
1771#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1772pub struct DefaultPrivilegeObject {
1773    /// The role id that created the object.
1774    pub role_id: RoleId,
1775    /// The database that the object is created in if Some, otherwise all databases.
1776    pub database_id: Option<DatabaseId>,
1777    /// The schema that the object is created in if Some, otherwise all databases.
1778    pub schema_id: Option<SchemaId>,
1779    /// The type of object.
1780    pub object_type: ObjectType,
1781}
1782
1783impl DefaultPrivilegeObject {
1784    /// Creates a new [`DefaultPrivilegeObject`].
1785    pub fn new(
1786        role_id: RoleId,
1787        database_id: Option<DatabaseId>,
1788        schema_id: Option<SchemaId>,
1789        object_type: ObjectType,
1790    ) -> DefaultPrivilegeObject {
1791        DefaultPrivilegeObject {
1792            role_id,
1793            database_id,
1794            schema_id,
1795            object_type,
1796        }
1797    }
1798}
1799
1800impl std::fmt::Display for DefaultPrivilegeObject {
1801    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1802        // TODO: Don't just wrap Debug.
1803        write!(f, "{self:?}")
1804    }
1805}
1806
1807/// Specification for the privileges that will be granted from default privileges.
1808#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1809pub struct DefaultPrivilegeAclItem {
1810    /// The role that will receive the privileges.
1811    pub grantee: RoleId,
1812    /// The specific privileges granted.
1813    pub acl_mode: AclMode,
1814}
1815
1816impl DefaultPrivilegeAclItem {
1817    /// Creates a new [`DefaultPrivilegeAclItem`].
1818    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1819        DefaultPrivilegeAclItem { grantee, acl_mode }
1820    }
1821
1822    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1823    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1824        MzAclItem {
1825            grantee: self.grantee,
1826            grantor,
1827            acl_mode: self.acl_mode,
1828        }
1829    }
1830}
1831
/// Selects which builtins `BUILTINS::iter` returns.
///
/// Every call to `BUILTINS::iter` within the lifetime of an environmentd process must be given
/// an equal `BuiltinsConfig`; the configuration is not allowed to change dynamically.
#[derive(Clone, Debug)]
pub struct BuiltinsConfig {
    /// If true, include system builtin continual tasks.
    pub include_continual_tasks: bool,
}
1842
#[cfg(test)]
mod tests {
    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};

    /// Organization UUID embedded in every well-formed environment ID below.
    const ORGANIZATION_ID: &str = "1497a3b7-a455-4fc4-8752-b44a94b5f90a";

    /// Checks that environment IDs parse into the expected components (or are rejected), and
    /// that every successfully parsed ID formats back to the exact input string.
    #[mz_ore::test]
    fn test_environment_id() {
        let cases = [
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Local,
                    cloud_provider_region: "az1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 452,
                }),
            ),
            (
                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Aws,
                    cloud_provider_region: "us-east-1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Gcp,
                    cloud_provider_region: "us-central1".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Azure,
                    cloud_provider_region: "australiaeast".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Generic,
                    cloud_provider_region: "moon-station-11-darkside".into(),
                    organization_id: ORGANIZATION_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            // The empty string is not an environment ID.
            ("", Err(InvalidEnvironmentIdError)),
            // NOTE(review): presumably rejected because of the nine-digit ordinal; confirm
            // against the parser.
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
                Err(InvalidEnvironmentIdError),
            ),
            // No region segment between the provider and the organization ID.
            (
                "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
            // The organization ID is missing one of its hyphens.
            (
                "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
        ];

        for (input, expected) in cases {
            let actual: Result<EnvironmentId, InvalidEnvironmentIdError> = input.parse();
            assert_eq!(expected, actual, "input = {input}");
            // Anything that parses must round-trip through `Display`.
            if let Ok(parsed) = actual {
                assert_eq!(input, parsed.to_string(), "input = {input}");
            }
        }
    }
}