Skip to main content

mz_sql/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![warn(missing_docs)]
11
12//! Catalog abstraction layer.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::error::Error;
17use std::fmt;
18use std::fmt::{Debug, Display, Formatter};
19use std::num::NonZeroU32;
20use std::str::FromStr;
21use std::sync::LazyLock;
22use std::time::Instant;
23
24use chrono::{DateTime, Utc};
25use mz_auth::password::Password;
26use mz_build_info::BuildInfo;
27use mz_cloud_provider::{CloudProvider, InvalidCloudProviderError};
28use mz_controller_types::{ClusterId, ReplicaId};
29use mz_expr::MirScalarExpr;
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_ore::str::StrExt;
32use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
33use mz_repr::explain::ExprHumanizer;
34use mz_repr::network_policy_id::NetworkPolicyId;
35use mz_repr::role_id::RoleId;
36use mz_repr::{
37    CatalogItemId, ColumnName, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
38};
39use mz_sql_parser::ast::{Expr, QualifiedReplica, UnresolvedItemName};
40use mz_storage_types::connections::inline::{ConnectionResolver, ReferencedConnection};
41use mz_storage_types::connections::{Connection, ConnectionContext};
42use mz_storage_types::sources::{SourceDesc, SourceExportDataConfig, SourceExportDetails};
43use proptest_derive::Arbitrary;
44use regex::Regex;
45use serde::{Deserialize, Serialize};
46use uuid::Uuid;
47
48use crate::func::Func;
49use crate::names::{
50    Aug, CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ObjectId, PartialItemName,
51    QualifiedItemName, QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId,
52    SchemaSpecifier, SystemObjectId,
53};
54use crate::plan::statement::StatementDesc;
55use crate::plan::statement::ddl::PlannedRoleAttributes;
56use crate::plan::{ClusterSchedule, CreateClusterPlan, PlanError, PlanNotice, query};
57use crate::session::vars::{OwnedVarInput, SystemVars};
58
59/// A catalog keeps track of SQL objects and session state available to the
60/// planner.
61///
62/// The `sql` crate is agnostic to any particular catalog implementation. This
63/// trait describes the required interface.
64///
65/// The SQL standard mandates a catalog hierarchy of exactly three layers. A
66/// catalog contains databases, databases contain schemas, and schemas contain
67/// catalog items, like sources, sinks, views, and indexes.
68///
69/// There are three classes of operations provided by a catalog:
70///
71///   * Resolution operations, like [`resolve_item`]. These fill in missing name
72///     components based upon connection defaults, e.g., resolving the partial
73///     name `view42` to the fully-specified name `materialize.public.view42`.
74///
75///   * Lookup operations, like [`SessionCatalog::get_item`]. These retrieve
76///     metadata about a catalog entity based on a fully-specified name that is
77///     known to be valid (i.e., because the name was successfully resolved, or
78///     was constructed based on the output of a prior lookup operation). These
79///     functions panic if called with invalid input.
80///
81///   * Session management, such as managing variables' states and adding
82///     notices to the session.
83///
84/// [`get_databases`]: SessionCatalog::get_databases
85/// [`get_item`]: SessionCatalog::get_item
86/// [`resolve_item`]: SessionCatalog::resolve_item
87pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionResolver {
88    /// Returns the id of the role that is issuing the query.
89    fn active_role_id(&self) -> &RoleId;
90
91    /// Returns the name of the database to use if one is not explicitly specified.
92    fn active_database_name(&self) -> Option<&str> {
93        self.active_database()
94            .map(|id| self.get_database(id))
95            .map(|db| db.name())
96    }
97
98    /// Returns the ID of the database to use if one is not explicitly specified.
99    fn active_database(&self) -> Option<&DatabaseId>;
100
101    /// Returns the cluster to use if one is not explicitly specified.
102    fn active_cluster(&self) -> &str;
103
104    /// Returns the resolved search paths for the current user. (Invalid search paths are skipped.)
105    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)];
106
107    /// Returns the descriptor of the named prepared statement on the session, or
108    /// None if the prepared statement does not exist.
109    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc>;
110
111    /// Retrieves a reference to the specified portal's descriptor.
112    ///
113    /// If there is no such portal, returns `None`.
114    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc>;
115
116    /// Resolves the named database.
117    ///
118    /// If `database_name` exists in the catalog, it returns a reference to the
119    /// resolved database; otherwise it returns an error.
120    fn resolve_database(&self, database_name: &str) -> Result<&dyn CatalogDatabase, CatalogError>;
121
122    /// Gets a database by its ID.
123    ///
124    /// Panics if `id` does not specify a valid database.
125    fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase;
126
127    /// Gets all databases.
128    fn get_databases(&self) -> Vec<&dyn CatalogDatabase>;
129
130    /// Resolves a partially-specified schema name.
131    ///
132    /// If the schema exists in the catalog, it returns a reference to the
133    /// resolved schema; otherwise it returns an error.
134    fn resolve_schema(
135        &self,
136        database_name: Option<&str>,
137        schema_name: &str,
138    ) -> Result<&dyn CatalogSchema, CatalogError>;
139
140    /// Resolves a schema name within a specified database.
141    ///
142    /// If the schema exists in the database, it returns a reference to the
143    /// resolved schema; otherwise it returns an error.
144    fn resolve_schema_in_database(
145        &self,
146        database_spec: &ResolvedDatabaseSpecifier,
147        schema_name: &str,
148    ) -> Result<&dyn CatalogSchema, CatalogError>;
149
150    /// Gets a schema by its database and schema specifiers.
151    ///
152    /// Panics if the specifiers do not identify a valid schema.
153    fn get_schema(
154        &self,
155        database_spec: &ResolvedDatabaseSpecifier,
156        schema_spec: &SchemaSpecifier,
157    ) -> &dyn CatalogSchema;
158
159    /// Gets all schemas.
160    fn get_schemas(&self) -> Vec<&dyn CatalogSchema>;
161
162    /// Gets the mz_internal schema id.
163    fn get_mz_internal_schema_id(&self) -> SchemaId;
164
165    /// Gets the mz_unsafe schema id.
166    fn get_mz_unsafe_schema_id(&self) -> SchemaId;
167
168    /// Returns true if `schema` is an internal system schema, false otherwise.
169    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool;
170
171    /// Resolves the named role.
172    fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>;
173
174    /// Resolves the named network policy.
175    fn resolve_network_policy(
176        &self,
177        network_policy_name: &str,
178    ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>;
179
180    /// Gets a role by its ID.
181    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>;
182
183    /// Gets a role by its ID.
184    ///
185    /// Panics if `id` does not specify a valid role.
186    fn get_role(&self, id: &RoleId) -> &dyn CatalogRole;
187
188    /// Gets all roles.
189    fn get_roles(&self) -> Vec<&dyn CatalogRole>;
190
191    /// Gets the id of the `mz_system` role.
192    fn mz_system_role_id(&self) -> RoleId;
193
194    /// Collects all role IDs that `id` is transitively a member of.
195    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId>;
196
197    /// Gets a network policy by its ID.
198    ///
199    /// Panics if `id` does not specify a valid network
200    /// policy.
201    fn get_network_policy(&self, id: &NetworkPolicyId) -> &dyn CatalogNetworkPolicy;
202
203    /// Gets all network policies.
204    fn get_network_policies(&self) -> Vec<&dyn CatalogNetworkPolicy>;
205
206    /// Resolves the named cluster.
207    /// If the provided name is `None`, resolves the currently active cluster.
208    fn resolve_cluster<'a, 'b>(
209        &'a self,
210        cluster_name: Option<&'b str>,
211    ) -> Result<&'a dyn CatalogCluster<'a>, CatalogError>;
212
213    /// Resolves the named cluster replica.
214    fn resolve_cluster_replica<'a, 'b>(
215        &'a self,
216        cluster_replica_name: &'b QualifiedReplica,
217    ) -> Result<&'a dyn CatalogClusterReplica<'a>, CatalogError>;
218
219    /// Resolves a partially-specified item name, that is NOT a function or
220    /// type. (For resolving functions or types, please use
221    /// [SessionCatalog::resolve_function] or [SessionCatalog::resolve_type].)
222    ///
223    /// If the partial name has a database component, it searches only the
224    /// specified database; otherwise, it searches the active database. If the
225    /// partial name has a schema component, it searches only the specified
226    /// schema; otherwise, it searches a default set of schemas within the
227    /// selected database. It returns an error if none of the searched schemas
228    /// contain an item whose name matches the item component of the partial
229    /// name.
230    ///
231    /// Note that it is not an error if the named item appears in more than one
232    /// of the search schemas. The catalog implementation must choose one.
233    fn resolve_item(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
234
235    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
236    /// functions within the catalog.
237    fn resolve_function(
238        &self,
239        item_name: &PartialItemName,
240    ) -> Result<&dyn CatalogItem, CatalogError>;
241
242    /// Performs the same operation as [`SessionCatalog::resolve_item`] but for
243    /// types within the catalog.
244    fn resolve_type(&self, item_name: &PartialItemName) -> Result<&dyn CatalogItem, CatalogError>;
245
246    /// Resolves `name` to a type or item, preferring the type if both exist.
247    fn resolve_item_or_type(
248        &self,
249        name: &PartialItemName,
250    ) -> Result<&dyn CatalogItem, CatalogError> {
251        if let Ok(ty) = self.resolve_type(name) {
252            return Ok(ty);
253        }
254        self.resolve_item(name)
255    }
256
257    /// Gets a type named `name` from exactly one of the system schemas.
258    ///
259    /// # Panics
260    /// - If `name` is not an entry in any system schema
261    /// - If more than one system schema has an entry named `name`.
262    fn get_system_type(&self, name: &str) -> &dyn CatalogItem;
263
264    /// Gets an item by its ID.
265    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn CatalogItem>;
266
267    /// Tries to get an item by a [`GlobalId`], returning `None` if the [`GlobalId`] does not
268    /// exist.
269    ///
270    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
271    fn try_get_item_by_global_id<'a>(
272        &'a self,
273        id: &GlobalId,
274    ) -> Option<Box<dyn CatalogCollectionItem + 'a>>;
275
276    /// Gets an item by its ID.
277    ///
278    /// Panics if `id` does not specify a valid item.
279    fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem;
280
281    /// Gets an item by a [`GlobalId`].
282    ///
283    /// Panics if `id` does not specify a valid item.
284    ///
285    /// Note: A single Catalog Item can have multiple [`GlobalId`]s associated with it.
286    fn get_item_by_global_id<'a>(&'a self, id: &GlobalId) -> Box<dyn CatalogCollectionItem + 'a>;
287
288    /// Gets all items.
289    fn get_items(&self) -> Vec<&dyn CatalogItem>;
290
291    /// Looks up an item by its name.
292    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
293
294    /// Looks up a type by its name.
295    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn CatalogItem>;
296
297    /// Gets a cluster by ID.
298    fn get_cluster(&self, id: ClusterId) -> &dyn CatalogCluster<'_>;
299
300    /// Gets all clusters.
301    fn get_clusters(&self) -> Vec<&dyn CatalogCluster<'_>>;
302
303    /// Gets a cluster replica by ID.
304    fn get_cluster_replica(
305        &self,
306        cluster_id: ClusterId,
307        replica_id: ReplicaId,
308    ) -> &dyn CatalogClusterReplica<'_>;
309
310    /// Gets all cluster replicas.
311    fn get_cluster_replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
312
313    /// Gets all system privileges.
314    fn get_system_privileges(&self) -> &PrivilegeMap;
315
316    /// Gets all default privileges.
317    fn get_default_privileges(
318        &self,
319    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)>;
320
321    /// Finds a name like `name` that is not already in use.
322    ///
323    /// If `name` itself is available, it is returned unchanged.
324    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName;
325
326    /// Returns a fully qualified human readable name from fully qualified non-human readable name
327    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName;
328
329    /// Returns a fully qualified human readable schema name from fully qualified non-human
330    /// readable schema name
331    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName;
332
333    /// Returns the [`CatalogItemId`] for a [`GlobalId`].
334    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId;
335
336    /// Returns the [`GlobalId`] for the specified Catalog Item, at the specified version.
337    fn resolve_global_id(
338        &self,
339        item_id: &CatalogItemId,
340        version: RelationVersionSelector,
341    ) -> GlobalId;
342
343    /// Returns the configuration of the catalog.
344    fn config(&self) -> &CatalogConfig;
345
346    /// Returns the number of milliseconds since the system epoch. For normal use
347    /// this means the Unix epoch. This can safely be mocked in tests and start
348    /// at 0.
349    fn now(&self) -> EpochMillis;
350
351    /// Returns the set of supported AWS PrivateLink availability zone ids.
352    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>>;
353
354    /// Returns true if the session has `restrict_to_user_objects` active.
355    ///
356    /// Defaults to false so that non-session catalog implementations (e.g. those used during
357    /// catalog rehydration) are unaffected.
358    fn restrict_to_user_objects(&self) -> bool {
359        false
360    }
361
362    /// Returns system vars
363    fn system_vars(&self) -> &SystemVars;
364
365    /// Returns mutable system vars
366    ///
367    /// Clients should use this method carefully, as changes to the backing
368    /// state here are not guaranteed to be persisted. The motivating use case
369    /// for this method was ensuring that features are temporarily turned on so
370    /// catalog rehydration does not break due to unsupported SQL syntax.
371    fn system_vars_mut(&mut self) -> &mut SystemVars;
372
373    /// Returns the [`RoleId`] of the owner of an object by its ID.
374    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId>;
375
376    /// Returns the [`PrivilegeMap`] of the object.
377    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap>;
378
379    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
380    ///
381    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
382    /// earlier in the list than the roots. This is particularly useful for the order to drop
383    /// objects.
384    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId>;
385
386    /// Returns all the IDs of all objects that depend on `id`, including `id` itself.
387    ///
388    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
389    /// earlier in the list than `id`. This is particularly useful for the order to drop
390    /// objects.
391    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId>;
392
393    /// Returns all possible privileges associated with an object type.
394    fn all_object_privileges(&self, object_type: SystemObjectType) -> AclMode;
395
396    /// Returns the object type of `object_id`.
397    fn get_object_type(&self, object_id: &ObjectId) -> ObjectType;
398
399    /// Returns the system object type of `id`.
400    fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType;
401
402    /// Returns the minimal qualification required to unambiguously specify
403    /// `qualified_name`.
404    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName;
405
406    /// Adds a [`PlanNotice`] that will be displayed to the user if the plan
407    /// successfully executes.
408    fn add_notice(&self, notice: PlanNotice);
409
410    /// Returns the associated comments for the given `id`
411    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>>;
412
413    /// Reports whether the specified cluster size is a modern "cc" size rather
414    /// than a legacy T-shirt size.
415    fn is_cluster_size_cc(&self, size: &str) -> bool;
416}
417
418/// Configuration associated with a catalog.
419#[derive(Debug, Clone)]
420pub struct CatalogConfig {
421    /// The time at which the catalog booted.
422    pub start_time: DateTime<Utc>,
423    /// The instant at which the catalog booted.
424    pub start_instant: Instant,
425    /// A random integer associated with this instance of the catalog.
426    ///
427    /// NOTE(benesch): this is only necessary for producing unique Kafka sink
428    /// topics. Perhaps we can remove this when database-issues#977 is complete.
429    pub nonce: u64,
430    /// A persistent ID associated with the environment.
431    pub environment_id: EnvironmentId,
432    /// A transient UUID associated with this process.
433    pub session_id: Uuid,
434    /// Information about this build of Materialize.
435    pub build_info: &'static BuildInfo,
436    /// Function that returns a wall clock now time; can safely be mocked to return
437    /// 0.
438    pub now: NowFn,
439    /// Context for source and sink connections.
440    pub connection_context: ConnectionContext,
441    /// The Helm chart version, if any.
442    pub helm_chart_version: Option<String>,
443}
444
445/// A database in a [`SessionCatalog`].
446pub trait CatalogDatabase {
447    /// Returns the fully-specified name of the database.
448    fn name(&self) -> &str;
449
450    /// Returns a stable ID for the database.
451    fn id(&self) -> DatabaseId;
452
453    /// Returns whether the database contains schemas.
454    fn has_schemas(&self) -> bool;
455
456    /// Returns the schemas of the database as a map from schema name to
457    /// schema ID.
458    fn schema_ids(&self) -> &BTreeMap<String, SchemaId>;
459
460    /// Returns the schemas of the database.
461    fn schemas(&self) -> Vec<&dyn CatalogSchema>;
462
463    /// Returns the ID of the owning role.
464    fn owner_id(&self) -> RoleId;
465
466    /// Returns the privileges associated with the database.
467    fn privileges(&self) -> &PrivilegeMap;
468}
469
470/// A schema in a [`SessionCatalog`].
471pub trait CatalogSchema {
472    /// Returns the resolved specifier of the schema's database.
473    fn database(&self) -> &ResolvedDatabaseSpecifier;
474
475    /// Returns the fully-specified name of the schema.
476    fn name(&self) -> &QualifiedSchemaName;
477
478    /// Returns a stable ID for the schema.
479    fn id(&self) -> &SchemaSpecifier;
480
481    /// Returns whether the schema contains any `CatalogItem`s.
482    fn has_items(&self) -> bool;
483
484    /// Returns the IDs of the items in the schema.
485    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_>;
486
487    /// Returns the ID of the owning role.
488    fn owner_id(&self) -> RoleId;
489
490    /// Returns the privileges associated with the schema.
491    fn privileges(&self) -> &PrivilegeMap;
492}
493
494/// Parameters used to modify a password.
495#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
496pub struct PasswordConfig {
497    /// The password.
498    pub password: Password,
499    /// A non-default iteration count for hashing the password.
500    pub scram_iterations: NonZeroU32,
501}
502
503/// A modification of a role password in the catalog.
504#[derive(Debug, Clone, Eq, PartialEq, Arbitrary)]
505pub enum PasswordAction {
506    /// Set a new password, using the given [`PasswordConfig`].
507    Set(PasswordConfig),
508    /// Remove the existing password.
509    Clear,
510    /// Leave the existing password unchanged.
511    NoChange,
512}
513
514/// The authenticator that auto-provisioned a role on first login.
515#[derive(
516    Debug,
517    Copy,
518    Clone,
519    Eq,
520    PartialEq,
521    Ord,
522    PartialOrd,
523    Serialize,
524    Deserialize,
525    Arbitrary
526)]
527pub enum AutoProvisionSource {
528    /// Role was auto-provisioned by [`mz_auth::AuthenticatorKind::Oidc`].
529    Oidc,
530    /// Role was auto-provisioned by [`mz_auth::AuthenticatorKind::Frontegg`].
531    Frontegg,
532    /// Role was auto-provisioned by [`mz_auth::AuthenticatorKind::None`].
533    None,
534}
535
536/// A raw representation of attributes belonging to a [`CatalogRole`] that we might
537/// get as input from the user. This includes the password.
538/// This struct explicitly does not implement `Serialize` or `Deserialize` to avoid
539/// accidentally serializing passwords.
540#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Arbitrary)]
541pub struct RoleAttributesRaw {
542    /// Indicates whether the role has inheritance of privileges.
543    pub inherit: bool,
544    /// The raw password of the role. This is for self-managed auth, not cloud.
545    pub password: Option<Password>,
546    /// Hash iterations used to securely store passwords. This is for self-managed auth.
547    pub scram_iterations: Option<NonZeroU32>,
548    /// Whether or not this user is a superuser.
549    pub superuser: Option<bool>,
550    /// Whether this role has the login attribute.
551    pub login: Option<bool>,
552    /// The authenticator that auto-provisioned this role, if any.
553    pub auto_provision_source: Option<AutoProvisionSource>,
554    // Private unit field: forces code outside this module to use a constructor.
555    _private: (),
556}
557
558/// Attributes belonging to a [`CatalogRole`].
559#[derive(
560    Debug,
561    Clone,
562    Eq,
563    Serialize,
564    Deserialize,
565    PartialEq,
566    Ord,
567    PartialOrd,
568    Arbitrary
569)]
570pub struct RoleAttributes {
571    /// Indicates whether the role has inheritance of privileges.
572    pub inherit: bool,
573    /// Whether or not this user is a superuser.
574    pub superuser: Option<bool>,
575    /// Whether this role has the login attribute.
576    pub login: Option<bool>,
577    /// The authenticator that auto-provisioned this role, if any.
578    pub auto_provision_source: Option<AutoProvisionSource>,
579    // Private unit field: forces code outside this module to use a constructor.
580    _private: (),
581}
582
583impl RoleAttributesRaw {
584    /// Creates a new [`RoleAttributesRaw`] with default attributes.
585    pub const fn new() -> RoleAttributesRaw {
586        RoleAttributesRaw {
587            inherit: true,
588            password: None,
589            scram_iterations: None,
590            superuser: None,
591            login: None,
592            auto_provision_source: None,
593            _private: (),
594        }
595    }
596
597    /// Adds all attributes excluding password.
598    pub const fn with_all(mut self) -> RoleAttributesRaw {
599        self.inherit = true;
600        self.superuser = Some(true);
601        self.login = Some(true);
602        self
603    }
604}
605
606impl RoleAttributes {
607    /// Creates a new [`RoleAttributes`] with default attributes.
608    pub const fn new() -> RoleAttributes {
609        RoleAttributes {
610            inherit: true,
611            superuser: None,
612            login: None,
613            auto_provision_source: None,
614            _private: (),
615        }
616    }
617
618    /// Adds all attributes except password and auto_provision_source.
619    pub const fn with_all(mut self) -> RoleAttributes {
620        self.inherit = true;
621        self.superuser = Some(true);
622        self.login = Some(true);
623        self
624    }
625
626    /// Returns whether or not the role has inheritence of privileges.
627    pub const fn is_inherit(&self) -> bool {
628        self.inherit
629    }
630}
631
632impl From<RoleAttributesRaw> for RoleAttributes {
633    fn from(
634        RoleAttributesRaw {
635            inherit,
636            superuser,
637            login,
638            auto_provision_source,
639            ..
640        }: RoleAttributesRaw,
641    ) -> RoleAttributes {
642        RoleAttributes {
643            inherit,
644            superuser,
645            login,
646            auto_provision_source,
647            _private: (),
648        }
649    }
650}
651
652impl From<RoleAttributes> for RoleAttributesRaw {
653    fn from(
654        RoleAttributes {
655            inherit,
656            superuser,
657            login,
658            auto_provision_source,
659            ..
660        }: RoleAttributes,
661    ) -> RoleAttributesRaw {
662        RoleAttributesRaw {
663            inherit,
664            password: None,
665            scram_iterations: None,
666            superuser,
667            login,
668            auto_provision_source,
669            _private: (),
670        }
671    }
672}
673
674impl From<PlannedRoleAttributes> for RoleAttributesRaw {
675    fn from(
676        PlannedRoleAttributes {
677            inherit,
678            password,
679            scram_iterations,
680            superuser,
681            login,
682            ..
683        }: PlannedRoleAttributes,
684    ) -> RoleAttributesRaw {
685        let default_attributes = RoleAttributesRaw::new();
686        RoleAttributesRaw {
687            inherit: inherit.unwrap_or(default_attributes.inherit),
688            password,
689            scram_iterations,
690            superuser,
691            login,
692            auto_provision_source: None,
693            _private: (),
694        }
695    }
696}
697
698/// Default variable values for a [`CatalogRole`].
699#[derive(Default, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
700pub struct RoleVars {
701    /// Map of variable names to their values.
702    pub map: BTreeMap<String, OwnedVarInput>,
703}
704
705/// A role in a [`SessionCatalog`].
706pub trait CatalogRole {
707    /// Returns the fully-specified name of the role.
708    fn name(&self) -> &str;
709
710    /// Returns a stable ID for the role.
711    fn id(&self) -> RoleId;
712
713    /// Returns all role IDs that this role is an immediate member of, and the grantor of that
714    /// membership.
715    ///
716    /// Key is the role that some role is a member of, value is the grantor role ID.
717    fn membership(&self) -> &BTreeMap<RoleId, RoleId>;
718
719    /// Returns the attributes associated with this role.
720    fn attributes(&self) -> &RoleAttributes;
721
722    /// Returns all variables that this role has a default value stored for.
723    fn vars(&self) -> &BTreeMap<String, OwnedVarInput>;
724}
725
726/// A network policy in a [`SessionCatalog`].
727pub trait CatalogNetworkPolicy {
728    /// Returns the fully-specified name of the network policy.
729    fn name(&self) -> &str;
730
731    /// Returns a stable ID for the network policy.
732    fn id(&self) -> NetworkPolicyId;
733
734    /// Returns the ID of the owning role.
735    fn owner_id(&self) -> RoleId;
736
737    /// Returns the privileges associated with the network policy.
738    fn privileges(&self) -> &PrivilegeMap;
739}
740
741/// A cluster in a [`SessionCatalog`].
742pub trait CatalogCluster<'a> {
743    /// Returns the fully-specified name of the cluster.
744    fn name(&self) -> &str;
745
746    /// Returns a stable ID for the cluster.
747    fn id(&self) -> ClusterId;
748
749    /// Returns the objects that are bound to this cluster.
750    fn bound_objects(&self) -> &BTreeSet<CatalogItemId>;
751
752    /// Returns the replicas of the cluster as a map from replica name to
753    /// replica ID.
754    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId>;
755
756    /// Returns the replicas of the cluster.
757    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>>;
758
759    /// Returns the replica belonging to the cluster with replica ID `id`.
760    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_>;
761
762    /// Returns the ID of the owning role.
763    fn owner_id(&self) -> RoleId;
764
765    /// Returns the privileges associated with the cluster.
766    fn privileges(&self) -> &PrivilegeMap;
767
768    /// Returns true if this cluster is a managed cluster.
769    fn is_managed(&self) -> bool;
770
771    /// Returns the size of the cluster, if the cluster is a managed cluster.
772    fn managed_size(&self) -> Option<&str>;
773
774    /// Returns the schedule of the cluster, if the cluster is a managed cluster.
775    fn schedule(&self) -> Option<&ClusterSchedule>;
776
777    /// Returns the replication factor of the cluster, if the cluster is a managed cluster.
778    fn replication_factor(&self) -> Option<u32>;
779
780    /// Tries to convert this cluster into a [`CreateClusterPlan`].
781    // TODO(jkosh44) Make this infallible and convert to `to_plan`.
782    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError>;
783}
784
785/// A cluster replica in a [`SessionCatalog`].
786pub trait CatalogClusterReplica<'a>: Debug {
787    /// Returns the name of the cluster replica.
788    fn name(&self) -> &str;
789
790    /// Returns a stable ID for the cluster that the replica belongs to.
791    fn cluster_id(&self) -> ClusterId;
792
793    /// Returns a stable ID for the replica.
794    fn replica_id(&self) -> ReplicaId;
795
796    /// Returns the ID of the owning role.
797    fn owner_id(&self) -> RoleId;
798
799    /// Returns whether or not the replica is internal.
800    fn internal(&self) -> bool;
801}
802
803/// An item in a [`SessionCatalog`].
804///
805/// Note that "item" has a very specific meaning in the context of a SQL
806/// catalog, and refers to the various entities that belong to a schema.
807pub trait CatalogItem {
808    /// Returns the fully qualified name of the catalog item.
809    fn name(&self) -> &QualifiedItemName;
810
811    /// Returns the [`CatalogItemId`] for the item.
812    fn id(&self) -> CatalogItemId;
813
814    /// Returns the [`GlobalId`]s associated with this item.
815    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_>;
816
817    /// Returns the catalog item's OID.
818    fn oid(&self) -> u32;
819
820    /// Returns the resolved function.
821    ///
822    /// If the catalog item is not of a type that produces functions (i.e.,
823    /// anything other than a function), it returns an error.
824    fn func(&self) -> Result<&'static Func, CatalogError>;
825
826    /// Returns the resolved source connection.
827    ///
828    /// If the catalog item is not of a type that contains a `SourceDesc`
829    /// (i.e., anything other than sources), it returns an error.
830    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, CatalogError>;
831
832    /// Returns the resolved connection.
833    ///
834    /// If the catalog item is not a connection, it returns an error.
835    fn connection(&self) -> Result<Connection<ReferencedConnection>, CatalogError>;
836
837    /// Returns the type of the catalog item.
838    fn item_type(&self) -> CatalogItemType;
839
840    /// A normalized SQL statement that describes how to create the catalog
841    /// item.
842    fn create_sql(&self) -> &str;
843
844    /// Returns the IDs of the catalog items upon which this catalog item
845    /// directly references.
846    fn references(&self) -> &ResolvedIds;
847
848    /// Returns the IDs of the catalog items upon which this catalog item
849    /// depends.
850    fn uses(&self) -> BTreeSet<CatalogItemId>;
851
852    /// Returns the IDs of the catalog items that directly reference this catalog item.
853    fn referenced_by(&self) -> &[CatalogItemId];
854
855    /// Returns the IDs of the catalog items that depend upon this catalog item.
856    fn used_by(&self) -> &[CatalogItemId];
857
858    /// Reports whether this catalog entry is a subsource and, if it is, the
859    /// ingestion it is an export of, as well as the item it exports.
860    fn subsource_details(
861        &self,
862    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)>;
863
864    /// Reports whether this catalog entry is a source export and, if it is, the
865    /// ingestion it is an export of, as well as the item it exports.
866    fn source_export_details(
867        &self,
868    ) -> Option<(
869        CatalogItemId,
870        &UnresolvedItemName,
871        &SourceExportDetails,
872        &SourceExportDataConfig<ReferencedConnection>,
873    )>;
874
875    /// Reports whether this catalog item is a progress source.
876    fn is_progress_source(&self) -> bool;
877
878    /// If this catalog item is a source, it return the IDs of its progress collection.
879    fn progress_id(&self) -> Option<CatalogItemId>;
880
881    /// Returns the index details associated with the catalog item, if the
882    /// catalog item is an index.
883    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)>;
884
885    /// Returns the column defaults associated with the catalog item, if the
886    /// catalog item is a table that accepts writes.
887    fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;
888
889    /// The item this catalog item replaces, if any.
890    fn replacement_target(&self) -> Option<CatalogItemId>;
891
892    /// Returns the type information associated with the catalog item, if the
893    /// catalog item is a type.
894    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;
895
896    /// Returns the ID of the owning role.
897    fn owner_id(&self) -> RoleId;
898
899    /// Returns the privileges associated with the item.
900    fn privileges(&self) -> &PrivilegeMap;
901
902    /// Returns the cluster the item belongs to.
903    fn cluster_id(&self) -> Option<ClusterId>;
904
905    /// Returns the [`CatalogCollectionItem`] for a specific version of this
906    /// [`CatalogItem`].
907    fn at_version(&self, version: RelationVersionSelector) -> Box<dyn CatalogCollectionItem>;
908
909    /// The latest version of this item, if it's version-able.
910    fn latest_version(&self) -> Option<RelationVersion>;
911}
912
913/// An item in a [`SessionCatalog`] and the specific "collection"/pTVC that it
914/// refers to.
915pub trait CatalogCollectionItem: CatalogItem + Send + Sync {
916    /// Returns a description of the result set produced by the catalog item.
917    ///
918    /// If the catalog item is not of a type that produces data (e.g., a sink or
919    /// an index), it returns `None`.
920    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>>;
921
922    /// The [`GlobalId`] for this item.
923    fn global_id(&self) -> GlobalId;
924}
925
926/// The type of a [`CatalogItem`].
927#[derive(
928    Debug,
929    Deserialize,
930    Clone,
931    Copy,
932    Eq,
933    Hash,
934    Ord,
935    PartialEq,
936    PartialOrd,
937    Serialize
938)]
939pub enum CatalogItemType {
940    /// A table.
941    Table,
942    /// A source.
943    Source,
944    /// A sink.
945    Sink,
946    /// A view.
947    View,
948    /// A materialized view.
949    MaterializedView,
950    /// An index.
951    Index,
952    /// A type.
953    Type,
954    /// A func.
955    Func,
956    /// A secret.
957    Secret,
958    /// A connection.
959    Connection,
960}
961
962impl CatalogItemType {
963    /// Reports whether the given type of item conflicts with items of type
964    /// `CatalogItemType::Type`.
965    ///
966    /// In PostgreSQL, even though types live in a separate namespace from other
967    /// schema objects, creating a table, view, or materialized view creates a
968    /// type named after that relation. This prevents creating a type with the
969    /// same name as a relational object, even though types and relational
970    /// objects live in separate namespaces. (Indexes are even weirder; while
971    /// they don't get a type with the same name, they get an entry in
972    /// `pg_class` that prevents *record* types of the same name as the index,
973    /// but not other types of types, like enums.)
974    ///
975    /// We don't presently construct types that mirror relational objects,
976    /// though we likely will need to in the future for full PostgreSQL
977    /// compatibility (see database-issues#7142). For now, we use this method to
978    /// prevent creating types and relational objects that have the same name, so
979    /// that it is a backwards compatible change in the future to introduce a
980    /// type named after each relational object in the system.
981    pub fn conflicts_with_type(&self) -> bool {
982        match self {
983            CatalogItemType::Table => true,
984            CatalogItemType::Source => true,
985            CatalogItemType::View => true,
986            CatalogItemType::MaterializedView => true,
987            CatalogItemType::Index => true,
988            CatalogItemType::Type => true,
989            CatalogItemType::Sink => false,
990            CatalogItemType::Func => false,
991            CatalogItemType::Secret => false,
992            CatalogItemType::Connection => false,
993        }
994    }
995}
996
997impl fmt::Display for CatalogItemType {
998    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
999        match self {
1000            CatalogItemType::Table => f.write_str("table"),
1001            CatalogItemType::Source => f.write_str("source"),
1002            CatalogItemType::Sink => f.write_str("sink"),
1003            CatalogItemType::View => f.write_str("view"),
1004            CatalogItemType::MaterializedView => f.write_str("materialized view"),
1005            CatalogItemType::Index => f.write_str("index"),
1006            CatalogItemType::Type => f.write_str("type"),
1007            CatalogItemType::Func => f.write_str("func"),
1008            CatalogItemType::Secret => f.write_str("secret"),
1009            CatalogItemType::Connection => f.write_str("connection"),
1010        }
1011    }
1012}
1013
1014impl From<CatalogItemType> for ObjectType {
1015    fn from(value: CatalogItemType) -> Self {
1016        match value {
1017            CatalogItemType::Table => ObjectType::Table,
1018            CatalogItemType::Source => ObjectType::Source,
1019            CatalogItemType::Sink => ObjectType::Sink,
1020            CatalogItemType::View => ObjectType::View,
1021            CatalogItemType::MaterializedView => ObjectType::MaterializedView,
1022            CatalogItemType::Index => ObjectType::Index,
1023            CatalogItemType::Type => ObjectType::Type,
1024            CatalogItemType::Func => ObjectType::Func,
1025            CatalogItemType::Secret => ObjectType::Secret,
1026            CatalogItemType::Connection => ObjectType::Connection,
1027        }
1028    }
1029}
1030
1031impl From<CatalogItemType> for mz_audit_log::ObjectType {
1032    fn from(value: CatalogItemType) -> Self {
1033        match value {
1034            CatalogItemType::Table => mz_audit_log::ObjectType::Table,
1035            CatalogItemType::Source => mz_audit_log::ObjectType::Source,
1036            CatalogItemType::View => mz_audit_log::ObjectType::View,
1037            CatalogItemType::MaterializedView => mz_audit_log::ObjectType::MaterializedView,
1038            CatalogItemType::Index => mz_audit_log::ObjectType::Index,
1039            CatalogItemType::Type => mz_audit_log::ObjectType::Type,
1040            CatalogItemType::Sink => mz_audit_log::ObjectType::Sink,
1041            CatalogItemType::Func => mz_audit_log::ObjectType::Func,
1042            CatalogItemType::Secret => mz_audit_log::ObjectType::Secret,
1043            CatalogItemType::Connection => mz_audit_log::ObjectType::Connection,
1044        }
1045    }
1046}
1047
1048/// Details about a type in the catalog.
1049#[derive(Clone, Debug, Eq, PartialEq)]
1050pub struct CatalogTypeDetails<T: TypeReference> {
1051    /// The ID of the type with this type as the array element, if available.
1052    pub array_id: Option<CatalogItemId>,
1053    /// The description of this type.
1054    pub typ: CatalogType<T>,
1055    /// Additional metadata about the type in PostgreSQL, if relevant.
1056    pub pg_metadata: Option<CatalogTypePgMetadata>,
1057}
1058
/// PostgreSQL-specific metadata that accompanies a type.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CatalogTypePgMetadata {
    /// OID of the type's `typinput` function in PostgreSQL.
    pub typinput_oid: u32,
    /// OID of the type's `typreceive` function in PostgreSQL.
    pub typreceive_oid: u32,
}
1067
/// Abstracts over the way one catalog type refers to another.
pub trait TypeReference {
    /// The concrete type used to refer to a `CatalogType`.
    type Reference: Clone + Debug + Eq + PartialEq;
}
1073
1074/// Reference to a type by it's name
1075#[derive(Clone, Debug, Eq, PartialEq)]
1076pub struct NameReference;
1077
1078impl TypeReference for NameReference {
1079    type Reference = &'static str;
1080}
1081
1082/// Reference to a type by it's global ID
1083#[derive(Clone, Debug, Eq, PartialEq)]
1084pub struct IdReference;
1085
1086impl TypeReference for IdReference {
1087    type Reference = CatalogItemId;
1088}
1089
1090/// A type stored in the catalog.
1091///
1092/// The variants correspond one-to-one with [`mz_repr::SqlScalarType`], but with type
1093/// modifiers removed and with embedded types replaced with references to other
1094/// types in the catalog.
1095#[allow(missing_docs)]
1096#[derive(Clone, Debug, Eq, PartialEq)]
1097pub enum CatalogType<T: TypeReference> {
1098    AclItem,
1099    Array {
1100        element_reference: T::Reference,
1101    },
1102    Bool,
1103    Bytes,
1104    Char,
1105    Date,
1106    Float32,
1107    Float64,
1108    Int16,
1109    Int32,
1110    Int64,
1111    UInt16,
1112    UInt32,
1113    UInt64,
1114    MzTimestamp,
1115    Interval,
1116    Jsonb,
1117    List {
1118        element_reference: T::Reference,
1119        element_modifiers: Vec<i64>,
1120    },
1121    Map {
1122        key_reference: T::Reference,
1123        key_modifiers: Vec<i64>,
1124        value_reference: T::Reference,
1125        value_modifiers: Vec<i64>,
1126    },
1127    Numeric,
1128    Oid,
1129    PgLegacyChar,
1130    PgLegacyName,
1131    Pseudo,
1132    Range {
1133        element_reference: T::Reference,
1134    },
1135    Record {
1136        fields: Vec<CatalogRecordField<T>>,
1137    },
1138    RegClass,
1139    RegProc,
1140    RegType,
1141    String,
1142    Time,
1143    Timestamp,
1144    TimestampTz,
1145    Uuid,
1146    VarChar,
1147    Int2Vector,
1148    MzAclItem,
1149}
1150
1151impl CatalogType<IdReference> {
1152    /// Returns the relation description for the type, if the type is a record
1153    /// type.
1154    pub fn desc(&self, catalog: &dyn SessionCatalog) -> Result<Option<RelationDesc>, PlanError> {
1155        match &self {
1156            CatalogType::Record { fields } => {
1157                let mut desc = RelationDesc::builder();
1158                for f in fields {
1159                    let name = f.name.clone();
1160                    let ty = query::scalar_type_from_catalog(
1161                        catalog,
1162                        f.type_reference,
1163                        &f.type_modifiers,
1164                    )?;
1165                    // TODO: support plumbing `NOT NULL` constraints through
1166                    // `CREATE TYPE`.
1167                    let ty = ty.nullable(true);
1168                    desc = desc.with_column(name, ty);
1169                }
1170                Ok(Some(desc.finish()))
1171            }
1172            _ => Ok(None),
1173        }
1174    }
1175}
1176
1177/// A description of a field in a [`CatalogType::Record`].
1178#[derive(Clone, Debug, Eq, PartialEq)]
1179pub struct CatalogRecordField<T: TypeReference> {
1180    /// The name of the field.
1181    pub name: ColumnName,
1182    /// The ID of the type of the field.
1183    pub type_reference: T::Reference,
1184    /// Modifiers to apply to the type.
1185    pub type_modifiers: Vec<i64>,
1186}
1187
/// Mirrored from [PostgreSQL's `typcategory`][typcategory].
///
/// Note that Materialize also uses a number of pseudotypes when planning, but
/// we have yet to need to integrate them with `TypeCategory`.
///
/// [typcategory]:
/// https://www.postgresql.org/docs/9.6/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TypeCategory {
    /// Array type.
    Array,
    /// Bit string type.
    BitString,
    /// Boolean type.
    Boolean,
    /// Composite type.
    Composite,
    /// Date/time type.
    DateTime,
    /// Enum type.
    Enum,
    /// Geometric type.
    Geometric,
    /// List type. Materialize specific.
    List,
    /// Network address type.
    NetworkAddress,
    /// Numeric type.
    Numeric,
    /// Pseudo type.
    Pseudo,
    /// Range type.
    Range,
    /// String type.
    String,
    /// Timespan type.
    Timespan,
    /// User-defined type.
    UserDefined,
    /// Unknown type.
    Unknown,
}

