Skip to main content

mz_catalog/durable/
transaction.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35    RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53    ClusterReplicaValue, ClusterSystemConfiguration, ClusterSystemConfigurationKey,
54    ClusterSystemConfigurationValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey,
55    ConfigValue, Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey,
56    DefaultPrivilegesValue, DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
57    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
58    ReplicaConfig, ReplicaSystemConfiguration, ReplicaSystemConfigurationKey,
59    ReplicaSystemConfigurationValue, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
60    ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
61    SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
62    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
63    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
64};
65use crate::durable::{
66    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
67    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
68    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
69    SCHEMA_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
70    SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
71    USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
72};
73use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
74
/// Identifier type for operations inside a [`Transaction`] (see `Transaction::op_id`); it is
/// also the third component of every pending audit log update.
type Timestamp = u64;
76
/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
/// An operation also logically groups multiple catalog updates together.
///
/// Each catalog collection is tracked in its own [`TableTransaction`]; `Transaction::new`
/// seeds them from a [`Snapshot`]. Audit log events are the exception and are buffered in a
/// plain vector (see `audit_log_updates`).
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    /// The durable catalog this transaction operates on. Skipped by the derived `Debug`.
    // NOTE(review): only `Debug` is derived on this struct, so the `PartialEq = "ignore"`
    // attribute below looks like a leftover -- confirm before removing.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    /// Databases. `Transaction::new` registers a uniqueness check on `name`.
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    /// Schemas. `Transaction::new` registers a uniqueness check on `(database_id, name)`.
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    /// Items. `Transaction::new` registers a uniqueness check on `(schema_id, name)` that
    /// additionally takes the item types into account.
    items: TableTransaction<ItemKey, ItemValue>,
    /// Comments.
    comments: TableTransaction<CommentKey, CommentValue>,
    /// Roles. `Transaction::new` registers a uniqueness check on `name`.
    roles: TableTransaction<RoleKey, RoleValue>,
    /// Per-role authentication data (password hash and update time), keyed by role ID.
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    /// Clusters. `Transaction::new` registers a uniqueness check on `name`.
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    /// Cluster replicas. `Transaction::new` registers a uniqueness check on
    /// `(cluster_id, name)`.
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    /// Introspection source indexes of clusters.
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    /// Named ID allocators; each value stores the `next_id` to hand out.
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    /// Catalog configs.
    configs: TableTransaction<ConfigKey, ConfigValue>,
    /// Catalog settings.
    settings: TableTransaction<SettingKey, SettingValue>,
    /// System object mappings (seeded from `Snapshot::system_object_mappings`).
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    /// Server-level system configuration.
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    /// Cluster-level system configuration.
    cluster_system_configurations:
        TableTransaction<ClusterSystemConfigurationKey, ClusterSystemConfigurationValue>,
    /// Replica-level system configuration.
    replica_system_configurations:
        TableTransaction<ReplicaSystemConfigurationKey, ReplicaSystemConfigurationValue>,
    /// Default privileges.
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    /// Source references.
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    /// System privileges.
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    /// Network policies. `Transaction::new` registers a uniqueness check on `name`.
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    /// Storage collection metadata.
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    /// Unfinalized shards; only the key carries information, the value is `()`.
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    /// The txn-wal shard; the key is `()`, so this table holds at most one value.
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Don't make this a table transaction so that it's not read into the
    // in-memory cache.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of `durable_catalog` at the start of the transaction.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation of this transaction.
    op_id: Timestamp,
}
120
121impl<'a> Transaction<'a> {
122    pub fn new(
123        durable_catalog: &'a mut dyn DurableCatalogState,
124        Snapshot {
125            databases,
126            schemas,
127            roles,
128            role_auth,
129            items,
130            comments,
131            clusters,
132            network_policies,
133            cluster_replicas,
134            introspection_sources,
135            id_allocator,
136            configs,
137            settings,
138            source_references,
139            system_object_mappings,
140            system_configurations,
141            cluster_system_configurations,
142            replica_system_configurations,
143            default_privileges,
144            system_privileges,
145            storage_collection_metadata,
146            unfinalized_shards,
147            txn_wal_shard,
148        }: Snapshot,
149        upper: mz_repr::Timestamp,
150    ) -> Result<Transaction<'a>, CatalogError> {
151        Ok(Transaction {
152            durable_catalog,
153            databases: TableTransaction::new_with_uniqueness_fn(
154                databases,
155                |a: &DatabaseValue, b| a.name == b.name,
156            )?,
157            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
158                a.database_id == b.database_id && a.name == b.name
159            })?,
160            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
161                a.schema_id == b.schema_id && a.name == b.name && {
162                    // `item_type` is slow, only compute if needed.
163                    let a_type = a.item_type();
164                    let b_type = b.item_type();
165                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
166                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
167                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
168                }
169            })?,
170            comments: TableTransaction::new(comments)?,
171            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
172                a.name == b.name
173            })?,
174            role_auth: TableTransaction::new(role_auth)?,
175            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
176                a.name == b.name
177            })?,
178            network_policies: TableTransaction::new_with_uniqueness_fn(
179                network_policies,
180                |a: &NetworkPolicyValue, b| a.name == b.name,
181            )?,
182            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
183                cluster_replicas,
184                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
185            )?,
186            introspection_sources: TableTransaction::new(introspection_sources)?,
187            id_allocator: TableTransaction::new(id_allocator)?,
188            configs: TableTransaction::new(configs)?,
189            settings: TableTransaction::new(settings)?,
190            source_references: TableTransaction::new(source_references)?,
191            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
192            system_configurations: TableTransaction::new(system_configurations)?,
193            cluster_system_configurations: TableTransaction::new(cluster_system_configurations)?,
194            replica_system_configurations: TableTransaction::new(replica_system_configurations)?,
195            default_privileges: TableTransaction::new(default_privileges)?,
196            system_privileges: TableTransaction::new(system_privileges)?,
197            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
198            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
199            // Uniqueness violations for this value occur at the key rather than
200            // the value (the key is the unit struct `()` so this is a singleton
201            // value).
202            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
203            audit_log_updates: Vec::new(),
204            upper,
205            op_id: 0,
206        })
207    }
208
209    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
210        let key = ItemKey { id: *id };
211        self.items
212            .get(&key)
213            .map(|v| DurableType::from_key_value(key, v.clone()))
214    }
215
216    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
217        self.items
218            .items()
219            .into_iter()
220            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
221            .sorted_by_key(|Item { id, .. }| *id)
222    }
223
224    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
225        self.insert_audit_log_events([event]);
226    }
227
228    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
229        let events = events
230            .into_iter()
231            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
232        self.audit_log_updates.extend(events);
233    }
234
235    pub fn insert_user_database(
236        &mut self,
237        database_name: &str,
238        owner_id: RoleId,
239        privileges: Vec<MzAclItem>,
240        temporary_oids: &HashSet<u32>,
241    ) -> Result<(DatabaseId, u32), CatalogError> {
242        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
243        let id = DatabaseId::User(id);
244        let oid = self.allocate_oid(temporary_oids)?;
245        self.insert_database(id, database_name, owner_id, privileges, oid)?;
246        Ok((id, oid))
247    }
248
249    pub(crate) fn insert_database(
250        &mut self,
251        id: DatabaseId,
252        database_name: &str,
253        owner_id: RoleId,
254        privileges: Vec<MzAclItem>,
255        oid: u32,
256    ) -> Result<u32, CatalogError> {
257        match self.databases.insert(
258            DatabaseKey { id },
259            DatabaseValue {
260                name: database_name.to_string(),
261                owner_id,
262                privileges,
263                oid,
264            },
265            self.op_id,
266        ) {
267            Ok(_) => Ok(oid),
268            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
269        }
270    }
271
272    pub fn insert_user_schema(
273        &mut self,
274        database_id: DatabaseId,
275        schema_name: &str,
276        owner_id: RoleId,
277        privileges: Vec<MzAclItem>,
278        temporary_oids: &HashSet<u32>,
279    ) -> Result<(SchemaId, u32), CatalogError> {
280        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
281        let id = SchemaId::User(id);
282        let oid = self.allocate_oid(temporary_oids)?;
283        self.insert_schema(
284            id,
285            Some(database_id),
286            schema_name.to_string(),
287            owner_id,
288            privileges,
289            oid,
290        )?;
291        Ok((id, oid))
292    }
293
294    pub fn insert_system_schema(
295        &mut self,
296        schema_id: u64,
297        schema_name: &str,
298        owner_id: RoleId,
299        privileges: Vec<MzAclItem>,
300        oid: u32,
301    ) -> Result<(), CatalogError> {
302        let id = SchemaId::System(schema_id);
303        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
304    }
305
306    pub(crate) fn insert_schema(
307        &mut self,
308        schema_id: SchemaId,
309        database_id: Option<DatabaseId>,
310        schema_name: String,
311        owner_id: RoleId,
312        privileges: Vec<MzAclItem>,
313        oid: u32,
314    ) -> Result<(), CatalogError> {
315        match self.schemas.insert(
316            SchemaKey { id: schema_id },
317            SchemaValue {
318                database_id,
319                name: schema_name.clone(),
320                owner_id,
321                privileges,
322                oid,
323            },
324            self.op_id,
325        ) {
326            Ok(_) => Ok(()),
327            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
328        }
329    }
330
331    pub fn insert_builtin_role(
332        &mut self,
333        id: RoleId,
334        name: String,
335        attributes: RoleAttributesRaw,
336        membership: RoleMembership,
337        vars: RoleVars,
338        oid: u32,
339    ) -> Result<RoleId, CatalogError> {
340        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
341        self.insert_role(id, name, attributes, membership, vars, oid)?;
342        Ok(id)
343    }
344
345    pub fn insert_user_role(
346        &mut self,
347        name: String,
348        attributes: RoleAttributesRaw,
349        membership: RoleMembership,
350        vars: RoleVars,
351        temporary_oids: &HashSet<u32>,
352    ) -> Result<(RoleId, u32), CatalogError> {
353        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
354        let id = RoleId::User(id);
355        let oid = self.allocate_oid(temporary_oids)?;
356        self.insert_role(id, name, attributes, membership, vars, oid)?;
357        Ok((id, oid))
358    }
359
360    fn insert_role(
361        &mut self,
362        id: RoleId,
363        name: String,
364        attributes: RoleAttributesRaw,
365        membership: RoleMembership,
366        vars: RoleVars,
367        oid: u32,
368    ) -> Result<(), CatalogError> {
369        if let Some(ref password) = attributes.password {
370            let hash = mz_auth::hash::scram256_hash(
371                password,
372                &attributes
373                    .scram_iterations
374                    .or_else(|| {
375                        soft_panic_or_log!(
376                            "Hash iterations must be set if a password is provided."
377                        );
378                        None
379                    })
380                    // This should never happen, but rather than panicking we'll
381                    // set a known secure value as a fallback.
382                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
383            )
384            .expect("password hash should be valid");
385            match self.role_auth.insert(
386                RoleAuthKey { role_id: id },
387                RoleAuthValue {
388                    password_hash: Some(hash),
389                    updated_at: SYSTEM_TIME(),
390                },
391                self.op_id,
392            ) {
393                Ok(_) => {}
394                Err(_) => {
395                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
396                }
397            }
398        }
399
400        match self.roles.insert(
401            RoleKey { id },
402            RoleValue {
403                name: name.clone(),
404                attributes: attributes.into(),
405                membership,
406                vars,
407                oid,
408            },
409            self.op_id,
410        ) {
411            Ok(_) => Ok(()),
412            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
413        }
414    }
415
416    /// Panics if any introspection source id is not a system id
417    pub fn insert_user_cluster(
418        &mut self,
419        cluster_id: ClusterId,
420        cluster_name: &str,
421        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
422        owner_id: RoleId,
423        privileges: Vec<MzAclItem>,
424        config: ClusterConfig,
425        temporary_oids: &HashSet<u32>,
426    ) -> Result<(), CatalogError> {
427        self.insert_cluster(
428            cluster_id,
429            cluster_name,
430            introspection_source_indexes,
431            owner_id,
432            privileges,
433            config,
434            temporary_oids,
435        )
436    }
437
438    /// Panics if any introspection source id is not a system id
439    pub fn insert_system_cluster(
440        &mut self,
441        cluster_name: &str,
442        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
443        privileges: Vec<MzAclItem>,
444        owner_id: RoleId,
445        config: ClusterConfig,
446        temporary_oids: &HashSet<u32>,
447    ) -> Result<ClusterId, CatalogError> {
448        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
449        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
450        self.insert_cluster(
451            cluster_id,
452            cluster_name,
453            introspection_source_indexes,
454            owner_id,
455            privileges,
456            config,
457            temporary_oids,
458        )?;
459        Ok(cluster_id)
460    }
461
462    fn insert_cluster(
463        &mut self,
464        cluster_id: ClusterId,
465        cluster_name: &str,
466        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
467        owner_id: RoleId,
468        privileges: Vec<MzAclItem>,
469        config: ClusterConfig,
470        temporary_oids: &HashSet<u32>,
471    ) -> Result<(), CatalogError> {
472        if let Err(_) = self.clusters.insert(
473            ClusterKey { id: cluster_id },
474            ClusterValue {
475                name: cluster_name.to_string(),
476                owner_id,
477                privileges,
478                config,
479            },
480            self.op_id,
481        ) {
482            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
483        };
484
485        let amount = usize_to_u64(introspection_source_indexes.len());
486        let oids = self.allocate_oids(amount, temporary_oids)?;
487        let introspection_source_indexes: Vec<_> = introspection_source_indexes
488            .into_iter()
489            .zip_eq(oids)
490            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
491            .collect();
492        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
493            let introspection_source_index = IntrospectionSourceIndex {
494                cluster_id,
495                name: builtin.name.to_string(),
496                item_id,
497                index_id,
498                oid,
499            };
500            let (key, value) = introspection_source_index.into_key_value();
501            self.introspection_sources
502                .insert(key, value, self.op_id)
503                .expect("no uniqueness violation");
504        }
505
506        Ok(())
507    }
508
509    pub fn rename_cluster(
510        &mut self,
511        cluster_id: ClusterId,
512        cluster_name: &str,
513        cluster_to_name: &str,
514    ) -> Result<(), CatalogError> {
515        let key = ClusterKey { id: cluster_id };
516
517        match self.clusters.update(
518            |k, v| {
519                if *k == key {
520                    let mut value = v.clone();
521                    value.name = cluster_to_name.to_string();
522                    Some(value)
523                } else {
524                    None
525                }
526            },
527            self.op_id,
528        )? {
529            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
530            Diff::ONE => Ok(()),
531            n => panic!(
532                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
533            ),
534        }
535    }
536
537    pub fn rename_cluster_replica(
538        &mut self,
539        replica_id: ReplicaId,
540        replica_name: &QualifiedReplica,
541        replica_to_name: &str,
542    ) -> Result<(), CatalogError> {
543        let key = ClusterReplicaKey { id: replica_id };
544
545        match self.cluster_replicas.update(
546            |k, v| {
547                if *k == key {
548                    let mut value = v.clone();
549                    value.name = replica_to_name.to_string();
550                    Some(value)
551                } else {
552                    None
553                }
554            },
555            self.op_id,
556        )? {
557            Diff::ZERO => {
558                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
559            }
560            Diff::ONE => Ok(()),
561            n => panic!(
562                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
563            ),
564        }
565    }
566
567    pub fn insert_cluster_replica_with_id(
568        &mut self,
569        cluster_id: ClusterId,
570        replica_id: ReplicaId,
571        replica_name: &str,
572        config: ReplicaConfig,
573        owner_id: RoleId,
574    ) -> Result<(), CatalogError> {
575        if let Err(_) = self.cluster_replicas.insert(
576            ClusterReplicaKey { id: replica_id },
577            ClusterReplicaValue {
578                cluster_id,
579                name: replica_name.into(),
580                config,
581                owner_id,
582            },
583            self.op_id,
584        ) {
585            let cluster = self
586                .clusters
587                .get(&ClusterKey { id: cluster_id })
588                .expect("cluster exists");
589            return Err(SqlCatalogError::DuplicateReplica(
590                replica_name.to_string(),
591                cluster.name.to_string(),
592            )
593            .into());
594        };
595        Ok(())
596    }
597
598    pub fn insert_user_network_policy(
599        &mut self,
600        name: String,
601        rules: Vec<NetworkPolicyRule>,
602        privileges: Vec<MzAclItem>,
603        owner_id: RoleId,
604        temporary_oids: &HashSet<u32>,
605    ) -> Result<NetworkPolicyId, CatalogError> {
606        let oid = self.allocate_oid(temporary_oids)?;
607        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
608        let id = NetworkPolicyId::User(id);
609        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
610    }
611
612    pub fn insert_network_policy(
613        &mut self,
614        id: NetworkPolicyId,
615        name: String,
616        rules: Vec<NetworkPolicyRule>,
617        privileges: Vec<MzAclItem>,
618        owner_id: RoleId,
619        oid: u32,
620    ) -> Result<NetworkPolicyId, CatalogError> {
621        match self.network_policies.insert(
622            NetworkPolicyKey { id },
623            NetworkPolicyValue {
624                name: name.clone(),
625                rules,
626                privileges,
627                owner_id,
628                oid,
629            },
630            self.op_id,
631        ) {
632            Ok(_) => Ok(id),
633            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
634        }
635    }
636
637    /// Updates persisted information about persisted introspection source
638    /// indexes.
639    ///
640    /// Panics if provided id is not a system id.
641    pub fn update_introspection_source_index_gids(
642        &mut self,
643        mappings: impl Iterator<
644            Item = (
645                ClusterId,
646                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
647            ),
648        >,
649    ) -> Result<(), CatalogError> {
650        for (cluster_id, updates) in mappings {
651            for (name, item_id, index_id, oid) in updates {
652                let introspection_source_index = IntrospectionSourceIndex {
653                    cluster_id,
654                    name,
655                    item_id,
656                    index_id,
657                    oid,
658                };
659                let (key, value) = introspection_source_index.into_key_value();
660
661                let prev = self
662                    .introspection_sources
663                    .set(key, Some(value), self.op_id)?;
664                if prev.is_none() {
665                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
666                        "{index_id}"
667                    ))
668                    .into());
669                }
670            }
671        }
672        Ok(())
673    }
674
675    pub fn insert_user_item(
676        &mut self,
677        id: CatalogItemId,
678        global_id: GlobalId,
679        schema_id: SchemaId,
680        item_name: &str,
681        create_sql: String,
682        owner_id: RoleId,
683        privileges: Vec<MzAclItem>,
684        temporary_oids: &HashSet<u32>,
685        versions: BTreeMap<RelationVersion, GlobalId>,
686    ) -> Result<u32, CatalogError> {
687        let oid = self.allocate_oid(temporary_oids)?;
688        self.insert_item(
689            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
690        )?;
691        Ok(oid)
692    }
693
694    pub fn insert_item(
695        &mut self,
696        id: CatalogItemId,
697        oid: u32,
698        global_id: GlobalId,
699        schema_id: SchemaId,
700        item_name: &str,
701        create_sql: String,
702        owner_id: RoleId,
703        privileges: Vec<MzAclItem>,
704        extra_versions: BTreeMap<RelationVersion, GlobalId>,
705    ) -> Result<(), CatalogError> {
706        match self.items.insert(
707            ItemKey { id },
708            ItemValue {
709                schema_id,
710                name: item_name.to_string(),
711                create_sql,
712                owner_id,
713                privileges,
714                oid,
715                global_id,
716                extra_versions,
717            },
718            self.op_id,
719        ) {
720            Ok(_) => Ok(()),
721            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
722        }
723    }
724
725    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
726        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
727    }
728
729    pub fn get_and_increment_id_by(
730        &mut self,
731        key: String,
732        amount: u64,
733    ) -> Result<Vec<u64>, CatalogError> {
734        assert!(
735            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
736            "system item IDs cannot be allocated outside of bootstrap"
737        );
738
739        let current_id = self
740            .id_allocator
741            .items()
742            .get(&IdAllocKey { name: key.clone() })
743            .unwrap_or_else(|| panic!("{key} id allocator missing"))
744            .next_id;
745        let next_id = current_id
746            .checked_add(amount)
747            .ok_or(SqlCatalogError::IdExhaustion)?;
748        let prev = self.id_allocator.set(
749            IdAllocKey { name: key },
750            Some(IdAllocValue { next_id }),
751            self.op_id,
752        )?;
753        assert_eq!(
754            prev,
755            Some(IdAllocValue {
756                next_id: current_id
757            })
758        );
759        Ok((current_id..next_id).collect())
760    }
761
762    pub fn allocate_system_item_ids(
763        &mut self,
764        amount: u64,
765    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
766        assert!(
767            !self.durable_catalog.is_bootstrap_complete(),
768            "we can only allocate system item IDs during bootstrap"
769        );
770        Ok(self
771            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
772            .into_iter()
773            // TODO(alter_table): Use separate ID allocators.
774            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
775            .collect())
776    }
777
778    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
779    /// from the `cluster_id` and `log_variant`.
780    ///
781    /// Introspection source indexes are a special edge case of items. They are considered system
782    /// items, but they are the only system item that can be created by the user at any time. All
783    /// other system items can only be created by the system during the startup of an upgrade.
784    ///
785    /// Furthermore, all other system item IDs are allocated deterministically in the same order
786    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
787    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
788    /// only one of them can successfully write the IDs down to the catalog. This removes the need
789    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
790    ///
791    /// Since introspection IDs can be allocated at any time, read-only instances would either need
792    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
793    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
794    ///
795    /// Introspection source index IDs are 64 bit integers, with the following format (not to
796    /// scale):
797    ///
798    /// -------------------------------------------------------------
799    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
800    /// |--------------------|------------------------|-------------|
801    /// |       8-bits       |         48-bits        |   8-bits    |
802    /// -------------------------------------------------------------
803    ///
804    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
805    ///                          to.
806    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
807    ///                          belongs to.
808    /// Log Variant:             A unique number indicating the log variant this index is on.
809    pub fn allocate_introspection_source_index_id(
810        cluster_id: &ClusterId,
811        log_variant: LogVariant,
812    ) -> (CatalogItemId, GlobalId) {
813        let cluster_variant: u8 = match cluster_id {
814            ClusterId::System(_) => 1,
815            ClusterId::User(_) => 2,
816        };
817        let cluster_id: u64 = cluster_id.inner_id();
818        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
819        assert_eq!(
820            CLUSTER_ID_MASK & cluster_id,
821            0,
822            "invalid cluster ID: {cluster_id}"
823        );
824        let log_variant: u8 = match log_variant {
825            LogVariant::Timely(TimelyLog::Operates) => 1,
826            LogVariant::Timely(TimelyLog::Channels) => 2,
827            LogVariant::Timely(TimelyLog::Elapsed) => 3,
828            LogVariant::Timely(TimelyLog::Histogram) => 4,
829            LogVariant::Timely(TimelyLog::Addresses) => 5,
830            LogVariant::Timely(TimelyLog::Parks) => 6,
831            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
832            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
833            LogVariant::Timely(TimelyLog::Reachability) => 9,
834            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
835            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
836            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
837            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
838            LogVariant::Differential(DifferentialLog::Sharing) => 14,
839            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
840            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
841            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
842            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
843            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
844            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
845            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
846            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
847            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
848            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
849            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
850            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
851            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
852            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
853            LogVariant::Compute(ComputeLog::LirMapping) => 30,
854            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
855            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
856            LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
857        };
858
859        let mut id: u64 = u64::from(cluster_variant) << 56;
860        id |= cluster_id << 8;
861        id |= u64::from(log_variant);
862
863        (
864            CatalogItemId::IntrospectionSourceIndex(id),
865            GlobalId::IntrospectionSourceIndex(id),
866        )
867    }
868
869    pub fn allocate_user_item_ids(
870        &mut self,
871        amount: u64,
872    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
873        Ok(self
874            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
875            .into_iter()
876            // TODO(alter_table): Use separate ID allocators.
877            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
878            .collect())
879    }
880
881    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
882        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
883        Ok(ReplicaId::System(id))
884    }
885
886    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
887        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
888    }
889
890    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
891    /// object.
892    #[mz_ore::instrument]
893    fn allocate_oids(
894        &mut self,
895        amount: u64,
896        temporary_oids: &HashSet<u32>,
897    ) -> Result<Vec<u32>, CatalogError> {
898        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
899        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
900        struct UserOid(u32);
901
902        impl UserOid {
903            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
904                if oid < FIRST_USER_OID {
905                    Err(anyhow!("invalid user OID {oid}"))
906                } else {
907                    Ok(UserOid(oid))
908                }
909            }
910        }
911
912        impl std::ops::AddAssign<u32> for UserOid {
913            fn add_assign(&mut self, rhs: u32) {
914                let (res, overflow) = self.0.overflowing_add(rhs);
915                self.0 = if overflow { FIRST_USER_OID + res } else { res };
916            }
917        }
918
919        if amount > u32::MAX.into() {
920            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
921        }
922
923        // This is potentially slow to do everytime we allocate an OID. A faster approach might be
924        // to have an ID allocator that is updated everytime an OID is allocated or de-allocated.
925        // However, benchmarking shows that this doesn't make a noticeable difference and the other
926        // approach requires making sure that allocator always stays in-sync which can be
927        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
928        let mut allocated_oids = HashSet::with_capacity(
929            self.databases.len()
930                + self.schemas.len()
931                + self.roles.len()
932                + self.items.len()
933                + self.introspection_sources.len()
934                + temporary_oids.len(),
935        );
936        self.databases.for_values(|_, value| {
937            allocated_oids.insert(value.oid);
938        });
939        self.schemas.for_values(|_, value| {
940            allocated_oids.insert(value.oid);
941        });
942        self.roles.for_values(|_, value| {
943            allocated_oids.insert(value.oid);
944        });
945        self.items.for_values(|_, value| {
946            allocated_oids.insert(value.oid);
947        });
948        self.introspection_sources.for_values(|_, value| {
949            allocated_oids.insert(value.oid);
950        });
951
952        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);
953
954        let start_oid: u32 = self
955            .id_allocator
956            .items()
957            .get(&IdAllocKey {
958                name: OID_ALLOC_KEY.to_string(),
959            })
960            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
961            .next_id
962            .try_into()
963            .expect("we should never persist an oid outside of the u32 range");
964        let mut current_oid = UserOid::new(start_oid)
965            .expect("we should never persist an oid outside of user OID range");
966        let mut oids = Vec::new();
967        while oids.len() < u64_to_usize(amount) {
968            if !is_allocated(current_oid.0) {
969                oids.push(current_oid.0);
970            }
971            current_oid += 1;
972
973            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
974                // We've exhausted all possible OIDs and still don't have `amount`.
975                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
976            }
977        }
978
979        let next_id = current_oid.0;
980        let prev = self.id_allocator.set(
981            IdAllocKey {
982                name: OID_ALLOC_KEY.to_string(),
983            },
984            Some(IdAllocValue {
985                next_id: next_id.into(),
986            }),
987            self.op_id,
988        )?;
989        assert_eq!(
990            prev,
991            Some(IdAllocValue {
992                next_id: start_oid.into(),
993            })
994        );
995
996        Ok(oids)
997    }
998
999    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
1000    /// object.
1001    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1002        self.allocate_oids(1, temporary_oids)
1003            .map(|oids| oids.into_element())
1004    }
1005
1006    /// Exports the current state of this transaction as a [`Snapshot`].
1007    ///
1008    /// This merges each `TableTransaction`'s initial data with its pending
1009    /// changes to produce the current view, then converts back to proto types.
1010    /// Used to persist transaction state between incremental DDL dry runs so
1011    /// the next dry run's fresh `Transaction` starts in sync with the
1012    /// accumulated `CatalogState`.
1013    pub fn current_snapshot(&self) -> Snapshot {
1014        Snapshot {
1015            databases: self.databases.current_items_proto(),
1016            schemas: self.schemas.current_items_proto(),
1017            roles: self.roles.current_items_proto(),
1018            role_auth: self.role_auth.current_items_proto(),
1019            items: self.items.current_items_proto(),
1020            comments: self.comments.current_items_proto(),
1021            clusters: self.clusters.current_items_proto(),
1022            network_policies: self.network_policies.current_items_proto(),
1023            cluster_replicas: self.cluster_replicas.current_items_proto(),
1024            introspection_sources: self.introspection_sources.current_items_proto(),
1025            id_allocator: self.id_allocator.current_items_proto(),
1026            configs: self.configs.current_items_proto(),
1027            settings: self.settings.current_items_proto(),
1028            system_object_mappings: self.system_gid_mapping.current_items_proto(),
1029            system_configurations: self.system_configurations.current_items_proto(),
1030            cluster_system_configurations: self.cluster_system_configurations.current_items_proto(),
1031            replica_system_configurations: self.replica_system_configurations.current_items_proto(),
1032            default_privileges: self.default_privileges.current_items_proto(),
1033            source_references: self.source_references.current_items_proto(),
1034            system_privileges: self.system_privileges.current_items_proto(),
1035            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
1036            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
1037            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
1038        }
1039    }
1040
1041    pub(crate) fn insert_id_allocator(
1042        &mut self,
1043        name: String,
1044        next_id: u64,
1045    ) -> Result<(), CatalogError> {
1046        match self.id_allocator.insert(
1047            IdAllocKey { name: name.clone() },
1048            IdAllocValue { next_id },
1049            self.op_id,
1050        ) {
1051            Ok(_) => Ok(()),
1052            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1053        }
1054    }
1055
1056    /// Removes the database `id` from the transaction.
1057    ///
1058    /// Returns an error if `id` is not found.
1059    ///
1060    /// Runtime is linear with respect to the total number of databases in the catalog.
1061    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
1062    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1063        let prev = self
1064            .databases
1065            .set(DatabaseKey { id: *id }, None, self.op_id)?;
1066        if prev.is_some() {
1067            Ok(())
1068        } else {
1069            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1070        }
1071    }
1072
1073    /// Removes all databases in `databases` from the transaction.
1074    ///
1075    /// Returns an error if any id in `databases` is not found.
1076    ///
1077    /// NOTE: On error, there still may be some databases removed from the transaction. It
1078    /// is up to the caller to either abort the transaction or commit.
1079    pub fn remove_databases(
1080        &mut self,
1081        databases: &BTreeSet<DatabaseId>,
1082    ) -> Result<(), CatalogError> {
1083        if databases.is_empty() {
1084            return Ok(());
1085        }
1086
1087        let to_remove = databases
1088            .iter()
1089            .map(|id| (DatabaseKey { id: *id }, None))
1090            .collect();
1091        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1092        prev.retain(|_k, val| val.is_none());
1093
1094        if !prev.is_empty() {
1095            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1096            return Err(SqlCatalogError::UnknownDatabase(err).into());
1097        }
1098
1099        Ok(())
1100    }
1101
1102    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
1103    ///
1104    /// Returns an error if `(database_id, schema_id)` is not found.
1105    ///
1106    /// Runtime is linear with respect to the total number of schemas in the catalog.
1107    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
1108    pub fn remove_schema(
1109        &mut self,
1110        database_id: &Option<DatabaseId>,
1111        schema_id: &SchemaId,
1112    ) -> Result<(), CatalogError> {
1113        let prev = self
1114            .schemas
1115            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1116        if prev.is_some() {
1117            Ok(())
1118        } else {
1119            let database_name = match database_id {
1120                Some(id) => format!("{id}."),
1121                None => "".to_string(),
1122            };
1123            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1124        }
1125    }
1126
1127    /// Removes all schemas in `schemas` from the transaction.
1128    ///
1129    /// Returns an error if any id in `schemas` is not found.
1130    ///
1131    /// NOTE: On error, there still may be some schemas removed from the transaction. It
1132    /// is up to the caller to either abort the transaction or commit.
1133    pub fn remove_schemas(
1134        &mut self,
1135        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1136    ) -> Result<(), CatalogError> {
1137        if schemas.is_empty() {
1138            return Ok(());
1139        }
1140
1141        let to_remove = schemas
1142            .iter()
1143            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1144            .collect();
1145        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1146        prev.retain(|_k, v| v.is_none());
1147
1148        if !prev.is_empty() {
1149            let err = prev
1150                .keys()
1151                .map(|k| {
1152                    let db_spec = schemas.get(&k.id).expect("should_exist");
1153                    let db_name = match db_spec {
1154                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1155                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1156                    };
1157                    format!("{}.{}", db_name, k.id)
1158                })
1159                .join(", ");
1160
1161            return Err(SqlCatalogError::UnknownSchema(err).into());
1162        }
1163
1164        Ok(())
1165    }
1166
1167    pub fn remove_source_references(
1168        &mut self,
1169        source_id: CatalogItemId,
1170    ) -> Result<(), CatalogError> {
1171        let deleted = self
1172            .source_references
1173            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1174            .is_some();
1175        if deleted {
1176            Ok(())
1177        } else {
1178            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1179        }
1180    }
1181
1182    /// Removes all user roles in `roles` from the transaction.
1183    ///
1184    /// Returns an error if any id in `roles` is not found.
1185    ///
1186    /// NOTE: On error, there still may be some roles removed from the transaction. It
1187    /// is up to the caller to either abort the transaction or commit.
1188    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1189        assert!(
1190            roles.iter().all(|id| id.is_user()),
1191            "cannot delete non-user roles"
1192        );
1193        self.remove_roles(roles)
1194    }
1195
1196    /// Removes all roles in `roles` from the transaction.
1197    ///
1198    /// Returns an error if any id in `roles` is not found.
1199    ///
1200    /// NOTE: On error, there still may be some roles removed from the transaction. It
1201    /// is up to the caller to either abort the transaction or commit.
1202    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1203        if roles.is_empty() {
1204            return Ok(());
1205        }
1206
1207        let to_remove_keys = roles
1208            .iter()
1209            .map(|role_id| RoleKey { id: *role_id })
1210            .collect::<Vec<_>>();
1211
1212        let to_remove_roles = to_remove_keys
1213            .iter()
1214            .map(|role_key| (role_key.clone(), None))
1215            .collect();
1216
1217        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1218
1219        let to_remove_role_auth = to_remove_keys
1220            .iter()
1221            .map(|role_key| {
1222                (
1223                    RoleAuthKey {
1224                        role_id: role_key.id,
1225                    },
1226                    None,
1227                )
1228            })
1229            .collect();
1230
1231        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1232
1233        prev.retain(|_k, v| v.is_none());
1234        if !prev.is_empty() {
1235            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1236            return Err(SqlCatalogError::UnknownRole(err).into());
1237        }
1238
1239        role_auth_prev.retain(|_k, v| v.is_none());
1240        // The reason we don't to the same check as above is that the role auth table
1241        // is not required to have all roles in the role table.
1242
1243        Ok(())
1244    }
1245
1246    /// Removes all cluster in `clusters` from the transaction.
1247    ///
1248    /// Returns an error if any id in `clusters` is not found.
1249    ///
1250    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1251    /// the caller to either abort the transaction or commit.
1252    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1253        if clusters.is_empty() {
1254            return Ok(());
1255        }
1256
1257        let to_remove = clusters
1258            .iter()
1259            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1260            .collect();
1261        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1262
1263        prev.retain(|_k, v| v.is_none());
1264        if !prev.is_empty() {
1265            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1266            return Err(SqlCatalogError::UnknownCluster(err).into());
1267        }
1268
1269        // Cascade delete introspection sources and cluster replicas.
1270        //
1271        // TODO(benesch): this doesn't seem right. Cascade deletions should
1272        // be entirely the domain of the higher catalog layer, not the
1273        // storage layer.
1274        self.cluster_replicas
1275            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1276        self.introspection_sources
1277            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1278
1279        Ok(())
1280    }
1281
1282    /// Removes the cluster replica `id` from the transaction.
1283    ///
1284    /// Returns an error if `id` is not found.
1285    ///
1286    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1287    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1288    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1289        let deleted = self
1290            .cluster_replicas
1291            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1292            .is_some();
1293        if deleted {
1294            Ok(())
1295        } else {
1296            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1297        }
1298    }
1299
1300    /// Removes all cluster replicas in `replicas` from the transaction.
1301    ///
1302    /// Returns an error if any id in `replicas` is not found.
1303    ///
1304    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1305    /// is up to the caller to either abort the transaction or commit.
1306    pub fn remove_cluster_replicas(
1307        &mut self,
1308        replicas: &BTreeSet<ReplicaId>,
1309    ) -> Result<(), CatalogError> {
1310        if replicas.is_empty() {
1311            return Ok(());
1312        }
1313
1314        let to_remove = replicas
1315            .iter()
1316            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1317            .collect();
1318        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1319
1320        prev.retain(|_k, v| v.is_none());
1321        if !prev.is_empty() {
1322            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1323            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1324        }
1325
1326        Ok(())
1327    }
1328
1329    /// Removes item `id` from the transaction.
1330    ///
1331    /// Returns an error if `id` is not found.
1332    ///
1333    /// Runtime is linear with respect to the total number of items in the catalog.
1334    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1335    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1336        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1337        if prev.is_some() {
1338            Ok(())
1339        } else {
1340            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1341        }
1342    }
1343
1344    /// Removes all items in `ids` from the transaction.
1345    ///
1346    /// Returns an error if any id in `ids` is not found.
1347    ///
1348    /// NOTE: On error, there still may be some items removed from the transaction. It is
1349    /// up to the caller to either abort the transaction or commit.
1350    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1351        if ids.is_empty() {
1352            return Ok(());
1353        }
1354
1355        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1356        let n = self.items.delete_by_keys(ks, self.op_id).len();
1357        if n == ids.len() {
1358            Ok(())
1359        } else {
1360            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1361            let mut unknown = ids.difference(&item_ids);
1362            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1363        }
1364    }
1365
1366    /// Removes all system object mappings in `descriptions` from the transaction.
1367    ///
1368    /// Returns an error if any description in `descriptions` is not found.
1369    ///
1370    /// NOTE: On error, there still may be some items removed from the transaction. It is
1371    /// up to the caller to either abort the transaction or commit.
1372    pub fn remove_system_object_mappings(
1373        &mut self,
1374        descriptions: BTreeSet<SystemObjectDescription>,
1375    ) -> Result<(), CatalogError> {
1376        if descriptions.is_empty() {
1377            return Ok(());
1378        }
1379
1380        let ks: Vec<_> = descriptions
1381            .clone()
1382            .into_iter()
1383            .map(|desc| GidMappingKey {
1384                schema_name: desc.schema_name,
1385                object_type: desc.object_type,
1386                object_name: desc.object_name,
1387            })
1388            .collect();
1389        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1390
1391        if n == descriptions.len() {
1392            Ok(())
1393        } else {
1394            let item_descriptions = self
1395                .system_gid_mapping
1396                .items()
1397                .keys()
1398                .map(|k| SystemObjectDescription {
1399                    schema_name: k.schema_name.clone(),
1400                    object_type: k.object_type.clone(),
1401                    object_name: k.object_name.clone(),
1402                })
1403                .collect();
1404            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1405                format!(
1406                    "{} {}.{}",
1407                    desc.object_type, desc.schema_name, desc.object_name
1408                )
1409            });
1410            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1411        }
1412    }
1413
1414    /// Removes all introspection source indexes in `indexes` from the transaction.
1415    ///
1416    /// Returns an error if any index in `indexes` is not found.
1417    ///
1418    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1419    /// up to the caller to either abort the transaction or commit.
1420    pub fn remove_introspection_source_indexes(
1421        &mut self,
1422        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1423    ) -> Result<(), CatalogError> {
1424        if introspection_source_indexes.is_empty() {
1425            return Ok(());
1426        }
1427
1428        let ks: Vec<_> = introspection_source_indexes
1429            .clone()
1430            .into_iter()
1431            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1432            .collect();
1433        let n = self
1434            .introspection_sources
1435            .delete_by_keys(ks, self.op_id)
1436            .len();
1437        if n == introspection_source_indexes.len() {
1438            Ok(())
1439        } else {
1440            let txn_indexes = self
1441                .introspection_sources
1442                .items()
1443                .keys()
1444                .map(|k| (k.cluster_id, k.name.clone()))
1445                .collect();
1446            let mut unknown = introspection_source_indexes
1447                .difference(&txn_indexes)
1448                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1449            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1450        }
1451    }
1452
1453    /// Updates item `id` in the transaction to `item_name` and `item`.
1454    ///
1455    /// Returns an error if `id` is not found.
1456    ///
1457    /// Runtime is linear with respect to the total number of items in the catalog.
1458    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1459    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1460        let updated =
1461            self.items
1462                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1463        if updated {
1464            Ok(())
1465        } else {
1466            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1467        }
1468    }
1469
1470    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1471    /// corresponding value in `items`.
1472    ///
1473    /// Returns an error if any id in `items` is not found.
1474    ///
1475    /// NOTE: On error, there still may be some items updated in the transaction. It is
1476    /// up to the caller to either abort the transaction or commit.
1477    pub fn update_items(
1478        &mut self,
1479        items: BTreeMap<CatalogItemId, Item>,
1480    ) -> Result<(), CatalogError> {
1481        if items.is_empty() {
1482            return Ok(());
1483        }
1484
1485        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1486        let kvs: Vec<_> = items
1487            .clone()
1488            .into_iter()
1489            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1490            .collect();
1491        let n = self.items.update_by_keys(kvs, self.op_id)?;
1492        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1493        if n == update_ids.len() {
1494            Ok(())
1495        } else {
1496            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1497            let mut unknown = update_ids.difference(&item_ids);
1498            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1499        }
1500    }
1501
1502    /// Updates role `id` in the transaction to `role`.
1503    ///
1504    /// Returns an error if `id` is not found.
1505    ///
1506    /// Runtime is linear with respect to the total number of items in the catalog.
1507    /// DO NOT call this function in a loop, implement and use some `Self::update_roles` instead.
1508    /// You should model it after [`Self::update_items`].
1509    pub fn update_role(
1510        &mut self,
1511        id: RoleId,
1512        role: Role,
1513        password: PasswordAction,
1514    ) -> Result<(), CatalogError> {
1515        let key = RoleKey { id };
1516        if self.roles.get(&key).is_some() {
1517            let auth_key = RoleAuthKey { role_id: id };
1518
1519            match password {
1520                PasswordAction::Set(new_password) => {
1521                    let hash = mz_auth::hash::scram256_hash(
1522                        &new_password.password,
1523                        &new_password.scram_iterations,
1524                    )
1525                    .expect("password hash should be valid");
1526                    let value = RoleAuthValue {
1527                        password_hash: Some(hash),
1528                        updated_at: SYSTEM_TIME(),
1529                    };
1530
1531                    if self.role_auth.get(&auth_key).is_some() {
1532                        self.role_auth
1533                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1534                    } else {
1535                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1536                    }
1537                }
1538                PasswordAction::Clear => {
1539                    let value = RoleAuthValue {
1540                        password_hash: None,
1541                        updated_at: SYSTEM_TIME(),
1542                    };
1543                    if self.role_auth.get(&auth_key).is_some() {
1544                        self.role_auth
1545                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1546                    }
1547                }
1548                PasswordAction::NoChange => {}
1549            }
1550
1551            self.roles
1552                .update_by_key(key, role.into_key_value().1, self.op_id)?;
1553
1554            Ok(())
1555        } else {
1556            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1557        }
1558    }
1559
1560    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1561    /// corresponding value in `roles`.
1562    ///
1563    /// This function does *not* write role_authentication information to the catalog.
1564    /// It is purely for updating the role itself.
1565    ///
1566    /// Returns an error if any id in `roles` is not found.
1567    ///
1568    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1569    /// up to the caller to either abort the transaction or commit.
1570    pub fn update_roles_without_auth(
1571        &mut self,
1572        roles: BTreeMap<RoleId, Role>,
1573    ) -> Result<(), CatalogError> {
1574        if roles.is_empty() {
1575            return Ok(());
1576        }
1577
1578        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1579        let kvs: Vec<_> = roles
1580            .into_iter()
1581            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1582            .collect();
1583        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1584        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1585
1586        if n == update_role_ids.len() {
1587            Ok(())
1588        } else {
1589            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1590            let mut unknown = update_role_ids.difference(&role_ids);
1591            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1592        }
1593    }
1594
1595    /// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
1596    /// of `mappings` should be (old-global-id, new-system-object-mapping).
1597    ///
1598    /// Panics if provided id is not a system id.
1599    pub fn update_system_object_mappings(
1600        &mut self,
1601        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1602    ) -> Result<(), CatalogError> {
1603        if mappings.is_empty() {
1604            return Ok(());
1605        }
1606
1607        let n = self.system_gid_mapping.update(
1608            |_k, v| {
1609                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1610                    let (_, new_value) = mapping.clone().into_key_value();
1611                    Some(new_value)
1612                } else {
1613                    None
1614                }
1615            },
1616            self.op_id,
1617        )?;
1618
1619        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1620            != mappings.len()
1621        {
1622            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1623            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1624        }
1625
1626        Ok(())
1627    }
1628
1629    /// Updates cluster `id` in the transaction to `cluster`.
1630    ///
1631    /// Returns an error if `id` is not found.
1632    ///
1633    /// Runtime is linear with respect to the total number of clusters in the catalog.
1634    /// DO NOT call this function in a loop.
1635    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1636        let updated = self.clusters.update_by_key(
1637            ClusterKey { id },
1638            cluster.into_key_value().1,
1639            self.op_id,
1640        )?;
1641        if updated {
1642            Ok(())
1643        } else {
1644            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1645        }
1646    }
1647
1648    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1649    ///
1650    /// Returns an error if `replica_id` is not found.
1651    ///
1652    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1653    /// DO NOT call this function in a loop.
1654    pub fn update_cluster_replica(
1655        &mut self,
1656        replica_id: ReplicaId,
1657        replica: ClusterReplica,
1658    ) -> Result<(), CatalogError> {
1659        let updated = self.cluster_replicas.update_by_key(
1660            ClusterReplicaKey { id: replica_id },
1661            replica.into_key_value().1,
1662            self.op_id,
1663        )?;
1664        if updated {
1665            Ok(())
1666        } else {
1667            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1668        }
1669    }
1670
1671    /// Updates database `id` in the transaction to `database`.
1672    ///
1673    /// Returns an error if `id` is not found.
1674    ///
1675    /// Runtime is linear with respect to the total number of databases in the catalog.
1676    /// DO NOT call this function in a loop.
1677    pub fn update_database(
1678        &mut self,
1679        id: DatabaseId,
1680        database: Database,
1681    ) -> Result<(), CatalogError> {
1682        let updated = self.databases.update_by_key(
1683            DatabaseKey { id },
1684            database.into_key_value().1,
1685            self.op_id,
1686        )?;
1687        if updated {
1688            Ok(())
1689        } else {
1690            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1691        }
1692    }
1693
1694    /// Updates schema `schema_id` in the transaction to `schema`.
1695    ///
1696    /// Returns an error if `schema_id` is not found.
1697    ///
1698    /// Runtime is linear with respect to the total number of schemas in the catalog.
1699    /// DO NOT call this function in a loop.
1700    pub fn update_schema(
1701        &mut self,
1702        schema_id: SchemaId,
1703        schema: Schema,
1704    ) -> Result<(), CatalogError> {
1705        let updated = self.schemas.update_by_key(
1706            SchemaKey { id: schema_id },
1707            schema.into_key_value().1,
1708            self.op_id,
1709        )?;
1710        if updated {
1711            Ok(())
1712        } else {
1713            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1714        }
1715    }
1716
1717    /// Updates `network_policy_id` in the transaction to `network policy`.
1718    ///
1719    /// Returns an error if `id` is not found.
1720    ///
1721    /// Runtime is linear with respect to the total number of databases in the catalog.
1722    /// DO NOT call this function in a loop.
1723    pub fn update_network_policy(
1724        &mut self,
1725        id: NetworkPolicyId,
1726        network_policy: NetworkPolicy,
1727    ) -> Result<(), CatalogError> {
1728        let updated = self.network_policies.update_by_key(
1729            NetworkPolicyKey { id },
1730            network_policy.into_key_value().1,
1731            self.op_id,
1732        )?;
1733        if updated {
1734            Ok(())
1735        } else {
1736            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1737        }
1738    }
1739    /// Removes all network policies in `network policies` from the transaction.
1740    ///
1741    /// Returns an error if any id in `network policy` is not found.
1742    ///
1743    /// NOTE: On error, there still may be some roles removed from the transaction. It
1744    /// is up to the caller to either abort the transaction or commit.
1745    pub fn remove_network_policies(
1746        &mut self,
1747        network_policies: &BTreeSet<NetworkPolicyId>,
1748    ) -> Result<(), CatalogError> {
1749        if network_policies.is_empty() {
1750            return Ok(());
1751        }
1752
1753        let to_remove = network_policies
1754            .iter()
1755            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1756            .collect();
1757        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1758        assert!(
1759            prev.iter().all(|(k, _)| k.id.is_user()),
1760            "cannot delete non-user network policy"
1761        );
1762
1763        prev.retain(|_k, v| v.is_none());
1764        if !prev.is_empty() {
1765            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1766            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1767        }
1768
1769        Ok(())
1770    }
1771    /// Set persisted default privilege.
1772    ///
1773    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
1774    pub fn set_default_privilege(
1775        &mut self,
1776        role_id: RoleId,
1777        database_id: Option<DatabaseId>,
1778        schema_id: Option<SchemaId>,
1779        object_type: ObjectType,
1780        grantee: RoleId,
1781        privileges: Option<AclMode>,
1782    ) -> Result<(), CatalogError> {
1783        self.default_privileges.set(
1784            DefaultPrivilegesKey {
1785                role_id,
1786                database_id,
1787                schema_id,
1788                object_type,
1789                grantee,
1790            },
1791            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1792            self.op_id,
1793        )?;
1794        Ok(())
1795    }
1796
1797    /// Set persisted default privileges.
1798    pub fn set_default_privileges(
1799        &mut self,
1800        default_privileges: Vec<DefaultPrivilege>,
1801    ) -> Result<(), CatalogError> {
1802        if default_privileges.is_empty() {
1803            return Ok(());
1804        }
1805
1806        let default_privileges = default_privileges
1807            .into_iter()
1808            .map(DurableType::into_key_value)
1809            .map(|(k, v)| (k, Some(v)))
1810            .collect();
1811        self.default_privileges
1812            .set_many(default_privileges, self.op_id)?;
1813        Ok(())
1814    }
1815
1816    /// Set persisted system privilege.
1817    ///
1818    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1819    pub fn set_system_privilege(
1820        &mut self,
1821        grantee: RoleId,
1822        grantor: RoleId,
1823        acl_mode: Option<AclMode>,
1824    ) -> Result<(), CatalogError> {
1825        self.system_privileges.set(
1826            SystemPrivilegesKey { grantee, grantor },
1827            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1828            self.op_id,
1829        )?;
1830        Ok(())
1831    }
1832
1833    /// Set persisted system privileges.
1834    pub fn set_system_privileges(
1835        &mut self,
1836        system_privileges: Vec<MzAclItem>,
1837    ) -> Result<(), CatalogError> {
1838        if system_privileges.is_empty() {
1839            return Ok(());
1840        }
1841
1842        let system_privileges = system_privileges
1843            .into_iter()
1844            .map(DurableType::into_key_value)
1845            .map(|(k, v)| (k, Some(v)))
1846            .collect();
1847        self.system_privileges
1848            .set_many(system_privileges, self.op_id)?;
1849        Ok(())
1850    }
1851
1852    /// Set persisted setting.
1853    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1854        self.settings.set(
1855            SettingKey { name },
1856            value.map(|value| SettingValue { value }),
1857            self.op_id,
1858        )?;
1859        Ok(())
1860    }
1861
1862    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1863        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1864    }
1865
1866    /// Insert persisted introspection source index.
1867    pub fn insert_introspection_source_indexes(
1868        &mut self,
1869        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1870        temporary_oids: &HashSet<u32>,
1871    ) -> Result<(), CatalogError> {
1872        if introspection_source_indexes.is_empty() {
1873            return Ok(());
1874        }
1875
1876        let amount = usize_to_u64(introspection_source_indexes.len());
1877        let oids = self.allocate_oids(amount, temporary_oids)?;
1878        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1879            .into_iter()
1880            .zip_eq(oids)
1881            .map(
1882                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1883                    cluster_id,
1884                    name,
1885                    item_id,
1886                    index_id,
1887                    oid,
1888                },
1889            )
1890            .collect();
1891
1892        for introspection_source_index in introspection_source_indexes {
1893            let (key, value) = introspection_source_index.into_key_value();
1894            self.introspection_sources.insert(key, value, self.op_id)?;
1895        }
1896
1897        Ok(())
1898    }
1899
1900    /// Set persisted system object mappings.
1901    pub fn set_system_object_mappings(
1902        &mut self,
1903        mappings: Vec<SystemObjectMapping>,
1904    ) -> Result<(), CatalogError> {
1905        if mappings.is_empty() {
1906            return Ok(());
1907        }
1908
1909        let mappings = mappings
1910            .into_iter()
1911            .map(DurableType::into_key_value)
1912            .map(|(k, v)| (k, Some(v)))
1913            .collect();
1914        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1915        Ok(())
1916    }
1917
1918    /// Set persisted replica.
1919    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1920        if replicas.is_empty() {
1921            return Ok(());
1922        }
1923
1924        let replicas = replicas
1925            .into_iter()
1926            .map(DurableType::into_key_value)
1927            .map(|(k, v)| (k, Some(v)))
1928            .collect();
1929        self.cluster_replicas.set_many(replicas, self.op_id)?;
1930        Ok(())
1931    }
1932
1933    /// Set persisted configuration.
1934    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1935        match value {
1936            Some(value) => {
1937                let config = Config { key, value };
1938                let (key, value) = config.into_key_value();
1939                self.configs.set(key, Some(value), self.op_id)?;
1940            }
1941            None => {
1942                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1943            }
1944        }
1945        Ok(())
1946    }
1947
1948    /// Get the value of a persisted config.
1949    pub fn get_config(&self, key: String) -> Option<u64> {
1950        self.configs
1951            .get(&ConfigKey { key })
1952            .map(|entry| entry.value)
1953    }
1954
1955    /// Get the value of a persisted setting.
1956    pub fn get_setting(&self, name: String) -> Option<&str> {
1957        self.settings
1958            .get(&SettingKey { name })
1959            .map(|entry| &*entry.value)
1960    }
1961
1962    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1963        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1964            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1965    }
1966
1967    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1968        self.set_setting(
1969            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1970            Some(shard_id.to_string()),
1971        )
1972    }
1973
1974    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1975        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1976            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1977    }
1978
1979    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1980        self.set_setting(
1981            EXPRESSION_CACHE_SHARD_KEY.to_string(),
1982            Some(shard_id.to_string()),
1983        )
1984    }
1985
1986    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
1987    /// match the `with_0dt_deployment_max_wait` "system var" value.
1988    ///
1989    /// These are mirrored so that we can toggle the flag with Launch Darkly,
1990    /// but use it in boot before Launch Darkly is available.
1991    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1992        self.set_config(
1993            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1994            Some(
1995                value
1996                    .as_millis()
1997                    .try_into()
1998                    .expect("max wait fits into u64"),
1999            ),
2000        )
2001    }
2002
2003    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
2004    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2005    /// value.
2006    ///
2007    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2008    /// but use it in boot before Launch Darkly is available.
2009    pub fn set_0dt_deployment_ddl_check_interval(
2010        &mut self,
2011        value: Duration,
2012    ) -> Result<(), CatalogError> {
2013        self.set_config(
2014            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2015            Some(
2016                value
2017                    .as_millis()
2018                    .try_into()
2019                    .expect("ddl check interval fits into u64"),
2020            ),
2021        )
2022    }
2023
2024    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
2025    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
2026    ///
2027    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2028    /// but use it in boot before Launch Darkly is available.
2029    pub fn set_enable_0dt_deployment_panic_after_timeout(
2030        &mut self,
2031        value: bool,
2032    ) -> Result<(), CatalogError> {
2033        self.set_config(
2034            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2035            Some(u64::from(value)),
2036        )
2037    }
2038
2039    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2040    /// match the `with_0dt_deployment_max_wait` "system var" value.
2041    ///
2042    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2043    /// but use it in boot before LaunchDarkly is available.
2044    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2045        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2046    }
2047
2048    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2049    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2050    /// value.
2051    ///
2052    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2053    /// use it in boot before LaunchDarkly is available.
2054    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2055        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2056    }
2057
2058    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2059    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2060    /// var" value.
2061    ///
2062    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2063    /// use it in boot before LaunchDarkly is available.
2064    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2065        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2066    }
2067
2068    /// Updates the catalog `system_config_synced` "config" value to true.
2069    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2070        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2071    }
2072
2073    pub fn update_comment(
2074        &mut self,
2075        object_id: CommentObjectId,
2076        sub_component: Option<usize>,
2077        comment: Option<String>,
2078    ) -> Result<(), CatalogError> {
2079        let key = CommentKey {
2080            object_id,
2081            sub_component,
2082        };
2083        let value = comment.map(|c| CommentValue { comment: c });
2084        self.comments.set(key, value, self.op_id)?;
2085
2086        Ok(())
2087    }
2088
2089    pub fn drop_comments(
2090        &mut self,
2091        object_ids: &BTreeSet<CommentObjectId>,
2092    ) -> Result<(), CatalogError> {
2093        if object_ids.is_empty() {
2094            return Ok(());
2095        }
2096
2097        self.comments
2098            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2099        Ok(())
2100    }
2101
2102    pub fn update_source_references(
2103        &mut self,
2104        source_id: CatalogItemId,
2105        references: Vec<SourceReference>,
2106        updated_at: u64,
2107    ) -> Result<(), CatalogError> {
2108        let key = SourceReferencesKey { source_id };
2109        let value = SourceReferencesValue {
2110            references,
2111            updated_at,
2112        };
2113        self.source_references.set(key, Some(value), self.op_id)?;
2114        Ok(())
2115    }
2116
2117    /// Upserts persisted system configuration `name` to `value`.
2118    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2119        let key = ServerConfigurationKey {
2120            name: name.to_string(),
2121        };
2122        let value = ServerConfigurationValue { value };
2123        self.system_configurations
2124            .set(key, Some(value), self.op_id)?;
2125        Ok(())
2126    }
2127
2128    /// Removes persisted system configuration `name`.
2129    pub fn remove_system_config(&mut self, name: &str) {
2130        let key = ServerConfigurationKey {
2131            name: name.to_string(),
2132        };
2133        self.system_configurations
2134            .set(key, None, self.op_id)
2135            .expect("cannot have uniqueness violation");
2136    }
2137
2138    /// Removes all persisted system configurations.
2139    pub fn clear_system_configs(&mut self) {
2140        self.system_configurations.delete(|_k, _v| true, self.op_id);
2141    }
2142
2143    /// Returns the persisted cluster-coherent scoped system configurations.
2144    pub fn get_cluster_system_configurations(
2145        &self,
2146    ) -> impl Iterator<Item = ClusterSystemConfiguration> + use<'_> {
2147        self.cluster_system_configurations
2148            .items()
2149            .into_iter()
2150            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2151    }
2152
2153    /// Upserts the persisted cluster-coherent scoped configuration `name` = `value`
2154    /// for `cluster_id`.
2155    pub fn upsert_cluster_system_config(
2156        &mut self,
2157        cluster_id: ClusterId,
2158        name: &str,
2159        value: String,
2160    ) -> Result<(), CatalogError> {
2161        let key = ClusterSystemConfigurationKey {
2162            cluster_id,
2163            name: name.to_string(),
2164        };
2165        let value = ClusterSystemConfigurationValue { value };
2166        self.cluster_system_configurations
2167            .set(key, Some(value), self.op_id)?;
2168        Ok(())
2169    }
2170
2171    /// Removes the persisted cluster-coherent scoped configuration `name` for
2172    /// `cluster_id`.
2173    pub fn remove_cluster_system_config(&mut self, cluster_id: ClusterId, name: &str) {
2174        let key = ClusterSystemConfigurationKey {
2175            cluster_id,
2176            name: name.to_string(),
2177        };
2178        self.cluster_system_configurations
2179            .set(key, None, self.op_id)
2180            .expect("cannot have uniqueness violation");
2181    }
2182
2183    /// Returns the persisted replica-local scoped system configurations.
2184    pub fn get_replica_system_configurations(
2185        &self,
2186    ) -> impl Iterator<Item = ReplicaSystemConfiguration> + use<'_> {
2187        self.replica_system_configurations
2188            .items()
2189            .into_iter()
2190            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2191    }
2192
2193    /// Upserts the persisted replica-local scoped configuration `name` = `value`
2194    /// for `replica_id`.
2195    pub fn upsert_replica_system_config(
2196        &mut self,
2197        replica_id: ReplicaId,
2198        name: &str,
2199        value: String,
2200    ) -> Result<(), CatalogError> {
2201        let key = ReplicaSystemConfigurationKey {
2202            replica_id,
2203            name: name.to_string(),
2204        };
2205        let value = ReplicaSystemConfigurationValue { value };
2206        self.replica_system_configurations
2207            .set(key, Some(value), self.op_id)?;
2208        Ok(())
2209    }
2210
2211    /// Removes the persisted replica-local scoped configuration `name` for
2212    /// `replica_id`.
2213    pub fn remove_replica_system_config(&mut self, replica_id: ReplicaId, name: &str) {
2214        let key = ReplicaSystemConfigurationKey {
2215            replica_id,
2216            name: name.to_string(),
2217        };
2218        self.replica_system_configurations
2219            .set(key, None, self.op_id)
2220            .expect("cannot have uniqueness violation");
2221    }
2222
2223    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2224        match self.configs.insert(
2225            ConfigKey { key: key.clone() },
2226            ConfigValue { value },
2227            self.op_id,
2228        ) {
2229            Ok(_) => Ok(()),
2230            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2231        }
2232    }
2233
2234    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2235        self.clusters
2236            .items()
2237            .into_iter()
2238            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2239    }
2240
2241    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2242        self.cluster_replicas
2243            .items()
2244            .into_iter()
2245            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2246    }
2247
2248    pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2249        self.databases
2250            .items()
2251            .into_iter()
2252            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2253    }
2254
2255    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2256        self.roles
2257            .items()
2258            .into_iter()
2259            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2260    }
2261
2262    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2263        self.network_policies
2264            .items()
2265            .into_iter()
2266            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2267    }
2268
2269    pub fn get_system_object_mappings(
2270        &self,
2271    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2272        self.system_gid_mapping
2273            .items()
2274            .into_iter()
2275            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2276    }
2277
2278    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2279        self.schemas
2280            .items()
2281            .into_iter()
2282            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2283    }
2284
2285    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2286        self.system_configurations
2287            .items()
2288            .into_iter()
2289            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2290    }
2291
2292    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2293        let key = SchemaKey { id: *id };
2294        self.schemas
2295            .get(&key)
2296            .map(|v| DurableType::from_key_value(key, v.clone()))
2297    }
2298
2299    pub fn get_introspection_source_indexes(
2300        &self,
2301        cluster_id: ClusterId,
2302    ) -> BTreeMap<&str, (GlobalId, u32)> {
2303        self.introspection_sources
2304            .items()
2305            .into_iter()
2306            .filter(|(k, _v)| k.cluster_id == cluster_id)
2307            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2308            .collect()
2309    }
2310
2311    pub fn get_catalog_content_version(&self) -> Option<&str> {
2312        self.settings
2313            .get(&SettingKey {
2314                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2315            })
2316            .map(|value| &*value.value)
2317    }
2318
2319    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2320        self.settings
2321            .get(&SettingKey {
2322                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2323            })
2324            .map(|value| value.value.clone())
2325    }
2326
2327    /// Commit the current operation within the transaction. This does not cause anything to be
2328    /// written durably, but signals to the current transaction that we are moving on to the next
2329    /// operation.
2330    ///
2331    /// Returns the updates of the committed operation.
2332    #[must_use]
2333    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2334        let updates = self.get_op_updates();
2335        self.commit_op();
2336        updates
2337    }
2338
2339    fn get_op_updates(&self) -> Vec<StateUpdate> {
2340        fn get_collection_op_updates<'a, T>(
2341            table_txn: &'a TableTransaction<T::Key, T::Value>,
2342            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2343            op: Timestamp,
2344        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2345        where
2346            T::Key: Ord + Eq + Clone + Debug,
2347            T::Value: Ord + Clone + Debug,
2348            T: DurableType,
2349        {
2350            table_txn
2351                .pending
2352                .iter()
2353                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2354                .filter_map(move |(k, v)| {
2355                    if v.ts == op {
2356                        let key = k.clone();
2357                        let value = v.value.clone();
2358                        let diff = v.diff.clone().try_into().expect("invalid diff");
2359                        let update = DurableType::from_key_value(key, value);
2360                        let kind = kind_fn(update);
2361                        Some((kind, diff))
2362                    } else {
2363                        None
2364                    }
2365                })
2366        }
2367
2368        fn get_large_collection_op_updates<'a, T>(
2369            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2370            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2371            op: Timestamp,
2372        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2373        where
2374            T::Key: Ord + Eq + Clone + Debug,
2375            T: DurableType<Value = ()>,
2376        {
2377            collection.iter().filter_map(move |(k, diff, ts)| {
2378                if *ts == op {
2379                    let key = k.clone();
2380                    let diff = diff.clone().try_into().expect("invalid diff");
2381                    let update = DurableType::from_key_value(key, ());
2382                    let kind = kind_fn(update);
2383                    Some((kind, diff))
2384                } else {
2385                    None
2386                }
2387            })
2388        }
2389
2390        let Transaction {
2391            durable_catalog: _,
2392            databases,
2393            schemas,
2394            items,
2395            comments,
2396            roles,
2397            role_auth,
2398            clusters,
2399            network_policies,
2400            cluster_replicas,
2401            introspection_sources,
2402            system_gid_mapping,
2403            system_configurations,
2404            cluster_system_configurations,
2405            replica_system_configurations,
2406            default_privileges,
2407            source_references,
2408            system_privileges,
2409            audit_log_updates,
2410            storage_collection_metadata,
2411            unfinalized_shards,
2412            // Not representable as a `StateUpdate`.
2413            id_allocator: _,
2414            configs: _,
2415            settings: _,
2416            txn_wal_shard: _,
2417            upper,
2418            op_id: _,
2419        } = &self;
2420
2421        let updates = std::iter::empty()
2422            .chain(get_collection_op_updates(
2423                roles,
2424                StateUpdateKind::Role,
2425                self.op_id,
2426            ))
2427            .chain(get_collection_op_updates(
2428                role_auth,
2429                StateUpdateKind::RoleAuth,
2430                self.op_id,
2431            ))
2432            .chain(get_collection_op_updates(
2433                databases,
2434                StateUpdateKind::Database,
2435                self.op_id,
2436            ))
2437            .chain(get_collection_op_updates(
2438                schemas,
2439                StateUpdateKind::Schema,
2440                self.op_id,
2441            ))
2442            .chain(get_collection_op_updates(
2443                default_privileges,
2444                StateUpdateKind::DefaultPrivilege,
2445                self.op_id,
2446            ))
2447            .chain(get_collection_op_updates(
2448                system_privileges,
2449                StateUpdateKind::SystemPrivilege,
2450                self.op_id,
2451            ))
2452            .chain(get_collection_op_updates(
2453                system_configurations,
2454                StateUpdateKind::SystemConfiguration,
2455                self.op_id,
2456            ))
2457            .chain(get_collection_op_updates(
2458                cluster_system_configurations,
2459                StateUpdateKind::ClusterSystemConfiguration,
2460                self.op_id,
2461            ))
2462            .chain(get_collection_op_updates(
2463                replica_system_configurations,
2464                StateUpdateKind::ReplicaSystemConfiguration,
2465                self.op_id,
2466            ))
2467            .chain(get_collection_op_updates(
2468                clusters,
2469                StateUpdateKind::Cluster,
2470                self.op_id,
2471            ))
2472            .chain(get_collection_op_updates(
2473                network_policies,
2474                StateUpdateKind::NetworkPolicy,
2475                self.op_id,
2476            ))
2477            .chain(get_collection_op_updates(
2478                introspection_sources,
2479                StateUpdateKind::IntrospectionSourceIndex,
2480                self.op_id,
2481            ))
2482            .chain(get_collection_op_updates(
2483                cluster_replicas,
2484                StateUpdateKind::ClusterReplica,
2485                self.op_id,
2486            ))
2487            .chain(get_collection_op_updates(
2488                system_gid_mapping,
2489                StateUpdateKind::SystemObjectMapping,
2490                self.op_id,
2491            ))
2492            .chain(get_collection_op_updates(
2493                items,
2494                StateUpdateKind::Item,
2495                self.op_id,
2496            ))
2497            .chain(get_collection_op_updates(
2498                comments,
2499                StateUpdateKind::Comment,
2500                self.op_id,
2501            ))
2502            .chain(get_collection_op_updates(
2503                source_references,
2504                StateUpdateKind::SourceReferences,
2505                self.op_id,
2506            ))
2507            .chain(get_collection_op_updates(
2508                storage_collection_metadata,
2509                StateUpdateKind::StorageCollectionMetadata,
2510                self.op_id,
2511            ))
2512            .chain(get_collection_op_updates(
2513                unfinalized_shards,
2514                StateUpdateKind::UnfinalizedShard,
2515                self.op_id,
2516            ))
2517            .chain(get_large_collection_op_updates(
2518                audit_log_updates,
2519                StateUpdateKind::AuditLog,
2520                self.op_id,
2521            ))
2522            .map(|(kind, diff)| StateUpdate {
2523                kind,
2524                ts: upper.clone(),
2525                diff,
2526            })
2527            .collect();
2528
2529        updates
2530    }
2531
    /// Returns whether the durable catalog state backing this transaction reports itself as a
    /// savepoint. The answer is delegated to the durable catalog state.
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2535
    /// Advances the operation id by one, so that changes made from now on are attributed to
    /// the next operation.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2539
    /// Returns the id of the operation that is currently in progress within this transaction.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2543
    /// Returns the upper carried by this transaction. The same value is used as the timestamp
    /// of the updates returned for each operation.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2547