/// Renders the category as a lowercase name, with multi-word names joined by
/// hyphens.
impl fmt::Display for TypeCategory {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            TypeCategory::Array => "array",
            TypeCategory::BitString => "bit-string",
            TypeCategory::Boolean => "boolean",
            TypeCategory::Composite => "composite",
            TypeCategory::DateTime => "date-time",
            TypeCategory::Enum => "enum",
            TypeCategory::Geometric => "geometric",
            TypeCategory::List => "list",
            TypeCategory::NetworkAddress => "network-address",
            TypeCategory::Numeric => "numeric",
            TypeCategory::Pseudo => "pseudo",
            TypeCategory::Range => "range",
            TypeCategory::String => "string",
            TypeCategory::Timespan => "timespan",
            TypeCategory::UserDefined => "user-defined",
            TypeCategory::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
1253
1254/// Identifies an environment.
1255///
1256/// Outside of tests, an environment ID can be constructed only from a string of
1257/// the following form:
1258///
1259/// ```text
1260/// <CLOUD PROVIDER>-<CLOUD PROVIDER REGION>-<ORGANIZATION ID>-<ORDINAL>
1261/// ```
1262///
1263/// The fields have the following formats:
1264///
1265/// * The cloud provider consists of one or more alphanumeric characters.
1266/// * The cloud provider region consists of one or more alphanumeric or hyphen
1267///   characters.
1268/// * The organization ID is a UUID in its canonical text format.
1269/// * The ordinal is a decimal number with between one and eight digits.
1270///
1271/// There is no way to construct an environment ID from parts, to ensure that
1272/// the `Display` representation is parseable according to the above rules.
1273// NOTE(benesch): ideally we'd have accepted the components of the environment
1274// ID using separate command-line arguments, or at least a string format that
1275// used a field separator that did not appear in the fields. Alas. We can't
1276// easily change it now, as it's used as the e.g. default sink progress topic.
1277#[derive(Debug, Clone, PartialEq)]
1278pub struct EnvironmentId {
1279    cloud_provider: CloudProvider,
1280    cloud_provider_region: String,
1281    organization_id: Uuid,
1282    ordinal: u64,
1283}
1284
1285impl EnvironmentId {
1286    /// Creates a dummy `EnvironmentId` for use in tests.
1287    pub fn for_tests() -> EnvironmentId {
1288        EnvironmentId {
1289            cloud_provider: CloudProvider::Local,
1290            cloud_provider_region: "az1".into(),
1291            organization_id: Uuid::new_v4(),
1292            ordinal: 0,
1293        }
1294    }
1295
1296    /// Returns the cloud provider associated with this environment ID.
1297    pub fn cloud_provider(&self) -> &CloudProvider {
1298        &self.cloud_provider
1299    }
1300
1301    /// Returns the cloud provider region associated with this environment ID.
1302    pub fn cloud_provider_region(&self) -> &str {
1303        &self.cloud_provider_region
1304    }
1305
1306    /// Returns the name of the region associted with this environment ID.
1307    ///
1308    /// A region is a combination of [`EnvironmentId::cloud_provider`] and
1309    /// [`EnvironmentId::cloud_provider_region`].
1310    pub fn region(&self) -> String {
1311        format!("{}/{}", self.cloud_provider, self.cloud_provider_region)
1312    }
1313
1314    /// Returns the organization ID associated with this environment ID.
1315    pub fn organization_id(&self) -> Uuid {
1316        self.organization_id
1317    }
1318
1319    /// Returns the ordinal associated with this environment ID.
1320    pub fn ordinal(&self) -> u64 {
1321        self.ordinal
1322    }
1323}
1324
1325// *Warning*: once the LaunchDarkly integration is live, our contexts will be
1326// populated using this key. Consequently, any changes to that trait
1327// implementation will also have to be reflected in the existing feature
1328// targeting config in LaunchDarkly, otherwise environments might receive
1329// different configs upon restart.
1330impl fmt::Display for EnvironmentId {
1331    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1332        write!(
1333            f,
1334            "{}-{}-{}-{}",
1335            self.cloud_provider, self.cloud_provider_region, self.organization_id, self.ordinal
1336        )
1337    }
1338}
1339
1340impl FromStr for EnvironmentId {
1341    type Err = InvalidEnvironmentIdError;
1342
1343    fn from_str(s: &str) -> Result<EnvironmentId, InvalidEnvironmentIdError> {
1344        static MATCHER: LazyLock<Regex> = LazyLock::new(|| {
1345            Regex::new(
1346                "^(?P<cloud_provider>[[:alnum:]]+)-\
1347                  (?P<cloud_provider_region>[[:alnum:]\\-]+)-\
1348                  (?P<organization_id>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})-\
1349                  (?P<ordinal>\\d{1,8})$"
1350            ).unwrap()
1351        });
1352        let captures = MATCHER.captures(s).ok_or(InvalidEnvironmentIdError)?;
1353        Ok(EnvironmentId {
1354            cloud_provider: CloudProvider::from_str(&captures["cloud_provider"])?,
1355            cloud_provider_region: captures["cloud_provider_region"].into(),
1356            organization_id: captures["organization_id"]
1357                .parse()
1358                .map_err(|_| InvalidEnvironmentIdError)?,
1359            ordinal: captures["ordinal"]
1360                .parse()
1361                .map_err(|_| InvalidEnvironmentIdError)?,
1362        })
1363    }
1364}
1365
/// The error produced by [`EnvironmentId::from_str`].
///
/// It carries no detail about which part of the input was rejected.
#[derive(Debug, Clone, PartialEq)]
pub struct InvalidEnvironmentIdError;

impl Display for InvalidEnvironmentIdError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("invalid environment ID")
    }
}

impl Error for InvalidEnvironmentIdError {}
1377
1378impl From<InvalidCloudProviderError> for InvalidEnvironmentIdError {
1379    fn from(_: InvalidCloudProviderError) -> Self {
1380        InvalidEnvironmentIdError
1381    }
1382}
1383
1384/// An error returned by the catalog.
1385#[derive(Clone, Debug, Eq, PartialEq)]
1386pub enum CatalogError {
1387    /// Unknown database.
1388    UnknownDatabase(String),
1389    /// Database already exists.
1390    DatabaseAlreadyExists(String),
1391    /// Unknown schema.
1392    UnknownSchema(String),
1393    /// Schema already exists.
1394    SchemaAlreadyExists(String),
1395    /// Unknown role.
1396    UnknownRole(String),
1397    /// Role already exists.
1398    RoleAlreadyExists(String),
1399    /// Network Policy already exists.
1400    NetworkPolicyAlreadyExists(String),
1401    /// Unknown cluster.
1402    UnknownCluster(String),
1403    /// Unexpected builtin cluster.
1404    UnexpectedBuiltinCluster(String),
1405    /// Unexpected builtin cluster.
1406    UnexpectedBuiltinClusterType(String),
1407    /// Cluster already exists.
1408    ClusterAlreadyExists(String),
1409    /// Unknown cluster replica.
1410    UnknownClusterReplica(String),
1411    /// Unknown cluster replica size.
1412    UnknownClusterReplicaSize(String),
1413    /// Duplicate Replica. #[error("cannot create multiple replicas named '{0}' on cluster '{1}'")]
1414    DuplicateReplica(String, String),
1415    /// Unknown item.
1416    UnknownItem(String),
1417    /// Item already exists.
1418    ItemAlreadyExists(CatalogItemId, String),
1419    /// Unknown function.
1420    UnknownFunction {
1421        /// The identifier of the function we couldn't find
1422        name: String,
1423        /// A suggested alternative to the named function.
1424        alternative: Option<String>,
1425    },
1426    /// Unknown type.
1427    UnknownType {
1428        /// The identifier of the type we couldn't find.
1429        name: String,
1430    },
1431    /// Unknown connection.
1432    UnknownConnection(String),
1433    /// Unknown network policy.
1434    UnknownNetworkPolicy(String),
1435    /// Expected the catalog item to have the given type, but it did not.
1436    UnexpectedType {
1437        /// The item's name.
1438        name: String,
1439        /// The actual type of the item.
1440        actual_type: CatalogItemType,
1441        /// The expected type of the item.
1442        expected_type: CatalogItemType,
1443    },
1444    /// Ran out of unique IDs.
1445    IdExhaustion,
1446    /// Ran out of unique OIDs.
1447    OidExhaustion,
1448    /// Timeline already exists.
1449    TimelineAlreadyExists(String),
1450    /// Id Allocator already exists.
1451    IdAllocatorAlreadyExists(String),
1452    /// Config already exists.
1453    ConfigAlreadyExists(String),
1454    /// Builtin migrations failed.
1455    FailedBuiltinSchemaMigration(String),
1456    /// StorageCollectionMetadata already exists.
1457    StorageCollectionMetadataAlreadyExists(GlobalId),
1458}
1459
1460impl fmt::Display for CatalogError {
1461    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1462        match self {
1463            Self::UnknownDatabase(name) => write!(f, "unknown database '{}'", name),
1464            Self::DatabaseAlreadyExists(name) => write!(f, "database '{name}' already exists"),
1465            Self::UnknownFunction { name, .. } => write!(f, "function \"{}\" does not exist", name),
1466            Self::UnknownType { name, .. } => write!(f, "type \"{}\" does not exist", name),
1467            Self::UnknownConnection(name) => write!(f, "connection \"{}\" does not exist", name),
1468            Self::UnknownSchema(name) => write!(f, "unknown schema '{}'", name),
1469            Self::SchemaAlreadyExists(name) => write!(f, "schema '{name}' already exists"),
1470            Self::UnknownRole(name) => write!(f, "unknown role '{}'", name),
1471            Self::RoleAlreadyExists(name) => write!(f, "role '{name}' already exists"),
1472            Self::NetworkPolicyAlreadyExists(name) => {
1473                write!(f, "network policy '{name}' already exists")
1474            }
1475            Self::UnknownCluster(name) => write!(f, "unknown cluster '{}'", name),
1476            Self::UnknownNetworkPolicy(name) => write!(f, "unknown network policy '{}'", name),
1477            Self::UnexpectedBuiltinCluster(name) => {
1478                write!(f, "Unexpected builtin cluster '{}'", name)
1479            }
1480            Self::UnexpectedBuiltinClusterType(name) => {
1481                write!(f, "Unexpected builtin cluster type'{}'", name)
1482            }
1483            Self::ClusterAlreadyExists(name) => write!(f, "cluster '{name}' already exists"),
1484            Self::UnknownClusterReplica(name) => {
1485                write!(f, "unknown cluster replica '{}'", name)
1486            }
1487            Self::UnknownClusterReplicaSize(name) => {
1488                write!(f, "unknown cluster replica size '{}'", name)
1489            }
1490            Self::DuplicateReplica(replica_name, cluster_name) => write!(
1491                f,
1492                "cannot create multiple replicas named '{replica_name}' on cluster '{cluster_name}'"
1493            ),
1494            Self::UnknownItem(name) => write!(f, "unknown catalog item '{}'", name),
1495            Self::ItemAlreadyExists(_gid, name) => {
1496                write!(f, "catalog item '{name}' already exists")
1497            }
1498            Self::UnexpectedType {
1499                name,
1500                actual_type,
1501                expected_type,
1502            } => {
1503                write!(f, "\"{name}\" is a {actual_type} not a {expected_type}")
1504            }
1505            Self::IdExhaustion => write!(f, "id counter overflows i64"),
1506            Self::OidExhaustion => write!(f, "oid counter overflows u32"),
1507            Self::TimelineAlreadyExists(name) => write!(f, "timeline '{name}' already exists"),
1508            Self::IdAllocatorAlreadyExists(name) => {
1509                write!(f, "ID allocator '{name}' already exists")
1510            }
1511            Self::ConfigAlreadyExists(key) => write!(f, "config '{key}' already exists"),
1512            Self::FailedBuiltinSchemaMigration(objects) => {
1513                write!(f, "failed to migrate schema of builtin objects: {objects}")
1514            }
1515            Self::StorageCollectionMetadataAlreadyExists(key) => {
1516                write!(f, "storage metadata for '{key}' already exists")
1517            }
1518        }
1519    }
1520}
1521
1522impl CatalogError {
1523    /// Returns any applicable hints for [`CatalogError`].
1524    pub fn hint(&self) -> Option<String> {
1525        match self {
1526            CatalogError::UnknownFunction { alternative, .. } => {
1527                match alternative {
1528                    None => Some("No function matches the given name and argument types. You might need to add explicit type casts.".into()),
1529                    Some(alt) => Some(format!("Try using {alt}")),
1530                }
1531            }
1532            _ => None,
1533        }
1534    }
1535}
1536
1537impl Error for CatalogError {}
1538
1539// Enum variant docs would be useless here.
1540#[allow(missing_docs)]
1541#[derive(
1542    Debug,
1543    Clone,
1544    PartialOrd,
1545    Ord,
1546    PartialEq,
1547    Eq,
1548    Hash,
1549    Copy,
1550    Deserialize,
1551    Serialize
1552)]
1553/// The types of objects stored in the catalog.
1554pub enum ObjectType {
1555    Table,
1556    View,
1557    MaterializedView,
1558    Source,
1559    Sink,
1560    Index,
1561    Type,
1562    Role,
1563    Cluster,
1564    ClusterReplica,
1565    Secret,
1566    Connection,
1567    Database,
1568    Schema,
1569    Func,
1570    NetworkPolicy,
1571}
1572
1573impl ObjectType {
1574    /// Reports if the object type can be treated as a relation.
1575    pub fn is_relation(&self) -> bool {
1576        match self {
1577            ObjectType::Table
1578            | ObjectType::View
1579            | ObjectType::MaterializedView
1580            | ObjectType::Source => true,
1581            ObjectType::Sink
1582            | ObjectType::Index
1583            | ObjectType::Type
1584            | ObjectType::Secret
1585            | ObjectType::Connection
1586            | ObjectType::Func
1587            | ObjectType::Database
1588            | ObjectType::Schema
1589            | ObjectType::Cluster
1590            | ObjectType::ClusterReplica
1591            | ObjectType::Role
1592            | ObjectType::NetworkPolicy => false,
1593        }
1594    }
1595}
1596
1597impl From<mz_sql_parser::ast::ObjectType> for ObjectType {
1598    fn from(value: mz_sql_parser::ast::ObjectType) -> Self {
1599        match value {
1600            mz_sql_parser::ast::ObjectType::Table => ObjectType::Table,
1601            mz_sql_parser::ast::ObjectType::View => ObjectType::View,
1602            mz_sql_parser::ast::ObjectType::MaterializedView => ObjectType::MaterializedView,
1603            mz_sql_parser::ast::ObjectType::Source => ObjectType::Source,
1604            mz_sql_parser::ast::ObjectType::Subsource => ObjectType::Source,
1605            mz_sql_parser::ast::ObjectType::Sink => ObjectType::Sink,
1606            mz_sql_parser::ast::ObjectType::Index => ObjectType::Index,
1607            mz_sql_parser::ast::ObjectType::Type => ObjectType::Type,
1608            mz_sql_parser::ast::ObjectType::Role => ObjectType::Role,
1609            mz_sql_parser::ast::ObjectType::Cluster => ObjectType::Cluster,
1610            mz_sql_parser::ast::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1611            mz_sql_parser::ast::ObjectType::Secret => ObjectType::Secret,
1612            mz_sql_parser::ast::ObjectType::Connection => ObjectType::Connection,
1613            mz_sql_parser::ast::ObjectType::Database => ObjectType::Database,
1614            mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema,
1615            mz_sql_parser::ast::ObjectType::Func => ObjectType::Func,
1616            mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1617        }
1618    }
1619}
1620
1621impl From<CommentObjectId> for ObjectType {
1622    fn from(value: CommentObjectId) -> ObjectType {
1623        match value {
1624            CommentObjectId::Table(_) => ObjectType::Table,
1625            CommentObjectId::View(_) => ObjectType::View,
1626            CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1627            CommentObjectId::Source(_) => ObjectType::Source,
1628            CommentObjectId::Sink(_) => ObjectType::Sink,
1629            CommentObjectId::Index(_) => ObjectType::Index,
1630            CommentObjectId::Func(_) => ObjectType::Func,
1631            CommentObjectId::Connection(_) => ObjectType::Connection,
1632            CommentObjectId::Type(_) => ObjectType::Type,
1633            CommentObjectId::Secret(_) => ObjectType::Secret,
1634            CommentObjectId::Role(_) => ObjectType::Role,
1635            CommentObjectId::Database(_) => ObjectType::Database,
1636            CommentObjectId::Schema(_) => ObjectType::Schema,
1637            CommentObjectId::Cluster(_) => ObjectType::Cluster,
1638            CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1639            CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1640        }
1641    }
1642}
1643
1644impl Display for ObjectType {
1645    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1646        f.write_str(match self {
1647            ObjectType::Table => "TABLE",
1648            ObjectType::View => "VIEW",
1649            ObjectType::MaterializedView => "MATERIALIZED VIEW",
1650            ObjectType::Source => "SOURCE",
1651            ObjectType::Sink => "SINK",
1652            ObjectType::Index => "INDEX",
1653            ObjectType::Type => "TYPE",
1654            ObjectType::Role => "ROLE",
1655            ObjectType::Cluster => "CLUSTER",
1656            ObjectType::ClusterReplica => "CLUSTER REPLICA",
1657            ObjectType::Secret => "SECRET",
1658            ObjectType::Connection => "CONNECTION",
1659            ObjectType::Database => "DATABASE",
1660            ObjectType::Schema => "SCHEMA",
1661            ObjectType::Func => "FUNCTION",
1662            ObjectType::NetworkPolicy => "NETWORK POLICY",
1663        })
1664    }
1665}
1666
1667#[derive(
1668    Debug,
1669    Clone,
1670    PartialOrd,
1671    Ord,
1672    PartialEq,
1673    Eq,
1674    Hash,
1675    Copy,
1676    Deserialize,
1677    Serialize
1678)]
1679/// The types of objects in the system.
1680pub enum SystemObjectType {
1681    /// Catalog object type.
1682    Object(ObjectType),
1683    /// Entire system.
1684    System,
1685}
1686
1687impl SystemObjectType {
1688    /// Reports if the object type can be treated as a relation.
1689    pub fn is_relation(&self) -> bool {
1690        match self {
1691            SystemObjectType::Object(object_type) => object_type.is_relation(),
1692            SystemObjectType::System => false,
1693        }
1694    }
1695}
1696
1697impl Display for SystemObjectType {
1698    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1699        match self {
1700            SystemObjectType::Object(object_type) => std::fmt::Display::fmt(&object_type, f),
1701            SystemObjectType::System => f.write_str("SYSTEM"),
1702        }
1703    }
1704}
1705
1706/// Enum used to format object names in error messages.
1707#[derive(Debug, Clone, PartialEq, Eq)]
1708pub enum ErrorMessageObjectDescription {
1709    /// The name of a specific object.
1710    Object {
1711        /// Type of object.
1712        object_type: ObjectType,
1713        /// Name of object.
1714        object_name: Option<String>,
1715    },
1716    /// The name of the entire system.
1717    System,
1718}
1719
1720impl ErrorMessageObjectDescription {
1721    /// Generate a new [`ErrorMessageObjectDescription`] from an [`ObjectId`].
1722    pub fn from_id(
1723        object_id: &ObjectId,
1724        catalog: &dyn SessionCatalog,
1725    ) -> ErrorMessageObjectDescription {
1726        let object_name = match object_id {
1727            ObjectId::Cluster(cluster_id) => catalog.get_cluster(*cluster_id).name().to_string(),
1728            ObjectId::ClusterReplica((cluster_id, replica_id)) => catalog
1729                .get_cluster_replica(*cluster_id, *replica_id)
1730                .name()
1731                .to_string(),
1732            ObjectId::Database(database_id) => catalog.get_database(database_id).name().to_string(),
1733            ObjectId::Schema((database_spec, schema_spec)) => {
1734                let name = catalog.get_schema(database_spec, schema_spec).name();
1735                catalog.resolve_full_schema_name(name).to_string()
1736            }
1737            ObjectId::Role(role_id) => catalog.get_role(role_id).name().to_string(),
1738            ObjectId::Item(id) => {
1739                let name = catalog.get_item(id).name();
1740                catalog.resolve_full_name(name).to_string()
1741            }
1742            ObjectId::NetworkPolicy(network_policy_id) => catalog
1743                .get_network_policy(network_policy_id)
1744                .name()
1745                .to_string(),
1746        };
1747        ErrorMessageObjectDescription::Object {
1748            object_type: catalog.get_object_type(object_id),
1749            object_name: Some(object_name),
1750        }
1751    }
1752
1753    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectId`].
1754    pub fn from_sys_id(
1755        object_id: &SystemObjectId,
1756        catalog: &dyn SessionCatalog,
1757    ) -> ErrorMessageObjectDescription {
1758        match object_id {
1759            SystemObjectId::Object(object_id) => {
1760                ErrorMessageObjectDescription::from_id(object_id, catalog)
1761            }
1762            SystemObjectId::System => ErrorMessageObjectDescription::System,
1763        }
1764    }
1765
1766    /// Generate a new [`ErrorMessageObjectDescription`] from a [`SystemObjectType`].
1767    pub fn from_object_type(object_type: SystemObjectType) -> ErrorMessageObjectDescription {
1768        match object_type {
1769            SystemObjectType::Object(object_type) => ErrorMessageObjectDescription::Object {
1770                object_type,
1771                object_name: None,
1772            },
1773            SystemObjectType::System => ErrorMessageObjectDescription::System,
1774        }
1775    }
1776}
1777
1778impl Display for ErrorMessageObjectDescription {
1779    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1780        match self {
1781            ErrorMessageObjectDescription::Object {
1782                object_type,
1783                object_name,
1784            } => {
1785                let object_name = object_name
1786                    .as_ref()
1787                    .map(|object_name| format!(" {}", object_name.quoted()))
1788                    .unwrap_or_else(|| "".to_string());
1789                write!(f, "{object_type}{object_name}")
1790            }
1791            ErrorMessageObjectDescription::System => f.write_str("SYSTEM"),
1792        }
1793    }
1794}
1795
1796#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
1797// These attributes are needed because the key of a map must be a string. We also
1798// get the added benefit of flattening this struct in it's serialized form.
1799#[serde(into = "BTreeMap<String, RoleId>")]
1800#[serde(try_from = "BTreeMap<String, RoleId>")]
1801/// Represents the grantee and a grantor of a role membership.
1802pub struct RoleMembership {
1803    /// Key is the role that some role is a member of, value is the grantor role ID.
1804    // TODO(jkosh44) This structure does not allow a role to have multiple of the same membership
1805    // from different grantors. This isn't a problem now since we don't implement ADMIN OPTION, but
1806    // we should figure this out before implementing ADMIN OPTION. It will likely require a messy
1807    // migration.
1808    pub map: BTreeMap<RoleId, RoleId>,
1809}
1810
1811impl RoleMembership {
1812    /// Creates a new [`RoleMembership`].
1813    pub fn new() -> RoleMembership {
1814        RoleMembership {
1815            map: BTreeMap::new(),
1816        }
1817    }
1818}
1819
1820impl From<RoleMembership> for BTreeMap<String, RoleId> {
1821    fn from(value: RoleMembership) -> Self {
1822        value
1823            .map
1824            .into_iter()
1825            .map(|(k, v)| (k.to_string(), v))
1826            .collect()
1827    }
1828}
1829
1830impl TryFrom<BTreeMap<String, RoleId>> for RoleMembership {
1831    type Error = anyhow::Error;
1832
1833    fn try_from(value: BTreeMap<String, RoleId>) -> Result<Self, Self::Error> {
1834        Ok(RoleMembership {
1835            map: value
1836                .into_iter()
1837                .map(|(k, v)| Ok((RoleId::from_str(&k)?, v)))
1838                .collect::<Result<_, anyhow::Error>>()?,
1839        })
1840    }
1841}
1842
1843/// Specification for objects that will be affected by a default privilege.
1844#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1845pub struct DefaultPrivilegeObject {
1846    /// The role id that created the object.
1847    pub role_id: RoleId,
1848    /// The database that the object is created in if Some, otherwise all databases.
1849    pub database_id: Option<DatabaseId>,
1850    /// The schema that the object is created in if Some, otherwise all databases.
1851    pub schema_id: Option<SchemaId>,
1852    /// The type of object.
1853    pub object_type: ObjectType,
1854}
1855
1856impl DefaultPrivilegeObject {
1857    /// Creates a new [`DefaultPrivilegeObject`].
1858    pub fn new(
1859        role_id: RoleId,
1860        database_id: Option<DatabaseId>,
1861        schema_id: Option<SchemaId>,
1862        object_type: ObjectType,
1863    ) -> DefaultPrivilegeObject {
1864        DefaultPrivilegeObject {
1865            role_id,
1866            database_id,
1867            schema_id,
1868            object_type,
1869        }
1870    }
1871}
1872
1873impl std::fmt::Display for DefaultPrivilegeObject {
1874    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1875        // TODO: Don't just wrap Debug.
1876        write!(f, "{self:?}")
1877    }
1878}
1879
1880/// Specification for the privileges that will be granted from default privileges.
1881#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
1882pub struct DefaultPrivilegeAclItem {
1883    /// The role that will receive the privileges.
1884    pub grantee: RoleId,
1885    /// The specific privileges granted.
1886    pub acl_mode: AclMode,
1887}
1888
1889impl DefaultPrivilegeAclItem {
1890    /// Creates a new [`DefaultPrivilegeAclItem`].
1891    pub fn new(grantee: RoleId, acl_mode: AclMode) -> DefaultPrivilegeAclItem {
1892        DefaultPrivilegeAclItem { grantee, acl_mode }
1893    }
1894
1895    /// Converts this [`DefaultPrivilegeAclItem`] into an [`MzAclItem`].
1896    pub fn mz_acl_item(self, grantor: RoleId) -> MzAclItem {
1897        MzAclItem {
1898            grantee: self.grantee,
1899            grantor,
1900            acl_mode: self.acl_mode,
1901        }
1902    }
1903}
1904
#[cfg(test)]
mod tests {
    use super::{CloudProvider, EnvironmentId, InvalidEnvironmentIdError};

    /// Organization UUID embedded in every well-formed fixture below.
    const ORG_ID: &str = "1497a3b7-a455-4fc4-8752-b44a94b5f90a";

    /// Parses a table of environment ID strings and checks each outcome.
    /// Inputs that parse successfully must also format back to the exact
    /// input string.
    #[mz_ore::test]
    fn test_environment_id() {
        let cases = [
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Local,
                    cloud_provider_region: "az1".into(),
                    organization_id: ORG_ID.parse().unwrap(),
                    ordinal: 452,
                }),
            ),
            (
                "aws-us-east-1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Aws,
                    cloud_provider_region: "us-east-1".into(),
                    organization_id: ORG_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "gcp-us-central1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Gcp,
                    cloud_provider_region: "us-central1".into(),
                    organization_id: ORG_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            (
                "azure-australiaeast-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Azure,
                    cloud_provider_region: "australiaeast".into(),
                    organization_id: ORG_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            // A region that itself contains several hyphens.
            (
                "generic-moon-station-11-darkside-1497a3b7-a455-4fc4-8752-b44a94b5f90a-0",
                Ok(EnvironmentId {
                    cloud_provider: CloudProvider::Generic,
                    cloud_provider_region: "moon-station-11-darkside".into(),
                    organization_id: ORG_ID.parse().unwrap(),
                    ordinal: 0,
                }),
            ),
            // Empty input.
            ("", Err(InvalidEnvironmentIdError)),
            // Differs from the first valid fixture only in its (much longer) ordinal.
            (
                "local-az1-1497a3b7-a455-4fc4-8752-b44a94b5f90a-123456789",
                Err(InvalidEnvironmentIdError),
            ),
            // No region segment between the provider and the UUID.
            (
                "local-1497a3b7-a455-4fc4-8752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
            // The UUID is missing one of its hyphens.
            (
                "local-az1-1497a3b7-a455-4fc48752-b44a94b5f90a-452",
                Err(InvalidEnvironmentIdError),
            ),
        ];

        for (input, expected) in cases {
            let actual = input.parse();
            assert_eq!(expected, actual, "input = {}", input);

            // Only successfully parsed IDs are checked for round-tripping.
            let Ok(parsed) = actual else {
                continue;
            };
            assert_eq!(input, parsed.to_string(), "input = {}", input);
        }
    }
}