2548    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2549        let audit_log_updates = self
2550            .audit_log_updates
2551            .into_iter()
2552            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2553            .collect();
2554
2555        let txn_batch = TransactionBatch {
2556            databases: self.databases.pending(),
2557            schemas: self.schemas.pending(),
2558            items: self.items.pending(),
2559            comments: self.comments.pending(),
2560            roles: self.roles.pending(),
2561            role_auth: self.role_auth.pending(),
2562            clusters: self.clusters.pending(),
2563            cluster_replicas: self.cluster_replicas.pending(),
2564            network_policies: self.network_policies.pending(),
2565            introspection_sources: self.introspection_sources.pending(),
2566            id_allocator: self.id_allocator.pending(),
2567            configs: self.configs.pending(),
2568            source_references: self.source_references.pending(),
2569            settings: self.settings.pending(),
2570            system_gid_mapping: self.system_gid_mapping.pending(),
2571            system_configurations: self.system_configurations.pending(),
2572            cluster_system_configurations: self.cluster_system_configurations.pending(),
2573            replica_system_configurations: self.replica_system_configurations.pending(),
2574            default_privileges: self.default_privileges.pending(),
2575            system_privileges: self.system_privileges.pending(),
2576            storage_collection_metadata: self.storage_collection_metadata.pending(),
2577            unfinalized_shards: self.unfinalized_shards.pending(),
2578            txn_wal_shard: self.txn_wal_shard.pending(),
2579            audit_log_updates,
2580            upper: self.upper,
2581        };
2582        (txn_batch, self.durable_catalog)
2583    }
2584
2585    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2586    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2587    /// before proceeding. In general, this must be fatal to the calling process. We do not
2588    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2589    ///
2590    /// The transaction is committed at `commit_ts`.
2591    ///
2592    /// Returns what the upper was directly after the transaction committed.
2593    ///
2594    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2595    /// catalog is not writeable.
2596    #[mz_ore::instrument(level = "debug")]
2597    pub(crate) async fn commit_internal(
2598        self,
2599        commit_ts: mz_repr::Timestamp,
2600    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2601        let (mut txn_batch, durable_catalog) = self.into_parts();
2602        let TransactionBatch {
2603            databases,
2604            schemas,
2605            items,
2606            comments,
2607            roles,
2608            role_auth,
2609            clusters,
2610            cluster_replicas,
2611            network_policies,
2612            introspection_sources,
2613            id_allocator,
2614            configs,
2615            source_references,
2616            settings,
2617            system_gid_mapping,
2618            system_configurations,
2619            cluster_system_configurations,
2620            replica_system_configurations,
2621            default_privileges,
2622            system_privileges,
2623            storage_collection_metadata,
2624            unfinalized_shards,
2625            txn_wal_shard,
2626            audit_log_updates,
2627            upper: _,
2628        } = &mut txn_batch;
2629        // Consolidate in memory because it will likely be faster than consolidating after the
2630        // transaction has been made durable.
2631        differential_dataflow::consolidation::consolidate_updates(databases);
2632        differential_dataflow::consolidation::consolidate_updates(schemas);
2633        differential_dataflow::consolidation::consolidate_updates(items);
2634        differential_dataflow::consolidation::consolidate_updates(comments);
2635        differential_dataflow::consolidation::consolidate_updates(roles);
2636        differential_dataflow::consolidation::consolidate_updates(role_auth);
2637        differential_dataflow::consolidation::consolidate_updates(clusters);
2638        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2639        differential_dataflow::consolidation::consolidate_updates(network_policies);
2640        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2641        differential_dataflow::consolidation::consolidate_updates(id_allocator);
2642        differential_dataflow::consolidation::consolidate_updates(configs);
2643        differential_dataflow::consolidation::consolidate_updates(settings);
2644        differential_dataflow::consolidation::consolidate_updates(source_references);
2645        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2646        differential_dataflow::consolidation::consolidate_updates(system_configurations);
2647        differential_dataflow::consolidation::consolidate_updates(cluster_system_configurations);
2648        differential_dataflow::consolidation::consolidate_updates(replica_system_configurations);
2649        differential_dataflow::consolidation::consolidate_updates(default_privileges);
2650        differential_dataflow::consolidation::consolidate_updates(system_privileges);
2651        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2652        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2653        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2654        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2655
2656        let upper = durable_catalog
2657            .commit_transaction(txn_batch, commit_ts)
2658            .await?;
2659        Ok((durable_catalog, upper))
2660    }
2661
2662    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2663    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2664    /// before proceeding. In general, this must be fatal to the calling process. We do not
2665    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2666    ///
2667    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2668    /// catalog is not writeable.
2669    ///
2670    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
2671    /// updates from this transaction. Therefore, updates from this transaction will not be returned
2672    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
2673    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
2674    ///
2675    /// An alternative implementation would be for the caller to explicitly consume their updates
2676    /// after committing and only then apply the updates in-memory. While this removes assumptions
2677    /// about the caller in this method, in practice it results in duplicate work on every commit.
2678    #[mz_ore::instrument(level = "debug")]
2679    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2680        let op_updates = self.get_op_updates();
2681        assert!(
2682            op_updates.is_empty(),
2683            "unconsumed transaction updates: {op_updates:?}"
2684        );
2685
2686        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2687        // Drain all the updates from the commit since it is assumed that they were already applied.
2688        let updates = durable_storage.sync_updates(upper).await?;
2689        // Writable and savepoint catalogs should have consumed all updates before committing a
2690        // transaction, otherwise the commit was performed with an out of date state.
2691        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
2692        // updates before committing.
2693        soft_assert_no_log!(
2694            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2695            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2696        );
2697        Ok(())
2698    }
2699}
2700
2701use crate::durable::async_trait;
2702
2703use super::objects::{RoleAuthKey, RoleAuthValue};
2704
2705#[async_trait]
2706impl StorageTxn for Transaction<'_> {
2707    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2708        self.storage_collection_metadata
2709            .items()
2710            .into_iter()
2711            .map(
2712                |(
2713                    StorageCollectionMetadataKey { id },
2714                    StorageCollectionMetadataValue { shard },
2715                )| { (*id, shard.clone()) },
2716            )
2717            .collect()
2718    }
2719
2720    fn insert_collection_metadata(
2721        &mut self,
2722        metadata: BTreeMap<GlobalId, ShardId>,
2723    ) -> Result<(), StorageError> {
2724        for (id, shard) in metadata {
2725            self.storage_collection_metadata
2726                .insert(
2727                    StorageCollectionMetadataKey { id },
2728                    StorageCollectionMetadataValue {
2729                        shard: shard.clone(),
2730                    },
2731                    self.op_id,
2732                )
2733                .map_err(|err| match err {
2734                    DurableCatalogError::DuplicateKey => {
2735                        StorageError::CollectionMetadataAlreadyExists(id)
2736                    }
2737                    DurableCatalogError::UniquenessViolation => {
2738                        StorageError::PersistShardAlreadyInUse(shard)
2739                    }
2740                    err => StorageError::Generic(anyhow::anyhow!(err)),
2741                })?;
2742        }
2743        Ok(())
2744    }
2745
2746    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2747        let ks: Vec<_> = ids
2748            .into_iter()
2749            .map(|id| StorageCollectionMetadataKey { id })
2750            .collect();
2751        self.storage_collection_metadata
2752            .delete_by_keys(ks, self.op_id)
2753            .into_iter()
2754            .map(
2755                |(
2756                    StorageCollectionMetadataKey { id },
2757                    StorageCollectionMetadataValue { shard },
2758                )| (id, shard),
2759            )
2760            .collect()
2761    }
2762
2763    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2764        self.unfinalized_shards
2765            .items()
2766            .into_iter()
2767            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2768            .collect()
2769    }
2770
2771    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError> {
2772        for shard in s {
2773            match self
2774                .unfinalized_shards
2775                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2776            {
2777                // Inserting duplicate keys has no effect.
2778                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2779                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2780            };
2781        }
2782        Ok(())
2783    }
2784
2785    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2786        let ks: Vec<_> = shards
2787            .into_iter()
2788            .map(|shard| UnfinalizedShardKey { shard })
2789            .collect();
2790        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2791    }
2792
2793    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2794        self.txn_wal_shard
2795            .values()
2796            .iter()
2797            .next()
2798            .map(|TxnWalShardValue { shard }| *shard)
2799    }
2800
2801    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError> {
2802        self.txn_wal_shard
2803            .insert((), TxnWalShardValue { shard }, self.op_id)
2804            .map_err(|err| match err {
2805                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2806                err => StorageError::Generic(anyhow::anyhow!(err)),
2807            })
2808    }
2809}
2810
/// Describes a set of changes to apply as the result of a catalog transaction.
///
/// Each collection is held as a list of `(key, value, diff)` triples using the protobuf
/// representation of the key and value. Collections that have no value (`unfinalized_shards`,
/// `audit_log_updates`) or no key (`txn_wal_shard`) use `()` in that position.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) cluster_system_configurations: Vec<(
        proto::ClusterSystemConfigurationKey,
        proto::ClusterSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) replica_system_configurations: Vec<(
        proto::ReplicaSystemConfigurationKey,
        proto::ReplicaSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    // Keyed collection without a value.
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    // Collection without a key.
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    // Keyed collection without a value.
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper of the catalog when the transaction started.
    pub(crate) upper: mz_repr::Timestamp,
}
2873
2874impl TransactionBatch {
2875    pub fn is_empty(&self) -> bool {
2876        let TransactionBatch {
2877            databases,
2878            schemas,
2879            items,
2880            comments,
2881            roles,
2882            role_auth,
2883            clusters,
2884            cluster_replicas,
2885            network_policies,
2886            introspection_sources,
2887            id_allocator,
2888            configs,
2889            settings,
2890            source_references,
2891            system_gid_mapping,
2892            system_configurations,
2893            cluster_system_configurations,
2894            replica_system_configurations,
2895            default_privileges,
2896            system_privileges,
2897            storage_collection_metadata,
2898            unfinalized_shards,
2899            txn_wal_shard,
2900            audit_log_updates,
2901            upper: _,
2902        } = self;
2903        databases.is_empty()
2904            && schemas.is_empty()
2905            && items.is_empty()
2906            && comments.is_empty()
2907            && roles.is_empty()
2908            && role_auth.is_empty()
2909            && clusters.is_empty()
2910            && cluster_replicas.is_empty()
2911            && network_policies.is_empty()
2912            && introspection_sources.is_empty()
2913            && id_allocator.is_empty()
2914            && configs.is_empty()
2915            && settings.is_empty()
2916            && source_references.is_empty()
2917            && system_gid_mapping.is_empty()
2918            && system_configurations.is_empty()
2919            && cluster_system_configurations.is_empty()
2920            && replica_system_configurations.is_empty()
2921            && default_privileges.is_empty()
2922            && system_privileges.is_empty()
2923            && storage_collection_metadata.is_empty()
2924            && unfinalized_shards.is_empty()
2925            && txn_wal_shard.is_empty()
2926            && audit_log_updates.is_empty()
2927    }
2928}
2929
/// A single pending change to the value stored under one key of a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value this update refers to.
    value: V,
    /// Identifies the operation that produced this update; it is compared against the
    /// transaction's operation id when collecting the updates of a single operation.
    ts: Timestamp,
    /// The change in multiplicity of `value`.
    diff: Diff,
}
2936
/// Utility trait to check for plan validity.
///
/// Implemented by the value types stored in a [`TableTransaction`], which uses it to speed up
/// uniqueness verification.
trait UniqueName {
    /// Does the item have a unique name? If yes, we can check for name equality in validity
    /// checking.
    const HAS_UNIQUE_NAME: bool;
    /// The unique name, only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`.
    fn unique_name(&self) -> &str;
}
2945
// Implementations of `UniqueName` for the durable value types.
mod unique_name {
    use crate::durable::objects::*;

    // Implements `UniqueName` for types that have a `name` field, which is reported as the
    // unique name.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    // Implements `UniqueName` for types without a unique name. The reported name is an empty
    // placeholder and must not be relied upon.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                       ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        ClusterSystemConfigurationValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ReplicaSystemConfigurationValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    // Only compiled for tests: makes `String` usable where a `UniqueName` value is required
    // (presumably as the value type of test `TableTransaction`s; confirm against the tests).
    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
3009
/// TableTransaction emulates some features of a typical SQL transaction over
/// a table for a Collection.
///
/// It supports:
/// - uniqueness constraints
/// - transactional reads and writes (including read-your-writes before commit)
///
/// `K` is the primary key type. Multiple entries with the same key are disallowed.
/// `V` is an arbitrary value type.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The data the transaction was created with.
    initial: BTreeMap<K, V>,
    // The desired updates to keys after commit.
    // Invariant: Value is sorted by `ts`.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional predicate that returns `true` when two values conflict with each other. When
    // `None`, no uniqueness check between values is performed.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
3027
3028impl<K, V> TableTransaction<K, V>
3029where
3030    K: Ord + Eq + Clone + Debug,
3031    V: Ord + Clone + Debug + UniqueName,
3032{
3033    /// Create a new TableTransaction with initial data.
3034    ///
3035    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
3036    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
3037    /// protobuf types which deserialize to `K` and `V` that a [`TableTransaction`] is generic
3038    /// over.
3039    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
3040    where
3041        K: RustType<KP>,
3042        V: RustType<VP>,
3043    {
3044        let initial = initial
3045            .into_iter()
3046            .map(RustType::from_proto)
3047            .collect::<Result<_, _>>()?;
3048
3049        Ok(Self {
3050            initial,
3051            pending: BTreeMap::new(),
3052            uniqueness_violation: None,
3053        })
3054    }
3055
3056    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
3057    /// that determines whether there is a uniqueness violation among two values.
3058    fn new_with_uniqueness_fn<KP, VP>(
3059        initial: BTreeMap<KP, VP>,
3060        uniqueness_violation: fn(a: &V, b: &V) -> bool,
3061    ) -> Result<Self, TryFromProtoError>
3062    where
3063        K: RustType<KP>,
3064        V: RustType<VP>,
3065    {
3066        let initial = initial
3067            .into_iter()
3068            .map(RustType::from_proto)
3069            .collect::<Result<_, _>>()?;
3070
3071        Ok(Self {
3072            initial,
3073            pending: BTreeMap::new(),
3074            uniqueness_violation: Some(uniqueness_violation),
3075        })
3076    }
3077
3078    /// Consumes and returns the pending changes and their diffs. `Diff` is
3079    /// guaranteed to be 1 or -1.
3080    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
3081    where
3082        K: RustType<KP>,
3083        V: RustType<VP>,
3084    {
3085        soft_assert_no_log!(self.verify().is_ok());
3086        // Pending describes the desired final state for some keys. K,V pairs should be
3087        // retracted if they already exist and were deleted or are being updated.
3088        self.pending
3089            .into_iter()
3090            .flat_map(|(k, v)| {
3091                let mut v: Vec<_> = v
3092                    .into_iter()
3093                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
3094                    .collect();
3095                differential_dataflow::consolidation::consolidate(&mut v);
3096                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
3097            })
3098            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
3099            .collect()
3100    }
3101
3102    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
3103    ///
3104    /// Runtime is O(n^2), where n is the number of items in `self`, if
3105    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
3106    fn verify(&self) -> Result<(), DurableCatalogError> {
3107        if let Some(uniqueness_violation) = self.uniqueness_violation {
3108            // Compare each value to each other value and ensure they are unique.
3109            let items = self.values();
3110            if V::HAS_UNIQUE_NAME {
3111                let by_name: BTreeMap<_, _> = items
3112                    .iter()
3113                    .enumerate()
3114                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
3115                    .collect();
3116                for (i, vi) in items.iter().enumerate() {
3117                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
3118                        if i != *j && uniqueness_violation(vi, *vj) {
3119                            return Err(DurableCatalogError::UniquenessViolation);
3120                        }
3121                    }
3122                }
3123            } else {
3124                for (i, vi) in items.iter().enumerate() {
3125                    for (j, vj) in items.iter().enumerate() {
3126                        if i != j && uniqueness_violation(vi, vj) {
3127                            return Err(DurableCatalogError::UniquenessViolation);
3128                        }
3129                    }
3130                }
3131            }
3132        }
3133        soft_assert_no_log!(
3134            self.pending
3135                .values()
3136                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3137            "pending should be sorted by timestamp: {:?}",
3138            self.pending
3139        );
3140        Ok(())
3141    }
3142
3143    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
3144    ///
3145    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3146    /// items in `keys`.
3147    fn verify_keys<'a>(
3148        &self,
3149        keys: impl IntoIterator<Item = &'a K>,
3150    ) -> Result<(), DurableCatalogError>
3151    where
3152        K: 'a,
3153    {
3154        if let Some(uniqueness_violation) = self.uniqueness_violation {
3155            let entries: Vec<_> = keys
3156                .into_iter()
3157                .filter_map(|key| self.get(key).map(|value| (key, value)))
3158                .collect();
3159            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3160            for (ki, vi) in self.items() {
3161                for (kj, vj) in &entries {
3162                    if ki != *kj && uniqueness_violation(vi, vj) {
3163                        return Err(DurableCatalogError::UniquenessViolation);
3164                    }
3165                }
3166            }
3167        }
3168        soft_assert_no_log!(self.verify().is_ok());
3169        Ok(())
3170    }
3171
3172    /// Iterates over the items viewable in the current transaction in arbitrary
3173    /// order and applies `f` on all key, value pairs.
3174    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3175        let mut seen = BTreeSet::new();
3176        for k in self.pending.keys() {
3177            seen.insert(k);
3178            let v = self.get(k);
3179            // Deleted items don't exist so shouldn't be visited, but still suppress
3180            // visiting the key later.
3181            if let Some(v) = v {
3182                f(k, v);
3183            }
3184        }
3185        for (k, v) in self.initial.iter() {
3186            // Add on initial items that don't have updates.
3187            if !seen.contains(k) {
3188                f(k, v);
3189            }
3190        }
3191    }
3192
3193    /// Returns the current value of `k`.
3194    fn get(&self, k: &K) -> Option<&V> {
3195        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3196        let mut updates = Vec::with_capacity(pending.len() + 1);
3197        if let Some(initial) = self.initial.get(k) {
3198            updates.push((initial, Diff::ONE));
3199        }
3200        updates.extend(
3201            pending
3202                .into_iter()
3203                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3204        );
3205
3206        differential_dataflow::consolidation::consolidate(&mut updates);
3207        assert!(updates.len() <= 1);
3208        updates.into_iter().next().map(|(v, _)| v)
3209    }
3210
3211    /// Returns the items viewable in the current transaction. The items are
3212    /// cloned, so this is an expensive operation. Prefer using [`Self::items`], or
3213    /// [`Self::for_values`].
3214    // Used by tests.
3215    #[cfg(test)]
3216    fn items_cloned(&self) -> BTreeMap<K, V> {
3217        let mut items = BTreeMap::new();
3218        self.for_values(|k, v| {
3219            items.insert(k.clone(), v.clone());
3220        });
3221        items
3222    }
3223
3224    /// Returns the current items as proto-typed key-value pairs, suitable for
3225    /// constructing a [`Snapshot`]. This merges `initial` and `pending` to
3226    /// produce the current view and converts back to proto types.
3227    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3228    where
3229        K: RustType<KP>,
3230        V: RustType<VP>,
3231        KP: Ord,
3232    {
3233        let mut items = BTreeMap::new();
3234        self.for_values(|k, v| {
3235            items.insert(k.into_proto(), v.into_proto());
3236        });
3237        items
3238    }
3239
3240    /// Returns the items viewable in the current transaction as references. Returns a map
3241    /// of references.
3242    fn items(&self) -> BTreeMap<&K, &V> {
3243        let mut items = BTreeMap::new();
3244        self.for_values(|k, v| {
3245            items.insert(k, v);
3246        });
3247        items
3248    }
3249
3250    /// Returns the values viewable in the current transaction as references.
3251    fn values(&self) -> BTreeSet<&V> {
3252        let mut items = BTreeSet::new();
3253        self.for_values(|_, v| {
3254            items.insert(v);
3255        });
3256        items
3257    }
3258
3259    /// Returns the number of items viewable in the current transaction.
3260    fn len(&self) -> usize {
3261        let mut count = 0;
3262        self.for_values(|_, _| {
3263            count += 1;
3264        });
3265        count
3266    }
3267
3268    /// Iterates over the items viewable in the current transaction, and provides a
3269    /// map where additional pending items can be inserted, which will be appended
3270    /// to current pending items. Does not verify uniqueness.
3271    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3272        &mut self,
3273        mut f: F,
3274    ) {
3275        let mut pending = BTreeMap::new();
3276        self.for_values(|k, v| f(&mut pending, k, v));
3277        for (k, updates) in pending {
3278            self.pending.entry(k).or_default().extend(updates);
3279        }
3280    }
3281
3282    /// Inserts a new k,v pair.
3283    ///
3284    /// Returns an error if the uniqueness check failed or the key already exists.
3285    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3286        let mut violation = None;
3287        self.for_values(|for_k, for_v| {
3288            if &k == for_k {
3289                violation = Some(DurableCatalogError::DuplicateKey);
3290            }
3291            if let Some(uniqueness_violation) = self.uniqueness_violation {
3292                if uniqueness_violation(for_v, &v) {
3293                    violation = Some(DurableCatalogError::UniquenessViolation);
3294                }
3295            }
3296        });
3297        if let Some(violation) = violation {
3298            return Err(violation);
3299        }
3300        self.pending.entry(k).or_default().push(TransactionUpdate {
3301            value: v,
3302            ts,
3303            diff: Diff::ONE,
3304        });
3305        soft_assert_no_log!(self.verify().is_ok());
3306        Ok(())
3307    }
3308
3309    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
3310    /// value should be updated, otherwise `None`. Returns the number of changed
3311    /// entries.
3312    ///
3313    /// Returns an error if the uniqueness check failed.
3314    ///
3315    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
3316    /// better performance.
3317    fn update<F: Fn(&K, &V) -> Option<V>>(
3318        &mut self,
3319        f: F,
3320        ts: Timestamp,
3321    ) -> Result<Diff, DurableCatalogError> {
3322        let mut changed = Diff::ZERO;
3323        let mut keys = BTreeSet::new();
3324        // Keep a copy of pending in case of uniqueness violation.
3325        let pending = self.pending.clone();
3326        self.for_values_mut(|p, k, v| {
3327            if let Some(next) = f(k, v) {
3328                changed += Diff::ONE;
3329                keys.insert(k.clone());
3330                let updates = p.entry(k.clone()).or_default();
3331                updates.push(TransactionUpdate {
3332                    value: v.clone(),
3333                    ts,
3334                    diff: Diff::MINUS_ONE,
3335                });
3336                updates.push(TransactionUpdate {
3337                    value: next,
3338                    ts,
3339                    diff: Diff::ONE,
3340                });
3341            }
3342        });
3343        // Check for uniqueness violation.
3344        if let Err(err) = self.verify_keys(&keys) {
3345            self.pending = pending;
3346            Err(err)
3347        } else {
3348            Ok(changed)
3349        }
3350    }
3351
3352    /// Updates `k`, `v` pair if `k` already exists in `self`.
3353    ///
3354    /// Returns `true` if `k` was updated, `false` otherwise.
3355    /// Returns an error if the uniqueness check failed.
3356    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3357        if let Some(cur_v) = self.get(&k) {
3358            if v != *cur_v {
3359                self.set(k, Some(v), ts)?;
3360            }
3361            Ok(true)
3362        } else {
3363            Ok(false)
3364        }
3365    }
3366
3367    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
3368    ///
3369    /// Returns the number of changed entries.
3370    /// Returns an error if the uniqueness check failed.
3371    fn update_by_keys(
3372        &mut self,
3373        kvs: impl IntoIterator<Item = (K, V)>,
3374        ts: Timestamp,
3375    ) -> Result<Diff, DurableCatalogError> {
3376        let kvs: Vec<_> = kvs
3377            .into_iter()
3378            .filter_map(|(k, v)| match self.get(&k) {
3379                // Record if updating this entry would be a no-op.
3380                Some(cur_v) => Some((*cur_v == v, k, v)),
3381                None => None,
3382            })
3383            .collect();
3384        let changed = kvs.len();
3385        let changed =
3386            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3387        let kvs = kvs
3388            .into_iter()
3389            // Filter out no-ops to save some work.
3390            .filter(|(no_op, _, _)| !no_op)
3391            .map(|(_, k, v)| (k, Some(v)))
3392            .collect();
3393        self.set_many(kvs, ts)?;
3394        Ok(changed)
3395    }
3396
3397    /// Set the value for a key. Returns the previous entry if the key existed,
3398    /// otherwise None.
3399    ///
3400    /// Returns an error if the uniqueness check failed.
3401    ///
3402    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
3403    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3404        let prev = self.get(&k).cloned();
3405        let entry = self.pending.entry(k.clone()).or_default();
3406        let restore_len = entry.len();
3407
3408        match (v, prev.clone()) {
3409            (Some(v), Some(prev)) => {
3410                entry.push(TransactionUpdate {
3411                    value: prev,
3412                    ts,
3413                    diff: Diff::MINUS_ONE,
3414                });
3415                entry.push(TransactionUpdate {
3416                    value: v,
3417                    ts,
3418                    diff: Diff::ONE,
3419                });
3420            }
3421            (Some(v), None) => {
3422                entry.push(TransactionUpdate {
3423                    value: v,
3424                    ts,
3425                    diff: Diff::ONE,
3426                });
3427            }
3428            (None, Some(prev)) => {
3429                entry.push(TransactionUpdate {
3430                    value: prev,
3431                    ts,
3432                    diff: Diff::MINUS_ONE,
3433                });
3434            }
3435            (None, None) => {}
3436        }
3437
3438        // Check for uniqueness violation.
3439        if let Err(err) = self.verify_keys([&k]) {
3440            // Revert self.pending to the state it was in before calling this
3441            // function.
3442            let pending = self.pending.get_mut(&k).expect("inserted above");
3443            pending.truncate(restore_len);
3444            Err(err)
3445        } else {
3446            Ok(prev)
3447        }
3448    }
3449
3450    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
3451    /// otherwise None.
3452    ///
3453    /// Returns an error if any uniqueness check failed.
3454    fn set_many(
3455        &mut self,
3456        kvs: BTreeMap<K, Option<V>>,
3457        ts: Timestamp,
3458    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3459        if kvs.is_empty() {
3460            return Ok(BTreeMap::new());
3461        }
3462
3463        let mut prevs = BTreeMap::new();
3464        let mut restores = BTreeMap::new();
3465
3466        for (k, v) in kvs {
3467            let prev = self.get(&k).cloned();
3468            let entry = self.pending.entry(k.clone()).or_default();
3469            restores.insert(k.clone(), entry.len());
3470
3471            match (v, prev.clone()) {
3472                (Some(v), Some(prev)) => {
3473                    entry.push(TransactionUpdate {
3474                        value: prev,
3475                        ts,
3476                        diff: Diff::MINUS_ONE,
3477                    });
3478                    entry.push(TransactionUpdate {
3479                        value: v,
3480                        ts,
3481                        diff: Diff::ONE,
3482                    });
3483                }
3484                (Some(v), None) => {
3485                    entry.push(TransactionUpdate {
3486                        value: v,
3487                        ts,
3488                        diff: Diff::ONE,
3489                    });
3490                }
3491                (None, Some(prev)) => {
3492                    entry.push(TransactionUpdate {
3493                        value: prev,
3494                        ts,
3495                        diff: Diff::MINUS_ONE,
3496                    });
3497                }
3498                (None, None) => {}
3499            }
3500
3501            prevs.insert(k, prev);
3502        }
3503
3504        // Check for uniqueness violation.
3505        if let Err(err) = self.verify_keys(prevs.keys()) {
3506            for (k, restore_len) in restores {
3507                // Revert self.pending to the state it was in before calling this
3508                // function.
3509                let pending = self.pending.get_mut(&k).expect("inserted above");
3510                pending.truncate(restore_len);
3511            }
3512            Err(err)
3513        } else {
3514            Ok(prevs)
3515        }
3516    }
3517
3518    /// Deletes items for which `f` returns true. Returns the keys and values of
3519    /// the deleted entries.
3520    ///
3521    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3522    /// better performance.
3523    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3524        let mut deleted = Vec::new();
3525        self.for_values_mut(|p, k, v| {
3526            if f(k, v) {
3527                deleted.push((k.clone(), v.clone()));
3528                p.entry(k.clone()).or_default().push(TransactionUpdate {
3529                    value: v.clone(),
3530                    ts,
3531                    diff: Diff::MINUS_ONE,
3532                });
3533            }
3534        });
3535        soft_assert_no_log!(self.verify().is_ok());
3536        deleted
3537    }
3538
3539    /// Deletes item with key `k`.
3540    ///
3541    /// Returns the value of the deleted entry, if it existed.
3542    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3543        self.set(k, None, ts)
3544            .expect("deleting an entry cannot violate uniqueness")
3545    }
3546
3547    /// Deletes items with key in `ks`.
3548    ///
3549    /// Returns the keys and values of the deleted entries.
3550    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3551        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3552        let prevs = self
3553            .set_many(kvs, ts)
3554            .expect("deleting entries cannot violate uniqueness");
3555        prevs
3556            .into_iter()
3557            .filter_map(|(k, v)| v.map(|v| (k, v)))
3558            .collect()
3559    }
3560}
3561
3562#[cfg(test)]
3563#[allow(clippy::unwrap_used)]
3564mod tests {
3565    use super::*;
3566
3567    use mz_controller::clusters::ReplicaLogging;
3568    use mz_ore::now::SYSTEM_TIME;
3569    use mz_ore::{assert_none, assert_ok};
3570    use mz_persist_client::cache::PersistClientCache;
3571    use mz_persist_types::PersistLocation;
3572    use semver::Version;
3573
3574    use crate::durable::{
3575        ReplicaConfig, ReplicaLocation, TestCatalogStateBuilder, test_bootstrap_args,
3576    };
3577    use crate::memory;
3578
3579    #[mz_ore::test]
3580    fn test_table_transaction_simple() {
3581        fn uniqueness_violation(a: &String, b: &String) -> bool {
3582            a == b
3583        }
3584        let mut table = TableTransaction::new_with_uniqueness_fn(
3585            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3586            uniqueness_violation,
3587        )
3588        .unwrap();
3589
3590        // Ideally, we compare for errors here, but it's hard/impossible to implement PartialEq
3591        // for DurableCatalogError.
3592        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3593        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3594        assert!(
3595            table
3596                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3597                .is_err()
3598        );
3599        assert!(
3600            table
3601                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3602                .is_err()
3603        );
3604    }
3605
3606    #[mz_ore::test]
3607    fn test_table_transaction() {
3608        fn uniqueness_violation(a: &String, b: &String) -> bool {
3609            a == b
3610        }
3611        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3612
3613        fn commit(
3614            table: &mut BTreeMap<Vec<u8>, String>,
3615            mut pending: Vec<(Vec<u8>, String, Diff)>,
3616        ) {
3617            // Sort by diff so that we process retractions first.
3618            pending.sort_by(|a, b| a.2.cmp(&b.2));
3619            for (k, v, diff) in pending {
3620                if diff == Diff::MINUS_ONE {
3621                    let prev = table.remove(&k);
3622                    assert_eq!(prev, Some(v));
3623                } else if diff == Diff::ONE {
3624                    let prev = table.insert(k, v);
3625                    assert_eq!(prev, None);
3626                } else {
3627                    panic!("unexpected diff: {diff}");
3628                }
3629            }
3630        }
3631
3632        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3633        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3634        let mut table_txn =
3635            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3636        assert_eq!(table_txn.items_cloned(), table);
3637        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3638        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3639        assert_eq!(
3640            table_txn.items_cloned(),
3641            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3642        );
3643        assert_eq!(
3644            table_txn
3645                .update(|_k, _v| Some("v3".to_string()), 2)
3646                .unwrap(),
3647            Diff::ONE
3648        );
3649
3650        // Uniqueness violation.
3651        table_txn
3652            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3653            .unwrap_err();
3654
3655        table_txn
3656            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3657            .unwrap();
3658        assert_eq!(
3659            table_txn.items_cloned(),
3660            BTreeMap::from([
3661                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3662                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3663            ])
3664        );
3665        let err = table_txn
3666            .update(|_k, _v| Some("v1".to_string()), 5)
3667            .unwrap_err();
3668        assert!(
3669            matches!(err, DurableCatalogError::UniquenessViolation),
3670            "unexpected err: {err:?}"
3671        );
3672        let pending = table_txn.pending();
3673        assert_eq!(
3674            pending,
3675            vec![
3676                (
3677                    1i64.to_le_bytes().to_vec(),
3678                    "v1".to_string(),
3679                    Diff::MINUS_ONE
3680                ),
3681                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3682                (
3683                    2i64.to_le_bytes().to_vec(),
3684                    "v2".to_string(),
3685                    Diff::MINUS_ONE
3686                ),
3687                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3688            ]
3689        );
3690        commit(&mut table, pending);
3691        assert_eq!(
3692            table,
3693            BTreeMap::from([
3694                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3695                (3i64.to_le_bytes().to_vec(), "v4".to_string())
3696            ])
3697        );
3698
3699        let mut table_txn =
3700            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3701        // Deleting then creating an item that has a uniqueness violation should work.
3702        assert_eq!(
3703            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3704            1
3705        );
3706        table_txn
3707            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3708            .unwrap();
3709        // Uniqueness violation in value.
3710        table_txn
3711            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3712            .unwrap_err();
3713        // Key already exists, expect error.
3714        table_txn
3715            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3716            .unwrap_err();
3717        assert_eq!(
3718            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3719            1
3720        );
3721        // Both the inserts work now because the key and uniqueness violation are gone.
3722        table_txn
3723            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3724            .unwrap();
3725        table_txn
3726            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3727            .unwrap();
3728        let pending = table_txn.pending();
3729        assert_eq!(
3730            pending,
3731            vec![
3732                (
3733                    1i64.to_le_bytes().to_vec(),
3734                    "v3".to_string(),
3735                    Diff::MINUS_ONE
3736                ),
3737                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3738                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3739            ]
3740        );
3741        commit(&mut table, pending);
3742        assert_eq!(
3743            table,
3744            BTreeMap::from([
3745                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3746                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3747                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3748            ])
3749        );
3750
3751        let mut table_txn =
3752            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3753        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3754        table_txn
3755            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3756            .unwrap();
3757
3758        commit(&mut table, table_txn.pending());
3759        assert_eq!(
3760            table,
3761            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3762        );
3763
3764        let mut table_txn =
3765            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3766        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3767        table_txn
3768            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3769            .unwrap();
3770        commit(&mut table, table_txn.pending());
3771        assert_eq!(
3772            table,
3773            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3774        );
3775
3776        // Verify we don't try to delete v3 or v4 during commit.
3777        let mut table_txn =
3778            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3779        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3780        table_txn
3781            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3782            .unwrap();
3783        table_txn
3784            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3785            .unwrap_err();
3786        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3787        table_txn
3788            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3789            .unwrap();
3790        commit(&mut table, table_txn.pending());
3791        assert_eq!(
3792            table.clone().into_iter().collect::<Vec<_>>(),
3793            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3794        );
3795
3796        // Test `set`.
3797        let mut table_txn =
3798            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3799        // Uniqueness violation.
3800        table_txn
3801            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3802            .unwrap_err();
3803        table_txn
3804            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3805            .unwrap();
3806        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3807        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3808        let pending = table_txn.pending();
3809        assert_eq!(
3810            pending,
3811            vec![
3812                (
3813                    1i64.to_le_bytes().to_vec(),
3814                    "v5".to_string(),
3815                    Diff::MINUS_ONE
3816                ),
3817                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3818            ]
3819        );
3820        commit(&mut table, pending);
3821        assert_eq!(
3822            table,
3823            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3824        );
3825
3826        // Duplicate `set`.
3827        let mut table_txn =
3828            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3829        table_txn
3830            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3831            .unwrap();
3832        let pending = table_txn.pending::<Vec<u8>, String>();
3833        assert!(pending.is_empty());
3834
3835        // Test `set_many`.
3836        let mut table_txn =
3837            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3838        // Uniqueness violation.
3839        table_txn
3840            .set_many(
3841                BTreeMap::from([
3842                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3843                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3844                ]),
3845                0,
3846            )
3847            .unwrap_err();
3848        table_txn
3849            .set_many(
3850                BTreeMap::from([
3851                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3852                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3853                ]),
3854                1,
3855            )
3856            .unwrap();
3857        table_txn
3858            .set_many(
3859                BTreeMap::from([
3860                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3861                    (3i64.to_le_bytes().to_vec(), None),
3862                ]),
3863                2,
3864            )
3865            .unwrap();
3866        let pending = table_txn.pending();
3867        assert_eq!(
3868            pending,
3869            vec![
3870                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3871                (
3872                    3i64.to_le_bytes().to_vec(),
3873                    "v6".to_string(),
3874                    Diff::MINUS_ONE
3875                ),
3876                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3877            ]
3878        );
3879        commit(&mut table, pending);
3880        assert_eq!(
3881            table,
3882            BTreeMap::from([
3883                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3884                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3885            ])
3886        );
3887
3888        // Duplicate `set_many`.
3889        let mut table_txn =
3890            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3891        table_txn
3892            .set_many(
3893                BTreeMap::from([
3894                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3895                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3896                ]),
3897                0,
3898            )
3899            .unwrap();
3900        let pending = table_txn.pending::<Vec<u8>, String>();
3901        assert!(pending.is_empty());
3902        commit(&mut table, pending);
3903        assert_eq!(
3904            table,
3905            BTreeMap::from([
3906                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3907                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3908            ])
3909        );
3910
3911        // Test `update_by_key`
3912        let mut table_txn =
3913            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3914        // Uniqueness violation.
3915        table_txn
3916            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3917            .unwrap_err();
3918        assert!(
3919            table_txn
3920                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3921                .unwrap()
3922        );
3923        assert!(
3924            !table_txn
3925                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3926                .unwrap()
3927        );
3928        let pending = table_txn.pending();
3929        assert_eq!(
3930            pending,
3931            vec![
3932                (
3933                    1i64.to_le_bytes().to_vec(),
3934                    "v6".to_string(),
3935                    Diff::MINUS_ONE
3936                ),
3937                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3938            ]
3939        );
3940        commit(&mut table, pending);
3941        assert_eq!(
3942            table,
3943            BTreeMap::from([
3944                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3945                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3946            ])
3947        );
3948
3949        // Duplicate `update_by_key`.
3950        let mut table_txn =
3951            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3952        assert!(
3953            table_txn
3954                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3955                .unwrap()
3956        );
3957        let pending = table_txn.pending::<Vec<u8>, String>();
3958        assert!(pending.is_empty());
3959        commit(&mut table, pending);
3960        assert_eq!(
3961            table,
3962            BTreeMap::from([
3963                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3964                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3965            ])
3966        );
3967
3968        // Test `update_by_keys`
3969        let mut table_txn =
3970            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3971        // Uniqueness violation.
3972        table_txn
3973            .update_by_keys(
3974                [
3975                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3976                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3977                ],
3978                0,
3979            )
3980            .unwrap_err();
3981        let n = table_txn
3982            .update_by_keys(
3983                [
3984                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3985                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3986                ],
3987                1,
3988            )
3989            .unwrap();
3990        assert_eq!(n, Diff::ONE);
3991        let n = table_txn
3992            .update_by_keys(
3993                [
3994                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3995                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3996                ],
3997                2,
3998            )
3999            .unwrap();
4000        assert_eq!(n, Diff::ZERO);
4001        let pending = table_txn.pending();
4002        assert_eq!(
4003            pending,
4004            vec![
4005                (
4006                    1i64.to_le_bytes().to_vec(),
4007                    "v8".to_string(),
4008                    Diff::MINUS_ONE
4009                ),
4010                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
4011            ]
4012        );
4013        commit(&mut table, pending);
4014        assert_eq!(
4015            table,
4016            BTreeMap::from([
4017                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4018                (42i64.to_le_bytes().to_vec(), "v7".to_string())
4019            ])
4020        );
4021
4022        // Duplicate `update_by_keys`.
4023        let mut table_txn =
4024            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4025        let n = table_txn
4026            .update_by_keys(
4027                [
4028                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4029                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
4030                ],
4031                0,
4032            )
4033            .unwrap();
4034        assert_eq!(n, Diff::from(2));
4035        let pending = table_txn.pending::<Vec<u8>, String>();
4036        assert!(pending.is_empty());
4037        commit(&mut table, pending);
4038        assert_eq!(
4039            table,
4040            BTreeMap::from([
4041                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4042                (42i64.to_le_bytes().to_vec(), "v7".to_string())
4043            ])
4044        );
4045
4046        // Test `delete_by_key`
4047        let mut table_txn =
4048            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4049        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
4050        assert_eq!(prev, Some("v9".to_string()));
4051        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
4052        assert_none!(prev);
4053        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
4054        assert_none!(prev);
4055        let pending = table_txn.pending();
4056        assert_eq!(
4057            pending,
4058            vec![(
4059                1i64.to_le_bytes().to_vec(),
4060                "v9".to_string(),
4061                Diff::MINUS_ONE
4062            ),]
4063        );
4064        commit(&mut table, pending);
4065        assert_eq!(
4066            table,
4067            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
4068        );
4069
4070        // Test `delete_by_keys`
4071        let mut table_txn =
4072            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4073        let prevs = table_txn.delete_by_keys(
4074            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4075            0,
4076        );
4077        assert_eq!(
4078            prevs,
4079            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
4080        );
4081        let prevs = table_txn.delete_by_keys(
4082            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4083            1,
4084        );
4085        assert_eq!(prevs, vec![]);
4086        let prevs = table_txn.delete_by_keys(
4087            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4088            2,
4089        );
4090        assert_eq!(prevs, vec![]);
4091        let pending = table_txn.pending();
4092        assert_eq!(
4093            pending,
4094            vec![(
4095                42i64.to_le_bytes().to_vec(),
4096                "v7".to_string(),
4097                Diff::MINUS_ONE
4098            ),]
4099        );
4100        commit(&mut table, pending);
4101        assert_eq!(table, BTreeMap::new());
4102    }
4103
4104    #[mz_ore::test(tokio::test)]
4105    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
4106    async fn test_savepoint() {
4107        const VERSION: Version = Version::new(26, 0, 0);
4108        let mut persist_cache = PersistClientCache::new_no_metrics();
4109        persist_cache.cfg.build_version = VERSION;
4110        let persist_client = persist_cache
4111            .open(PersistLocation::new_in_mem())
4112            .await
4113            .unwrap();
4114        let state_builder = TestCatalogStateBuilder::new(persist_client)
4115            .with_default_deploy_generation()
4116            .with_version(VERSION);
4117
4118        // Initialize catalog.
4119        let _ = state_builder
4120            .clone()
4121            .unwrap_build()
4122            .await
4123            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4124            .await
4125            .unwrap()
4126            .0;
4127        let mut savepoint_state = state_builder
4128            .unwrap_build()
4129            .await
4130            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4131            .await
4132            .unwrap()
4133            .0;
4134
4135        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4136        assert!(!initial_snapshot.is_empty());
4137
4138        let db_name = "db";
4139        let db_owner = RoleId::User(42);
4140        let db_privileges = Vec::new();
4141        let mut txn = savepoint_state.transaction().await.unwrap();
4142        let (db_id, db_oid) = txn
4143            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4144            .unwrap();
4145        let commit_ts = txn.upper();
4146        txn.commit_internal(commit_ts).await.unwrap();
4147        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4148        let update = updates.into_element();
4149
4150        assert_eq!(update.diff, StateDiff::Addition);
4151
4152        let db = match update.kind {
4153            memory::objects::StateUpdateKind::Database(db) => db,
4154            update => panic!("unexpected update: {update:?}"),
4155        };
4156
4157        assert_eq!(db_id, db.id);
4158        assert_eq!(db_oid, db.oid);
4159        assert_eq!(db_name, db.name);
4160        assert_eq!(db_owner, db.owner_id);
4161        assert_eq!(db_privileges, db.privileges);
4162    }
4163
4164    /// Regression test for DB-147: inserting a replica with an explicit id must not consume the
4165    /// `IdAlloc` counter, and the durable allocator must advance independently so a later
4166    /// allocation never collides with an explicitly inserted id.
4167    #[mz_ore::test(tokio::test)]
4168    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
4169    async fn test_insert_replica_with_id_does_not_consume_allocator() {
4170        const VERSION: Version = Version::new(26, 0, 0);
4171        let mut persist_cache = PersistClientCache::new_no_metrics();
4172        persist_cache.cfg.build_version = VERSION;
4173        let persist_client = persist_cache
4174            .open(PersistLocation::new_in_mem())
4175            .await
4176            .unwrap();
4177        let state_builder = TestCatalogStateBuilder::new(persist_client)
4178            .with_default_deploy_generation()
4179            .with_version(VERSION);
4180        let mut state = state_builder
4181            .unwrap_build()
4182            .await
4183            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4184            .await
4185            .unwrap()
4186            .0;
4187
4188        // The cluster does not need to exist: `insert_cluster_replica_with_id` only writes a
4189        // `cluster_replicas` row and does not validate the referenced cluster.
4190        let cluster_id = ClusterId::User(1);
4191        let owner_id = RoleId::User(1);
4192        let config = ReplicaConfig {
4193            location: ReplicaLocation::Managed {
4194                size: "1".to_string(),
4195                availability_zones: Vec::new(),
4196                internal: false,
4197                billed_as: None,
4198                pending: false,
4199            },
4200            logging: ReplicaLogging {
4201                log_logging: false,
4202                interval: Some(Duration::from_secs(1)),
4203            },
4204        };
4205
4206        // Step 1: allocate one user replica id out-of-band via the durable allocator.
4207        let commit_ts = state.current_upper().await;
4208        let a = state
4209            .allocate_user_replica_ids(1, commit_ts)
4210            .await
4211            .unwrap()
4212            .into_element();
4213        assert!(a.is_user());
4214
4215        // Step 2: insert a replica with that explicit id and commit.
4216        let mut txn = state.transaction().await.unwrap();
4217        txn.insert_cluster_replica_with_id(cluster_id, a, "explicit", config, owner_id)
4218            .unwrap();
4219        let commit_ts = txn.upper();
4220        txn.commit_internal(commit_ts).await.unwrap();
4221
4222        // Step 3: allocate one more user replica id.
4223        let commit_ts = state.current_upper().await;
4224        let b = state
4225            .allocate_user_replica_ids(1, commit_ts)
4226            .await
4227            .unwrap()
4228            .into_element();
4229
4230        // Step 4: the explicit insert consumed zero ids, so the allocator's next value is the
4231        // direct successor of the first allocation.
4232        assert_eq!(b.inner_id(), a.inner_id() + 1);
4233
4234        // Step 5: the explicitly inserted replica is present in the committed state.
4235        let txn = state.transaction().await.unwrap();
4236        let found = txn
4237            .get_cluster_replicas()
4238            .any(|replica| replica.replica_id == a);
4239        assert!(found, "explicitly inserted replica {a} not found");
4240    }
4241
4242    #[mz_ore::test]
4243    fn test_allocate_introspection_source_index_id() {
4244        let cluster_variant: u8 = 0b0000_0001;
4245        let cluster_id_inner: u64 =
4246            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4247        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4248
4249        let cluster_id = ClusterId::System(cluster_id_inner);
4250        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4251
4252        let introspection_source_index_id: u64 =
4253            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4254
4255        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4256        {
4257            let mut cluster_variant_mask = 0xFF << 56;
4258            cluster_variant_mask &= introspection_source_index_id;
4259            cluster_variant_mask >>= 56;
4260            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4261        }
4262
4263        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4264        {
4265            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4266            cluster_id_inner_mask &= introspection_source_index_id;
4267            cluster_id_inner_mask >>= 8;
4268            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4269        }
4270
4271        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4272        {
4273            let mut log_variant_mask = 0xFF;
4274            log_variant_mask &= introspection_source_index_id;
4275            assert_eq!(
4276                log_variant_mask,
4277                u64::from(timely_messages_received_log_variant)
4278            );
4279        }
4280
4281        let (catalog_item_id, global_id) =
4282            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4283
4284        assert_eq!(
4285            catalog_item_id,
4286            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4287        );
4288        assert_eq!(
4289            global_id,
4290            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4291        );
4292    }
4293}