mz_catalog/durable/transaction.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroU32;
use std::time::Duration;

use anyhow::anyhow;
use derivative::Derivative;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::{u64_to_usize, usize_to_u64};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::now::SYSTEM_TIME;
use mz_ore::vec::VecExt;
use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::ShardId;
use mz_pgrepr::oid::FIRST_USER_OID;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
use mz_sql::catalog::{
    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
    RoleAttributesRaw, RoleMembership, RoleVars,
};
use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
use mz_sql::plan::NetworkPolicyRule;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::controller::StorageError;
use tracing::warn;

use crate::builtin::BuiltinLog;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::objects::serialization::proto;
use crate::durable::objects::{
    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
    ReplicaConfig, Role, RoleAuthKey, RoleAuthValue, RoleKey, RoleValue, Schema, SchemaKey,
    SchemaValue, ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue,
    SourceReference, SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
};
use crate::durable::{
    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
    SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
    USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
    USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

type Timestamp = u64;

/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
/// An operation also logically groups multiple catalog updates together.
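///
/// A minimal usage sketch (not compiled; the `durable_catalog`, `snapshot`, `owner_id`,
/// and `temporary_oids` values are assumed to come from the durable catalog layer):
///
/// ```ignore
/// let mut txn = Transaction::new(durable_catalog, snapshot, upper)?;
/// let (db_id, db_oid) =
///     txn.insert_user_database("db", owner_id, vec![], &temporary_oids)?;
/// ```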
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Don't make this a table transaction so that it's not read into the
    // in-memory cache.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of `durable_catalog` at the start of the transaction.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation of this transaction.
    op_id: Timestamp,
}

impl<'a> Transaction<'a> {
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    // `item_type` is slow, only compute if needed.
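                    // Two items in the same schema with the same name collide,
                    // except when exactly one is a type and the other's kind
                    // does not conflict with types (`conflicts_with_type`).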
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            // Uniqueness violations for this value occur at the key rather than
            // the value (the key is the unit struct `()` so this is a singleton
            // value).
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }

    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
        let key = ItemKey { id: *id };
        self.items
            .get(&key)
            .map(|v| DurableType::from_key_value(key, v.clone()))
    }

    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
        self.items
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
            .sorted_by_key(|Item { id, .. }| *id)
    }

    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
        self.insert_audit_log_events([event]);
    }

    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
        let events = events
            .into_iter()
            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
        self.audit_log_updates.extend(events);
    }

    pub fn insert_user_database(
        &mut self,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(DatabaseId, u32), CatalogError> {
        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
        let id = DatabaseId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_database(id, database_name, owner_id, privileges, oid)?;
        Ok((id, oid))
    }

    pub(crate) fn insert_database(
        &mut self,
        id: DatabaseId,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<u32, CatalogError> {
        match self.databases.insert(
            DatabaseKey { id },
            DatabaseValue {
                name: database_name.to_string(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(oid),
            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
        }
    }

    pub fn insert_user_schema(
        &mut self,
        database_id: DatabaseId,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(SchemaId, u32), CatalogError> {
        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
        let id = SchemaId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_schema(
            id,
            Some(database_id),
            schema_name.to_string(),
            owner_id,
            privileges,
            oid,
        )?;
        Ok((id, oid))
    }

    pub fn insert_system_schema(
        &mut self,
        schema_id: u64,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        let id = SchemaId::System(schema_id);
        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
    }

    pub(crate) fn insert_schema(
        &mut self,
        schema_id: SchemaId,
        database_id: Option<DatabaseId>,
        schema_name: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        match self.schemas.insert(
            SchemaKey { id: schema_id },
            SchemaValue {
                database_id,
                name: schema_name.clone(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
        }
    }

    pub fn insert_builtin_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<RoleId, CatalogError> {
        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok(id)
    }

    pub fn insert_user_role(
        &mut self,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(RoleId, u32), CatalogError> {
        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
        let id = RoleId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok((id, oid))
    }

    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    // This should never happen, but rather than panicking we'll
                    // set a known secure value as a fallback.
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }

    /// Panics if any introspection source ID is not a system ID.
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    /// Panics if any introspection source ID is not a system ID.
    pub fn insert_system_cluster(
        &mut self,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    fn insert_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.clusters.insert(
            ClusterKey { id: cluster_id },
            ClusterValue {
                name: cluster_name.to_string(),
                owner_id,
                privileges,
                config,
            },
            self.op_id,
        ) {
            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
        };

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip_eq(oids)
            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
            .collect();
        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
            let introspection_source_index = IntrospectionSourceIndex {
                cluster_id,
                name: builtin.name.to_string(),
                item_id,
                index_id,
                oid,
            };
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources
                .insert(key, value, self.op_id)
                .expect("no uniqueness violation");
        }

        Ok(())
    }

    pub fn rename_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        cluster_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterKey { id: cluster_id };

        match self.clusters.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = cluster_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
            ),
        }
    }

    pub fn rename_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica_name: &QualifiedReplica,
        replica_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterReplicaKey { id: replica_id };

        match self.cluster_replicas.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = replica_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => {
                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
            }
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
            ),
        }
    }

    pub fn insert_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<ReplicaId, CatalogError> {
        let replica_id = match cluster_id {
            ClusterId::System(_) => self.allocate_system_replica_id()?,
            ClusterId::User(_) => self.allocate_user_replica_id()?,
        };
        self.insert_cluster_replica_with_id(
            cluster_id,
            replica_id,
            replica_name,
            config,
            owner_id,
        )?;
        Ok(replica_id)
    }

    pub(crate) fn insert_cluster_replica_with_id(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.cluster_replicas.insert(
            ClusterReplicaKey { id: replica_id },
            ClusterReplicaValue {
                cluster_id,
                name: replica_name.into(),
                config,
                owner_id,
            },
            self.op_id,
        ) {
            let cluster = self
                .clusters
                .get(&ClusterKey { id: cluster_id })
                .expect("cluster exists");
            return Err(SqlCatalogError::DuplicateReplica(
                replica_name.to_string(),
                cluster.name.to_string(),
            )
            .into());
        };
        Ok(())
    }

    pub fn insert_user_network_policy(
        &mut self,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        temporary_oids: &HashSet<u32>,
    ) -> Result<NetworkPolicyId, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
        let id = NetworkPolicyId::User(id);
        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
    }

    pub fn insert_network_policy(
        &mut self,
        id: NetworkPolicyId,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        oid: u32,
    ) -> Result<NetworkPolicyId, CatalogError> {
        match self.network_policies.insert(
            NetworkPolicyKey { id },
            NetworkPolicyValue {
                name: name.clone(),
                rules,
                privileges,
                owner_id,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(id),
            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
        }
    }

    /// Updates persisted information about introspection source indexes.
    ///
    /// Panics if the provided ID is not a system ID.
    pub fn update_introspection_source_index_gids(
        &mut self,
        mappings: impl Iterator<
            Item = (
                ClusterId,
                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
            ),
        >,
    ) -> Result<(), CatalogError> {
        for (cluster_id, updates) in mappings {
            for (name, item_id, index_id, oid) in updates {
                let introspection_source_index = IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                };
                let (key, value) = introspection_source_index.into_key_value();

                let prev = self
                    .introspection_sources
                    .set(key, Some(value), self.op_id)?;
                if prev.is_none() {
                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
                        "{index_id}"
                    ))
                    .into());
                }
            }
        }
        Ok(())
    }

    pub fn insert_user_item(
        &mut self,
        id: CatalogItemId,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
        versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<u32, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_item(
            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
        )?;
        Ok(oid)
    }

    pub fn insert_item(
        &mut self,
        id: CatalogItemId,
        oid: u32,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        extra_versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<(), CatalogError> {
        match self.items.insert(
            ItemKey { id },
            ItemValue {
                schema_id,
                name: item_name.to_string(),
                create_sql,
                owner_id,
                privileges,
                oid,
                global_id,
                extra_versions,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
        }
    }

    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
    }

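    /// Returns the next `amount` IDs for `key` as a contiguous range, advancing
    /// the named allocator past the returned range.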
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }

    pub fn allocate_system_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        assert!(
            !self.durable_catalog.is_bootstrap_complete(),
            "we can only allocate system item IDs during bootstrap"
        );
        Ok(self
            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            // TODO(alter_table): Use separate ID allocators.
            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
            .collect())
    }

    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system items that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
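    ///
    /// For example (illustrative arithmetic only): a user cluster with inner
    /// value `5` and the `TimelyLog::Operates` log variant packs to
    /// `(2 << 56) | (5 << 8) | 1 = 0x0200_0000_0000_0501`.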
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ShutdownDuration) => 27,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }

    pub fn allocate_user_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        Ok(self
            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            // TODO(alter_table): Use separate ID allocators.
            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
            .collect())
    }

    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::User(id))
    }

    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::System(id))
    }

    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
    }

    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
    }

    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }
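
        // For example (assumed values): adding 1 to `UserOid(u32::MAX)`
        // overflows to 0, so the next OID becomes `FIRST_USER_OID`, skipping
        // the OID range reserved for system objects.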

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do every time we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated every time an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference, and the other
        // approach requires making sure that the allocator always stays in sync, which can be
        // error-prone. If DDL starts slowing down, this is a good place to try to optimize.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }

    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
        self.allocate_oids(1, temporary_oids)
            .map(|oids| oids.into_element())
    }

    pub(crate) fn insert_id_allocator(
        &mut self,
        name: String,
        next_id: u64,
    ) -> Result<(), CatalogError> {
        match self.id_allocator.insert(
            IdAllocKey { name: name.clone() },
            IdAllocValue { next_id },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
        }
    }

    /// Removes the database `id` from the transaction.
    ///
    /// Returns an error if `id` is not found.
    ///
    /// Runtime is linear with respect to the total number of databases in the catalog.
    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
        let prev = self
            .databases
            .set(DatabaseKey { id: *id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    /// Removes all databases in `databases` from the transaction.
    ///
    /// Returns an error if any id in `databases` is not found.
    ///
    /// NOTE: On error, there still may be some databases removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_databases(
        &mut self,
        databases: &BTreeSet<DatabaseId>,
    ) -> Result<(), CatalogError> {
        if databases.is_empty() {
            return Ok(());
        }

        let to_remove = databases
            .iter()
            .map(|id| (DatabaseKey { id: *id }, None))
            .collect();
        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, val| val.is_none());

        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownDatabase(err).into());
        }

        Ok(())
    }

    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
    ///
    /// Returns an error if `(database_id, schema_id)` is not found.
    ///
    /// Runtime is linear with respect to the total number of schemas in the catalog.
    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
    pub fn remove_schema(
        &mut self,
        database_id: &Option<DatabaseId>,
        schema_id: &SchemaId,
    ) -> Result<(), CatalogError> {
        let prev = self
            .schemas
            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            let database_name = match database_id {
                Some(id) => format!("{id}."),
                None => "".to_string(),
            };
            Err(SqlCatalogError::UnknownSchema(format!("{}{}", database_name, schema_id)).into())
        }
    }

    /// Removes all schemas in `schemas` from the transaction.
    ///
    /// Returns an error if any id in `schemas` is not found.
    ///
    /// NOTE: On error, there still may be some schemas removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_schemas(
        &mut self,
        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
    ) -> Result<(), CatalogError> {
        if schemas.is_empty() {
            return Ok(());
        }

        let to_remove = schemas
            .iter()
            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
            .collect();
        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, v| v.is_none());

        if !prev.is_empty() {
            let err = prev
                .keys()
                .map(|k| {
                    let db_spec = schemas.get(&k.id).expect("should_exist");
                    let db_name = match db_spec {
                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    };
                    format!("{}{}", db_name, k.id)
                })
                .join(", ");

            return Err(SqlCatalogError::UnknownSchema(err).into());
        }

        Ok(())
    }

    pub fn remove_source_references(
        &mut self,
        source_id: CatalogItemId,
    ) -> Result<(), CatalogError> {
        let deleted = self
            .source_references
            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
        }
    }

    /// Removes all user roles in `roles` from the transaction.
    ///
    /// Returns an error if any id in `roles` is not found.
    ///
    /// NOTE: On error, there still may be some roles removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        assert!(
            roles.iter().all(|id| id.is_user()),
            "cannot delete non-user roles"
        );
        self.remove_roles(roles)
    }

    /// Removes all roles in `roles` from the transaction.
    ///
    /// Returns an error if any id in `roles` is not found.
    ///
    /// NOTE: On error, there still may be some roles removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        role_auth_prev.retain(|_k, v| v.is_none());
        // The reason we don't do the same check as above is that the role auth table
        // is not required to have an entry for every role in the role table.

        Ok(())
    }

    /// Removes all clusters in `clusters` from the transaction.
    ///
    /// Returns an error if any id in `clusters` is not found.
    ///
    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
    /// the caller to either abort the transaction or commit.
    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
        if clusters.is_empty() {
            return Ok(());
        }

        let to_remove = clusters
            .iter()
            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
            .collect();
        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownCluster(err).into());
        }

        // Cascade delete introspection sources and cluster replicas.
        //
        // TODO(benesch): this doesn't seem right. Cascade deletions should
        // be entirely the domain of the higher catalog layer, not the
        // storage layer.
        self.cluster_replicas
            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
        self.introspection_sources
            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);

        Ok(())
    }

    /// Removes the cluster replica `id` from the transaction.
    ///
    /// Returns an error if `id` is not found.
    ///
    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
        let deleted = self
            .cluster_replicas
            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
        }
    }

    /// Removes all cluster replicas in `replicas` from the transaction.
    ///
    /// Returns an error if any id in `replicas` is not found.
    ///
    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_cluster_replicas(
        &mut self,
        replicas: &BTreeSet<ReplicaId>,
    ) -> Result<(), CatalogError> {
        if replicas.is_empty() {
            return Ok(());
        }

        let to_remove = replicas
            .iter()
            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
            .collect();
        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
        }

        Ok(())
    }

    /// Removes item `id` from the transaction.
    ///
    /// Returns an error if `id` is not found.
    ///
    /// Runtime is linear with respect to the total number of items in the catalog.
    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
        }
    }

    /// Removes all items in `ids` from the transaction.
    ///
    /// Returns an error if any id in `ids` is not found.
    ///
    /// NOTE: On error, there still may be some items removed from the transaction. It is
    /// up to the caller to either abort the transaction or commit.
    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
        if ids.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
        let n = self.items.delete_by_keys(ks, self.op_id).len();
        if n == ids.len() {
            Ok(())
        } else {
            let item_ids = self.items.items().keys().map(|k| k.id).collect();
            let mut unknown = ids.difference(&item_ids);
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    /// Removes all system object mappings in `descriptions` from the transaction.
    ///
    /// Returns an error if any description in `descriptions` is not found.
    ///
    /// NOTE: On error, there still may be some items removed from the transaction. It is
    /// up to the caller to either abort the transaction or commit.
    pub fn remove_system_object_mappings(
        &mut self,
        descriptions: BTreeSet<SystemObjectDescription>,
    ) -> Result<(), CatalogError> {
        if descriptions.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = descriptions
            .clone()
            .into_iter()
            .map(|desc| GidMappingKey {
                schema_name: desc.schema_name,
                object_type: desc.object_type,
                object_name: desc.object_name,
            })
            .collect();
        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();

        if n == descriptions.len() {
            Ok(())
        } else {
            let item_descriptions = self
                .system_gid_mapping
                .items()
                .keys()
                .map(|k| SystemObjectDescription {
                    schema_name: k.schema_name.clone(),
                    object_type: k.object_type.clone(),
                    object_name: k.object_name.clone(),
                })
                .collect();
            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
                format!(
                    "{} {}.{}",
                    desc.object_type, desc.schema_name, desc.object_name
                )
            });
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    /// Removes all introspection source indexes in `indexes` from the transaction.
    ///
    /// Returns an error if any index in `indexes` is not found.
    ///
    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
    /// up to the caller to either abort the transaction or commit.
    pub fn remove_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = introspection_source_indexes
            .clone()
            .into_iter()
            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
            .collect();
        let n = self
            .introspection_sources
            .delete_by_keys(ks, self.op_id)
            .len();
        if n == introspection_source_indexes.len() {
            Ok(())
        } else {
            let txn_indexes = self
                .introspection_sources
                .items()
                .keys()
                .map(|k| (k.cluster_id, k.name.clone()))
                .collect();
            let mut unknown = introspection_source_indexes
                .difference(&txn_indexes)
                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

1438    /// Updates item `id` in the transaction to `item`.
1439    ///
1440    /// Returns an error if `id` is not found.
1441    ///
1442    /// Runtime is linear with respect to the total number of items in the catalog.
1443    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1444    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1445        let updated =
1446            self.items
1447                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1448        if updated {
1449            Ok(())
1450        } else {
1451            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1452        }
1453    }
1454
1455    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1456    /// corresponding value in `items`.
1457    ///
1458    /// Returns an error if any id in `items` is not found.
1459    ///
1460    /// NOTE: On error, there still may be some items updated in the transaction. It is
1461    /// up to the caller to either abort the transaction or commit.
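    /// # Example
    ///
    /// An illustrative batch update (not compiled here; `txn`, `id`, and
    /// `new_item` are assumptions):
    ///
    /// ```ignore
    /// let items: BTreeMap<CatalogItemId, Item> = [(id, new_item)].into_iter().collect();
    /// txn.update_items(items)?;
    /// ```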
1462    pub fn update_items(
1463        &mut self,
1464        items: BTreeMap<CatalogItemId, Item>,
1465    ) -> Result<(), CatalogError> {
1466        if items.is_empty() {
1467            return Ok(());
1468        }
1469
1470        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1471        let kvs: Vec<_> = items
1472            .clone()
1473            .into_iter()
1474            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1475            .collect();
1476        let n = self.items.update_by_keys(kvs, self.op_id)?;
1477        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1478        if n == update_ids.len() {
1479            Ok(())
1480        } else {
1481            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1482            let mut unknown = update_ids.difference(&item_ids);
1483            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1484        }
1485    }
1486
1487    /// Updates role `id` in the transaction to `role`.
1488    ///
1489    /// Returns an error if `id` is not found.
1490    ///
1491    /// Runtime is linear with respect to the total number of items in the catalog.
1492    /// DO NOT call this function in a loop; instead, implement and use a batch
1493    /// `Self::update_roles`, modeled after [`Self::update_items`].
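    /// # Example
    ///
    /// An illustrative sketch of the password actions (not compiled here;
    /// `txn`, `id`, `role`, and `new_password` are assumptions):
    ///
    /// ```ignore
    /// // Set (or replace) the role's password:
    /// txn.update_role(id, role, PasswordAction::Set(new_password))?;
    /// // Or clear it while still updating the role itself:
    /// txn.update_role(id, role, PasswordAction::Clear)?;
    /// ```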
1494    pub fn update_role(
1495        &mut self,
1496        id: RoleId,
1497        role: Role,
1498        password: PasswordAction,
1499    ) -> Result<(), CatalogError> {
1500        let key = RoleKey { id };
1501        if self.roles.get(&key).is_some() {
1502            let auth_key = RoleAuthKey { role_id: id };
1503
1504            match password {
1505                PasswordAction::Set(new_password) => {
1506                    let hash = mz_auth::hash::scram256_hash(
1507                        &new_password.password,
1508                        &new_password.scram_iterations,
1509                    )
1510                    .expect("password hash should be valid");
1511                    let value = RoleAuthValue {
1512                        password_hash: Some(hash),
1513                        updated_at: SYSTEM_TIME(),
1514                    };
1515
1516                    if self.role_auth.get(&auth_key).is_some() {
1517                        self.role_auth
1518                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1519                    } else {
1520                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1521                    }
1522                }
1523                PasswordAction::Clear => {
1524                    let value = RoleAuthValue {
1525                        password_hash: None,
1526                        updated_at: SYSTEM_TIME(),
1527                    };
1528                    if self.role_auth.get(&auth_key).is_some() {
1529                        self.role_auth
1530                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1531                    }
1532                }
1533                PasswordAction::NoChange => {}
1534            }
1535
1536            self.roles
1537                .update_by_key(key, role.into_key_value().1, self.op_id)?;
1538
1539            Ok(())
1540        } else {
1541            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1542        }
1543    }
1544
1545    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1546    /// corresponding value in `roles`.
1547    ///
1548    /// This function does *not* write role_authentication information to the catalog.
1549    /// It is purely for updating the role itself.
1550    ///
1551    /// Returns an error if any id in `roles` is not found.
1552    ///
1553    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1554    /// up to the caller to either abort the transaction or commit.
1555    pub fn update_roles_without_auth(
1556        &mut self,
1557        roles: BTreeMap<RoleId, Role>,
1558    ) -> Result<(), CatalogError> {
1559        if roles.is_empty() {
1560            return Ok(());
1561        }
1562
1563        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1564        let kvs: Vec<_> = roles
1565            .into_iter()
1566            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1567            .collect();
1568        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1569        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1570
1571        if n == update_role_ids.len() {
1572            Ok(())
1573        } else {
1574            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1575            let mut unknown = update_role_ids.difference(&role_ids);
1576            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1577        }
1578    }
1579
1580    /// Updates the persisted mapping from system objects to global IDs and fingerprints. Each
1581    /// element of `mappings` should be (old-catalog-item-id, new-system-object-mapping).
1582    ///
1583    /// Panics if a provided id is not a system id.
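    /// # Example
    ///
    /// An illustrative remap of a single builtin (not compiled here; `txn`,
    /// `old_item_id`, and `new_mapping` are assumptions):
    ///
    /// ```ignore
    /// let mut mappings = BTreeMap::new();
    /// mappings.insert(old_item_id, new_mapping);
    /// txn.update_system_object_mappings(mappings)?;
    /// ```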
1584    pub fn update_system_object_mappings(
1585        &mut self,
1586        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1587    ) -> Result<(), CatalogError> {
1588        if mappings.is_empty() {
1589            return Ok(());
1590        }
1591
1592        let n = self.system_gid_mapping.update(
1593            |_k, v| {
1594                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1595                    let (_, new_value) = mapping.clone().into_key_value();
1596                    Some(new_value)
1597                } else {
1598                    None
1599                }
1600            },
1601            self.op_id,
1602        )?;
1603
1604        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1605            != mappings.len()
1606        {
1607            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1608            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1609        }
1610
1611        Ok(())
1612    }
1613
1614    /// Updates cluster `id` in the transaction to `cluster`.
1615    ///
1616    /// Returns an error if `id` is not found.
1617    ///
1618    /// Runtime is linear with respect to the total number of clusters in the catalog.
1619    /// DO NOT call this function in a loop.
1620    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1621        let updated = self.clusters.update_by_key(
1622            ClusterKey { id },
1623            cluster.into_key_value().1,
1624            self.op_id,
1625        )?;
1626        if updated {
1627            Ok(())
1628        } else {
1629            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1630        }
1631    }
1632
1633    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1634    ///
1635    /// Returns an error if `replica_id` is not found.
1636    ///
1637    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1638    /// DO NOT call this function in a loop.
1639    pub fn update_cluster_replica(
1640        &mut self,
1641        replica_id: ReplicaId,
1642        replica: ClusterReplica,
1643    ) -> Result<(), CatalogError> {
1644        let updated = self.cluster_replicas.update_by_key(
1645            ClusterReplicaKey { id: replica_id },
1646            replica.into_key_value().1,
1647            self.op_id,
1648        )?;
1649        if updated {
1650            Ok(())
1651        } else {
1652            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1653        }
1654    }
1655
1656    /// Updates database `id` in the transaction to `database`.
1657    ///
1658    /// Returns an error if `id` is not found.
1659    ///
1660    /// Runtime is linear with respect to the total number of databases in the catalog.
1661    /// DO NOT call this function in a loop.
1662    pub fn update_database(
1663        &mut self,
1664        id: DatabaseId,
1665        database: Database,
1666    ) -> Result<(), CatalogError> {
1667        let updated = self.databases.update_by_key(
1668            DatabaseKey { id },
1669            database.into_key_value().1,
1670            self.op_id,
1671        )?;
1672        if updated {
1673            Ok(())
1674        } else {
1675            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1676        }
1677    }
1678
1679    /// Updates schema `schema_id` in the transaction to `schema`.
1680    ///
1681    /// Returns an error if `schema_id` is not found.
1682    ///
1683    /// Runtime is linear with respect to the total number of schemas in the catalog.
1684    /// DO NOT call this function in a loop.
1685    pub fn update_schema(
1686        &mut self,
1687        schema_id: SchemaId,
1688        schema: Schema,
1689    ) -> Result<(), CatalogError> {
1690        let updated = self.schemas.update_by_key(
1691            SchemaKey { id: schema_id },
1692            schema.into_key_value().1,
1693            self.op_id,
1694        )?;
1695        if updated {
1696            Ok(())
1697        } else {
1698            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1699        }
1700    }
1701
1702    /// Updates network policy `id` in the transaction to `network_policy`.
1703    ///
1704    /// Returns an error if `id` is not found.
1705    ///
1706    /// Runtime is linear with respect to the total number of network policies in the catalog.
1707    /// DO NOT call this function in a loop.
1708    pub fn update_network_policy(
1709        &mut self,
1710        id: NetworkPolicyId,
1711        network_policy: NetworkPolicy,
1712    ) -> Result<(), CatalogError> {
1713        let updated = self.network_policies.update_by_key(
1714            NetworkPolicyKey { id },
1715            network_policy.into_key_value().1,
1716            self.op_id,
1717        )?;
1718        if updated {
1719            Ok(())
1720        } else {
1721            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1722        }
1723    }
1724    /// Removes all network policies in `network_policies` from the transaction.
1725    ///
1726    /// Returns an error if any id in `network_policies` is not found.
1727    ///
1728    /// NOTE: On error, there still may be some network policies removed from the
1729    /// transaction. It is up to the caller to either abort the transaction or commit.
1730    pub fn remove_network_policies(
1731        &mut self,
1732        network_policies: &BTreeSet<NetworkPolicyId>,
1733    ) -> Result<(), CatalogError> {
1734        if network_policies.is_empty() {
1735            return Ok(());
1736        }
1737
1738        let to_remove = network_policies
1739            .iter()
1740            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1741            .collect();
1742        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1743        assert!(
1744            prev.iter().all(|(k, _)| k.id.is_user()),
1745            "cannot delete non-user network policy"
1746        );
1747
1748        prev.retain(|_k, v| v.is_none());
1749        if !prev.is_empty() {
1750            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1751            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1752        }
1753
1754        Ok(())
1755    }
1756    /// Set persisted default privilege.
1757    ///
1758    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
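    /// # Example
    ///
    /// An illustrative grant and revoke (not compiled here; `txn`, `role_id`,
    /// and `grantee` are assumptions). Passing `privileges: None` removes the
    /// default privilege:
    ///
    /// ```ignore
    /// txn.set_default_privilege(role_id, None, None, ObjectType::Table, grantee, Some(AclMode::SELECT))?;
    /// txn.set_default_privilege(role_id, None, None, ObjectType::Table, grantee, None)?;
    /// ```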
1759    pub fn set_default_privilege(
1760        &mut self,
1761        role_id: RoleId,
1762        database_id: Option<DatabaseId>,
1763        schema_id: Option<SchemaId>,
1764        object_type: ObjectType,
1765        grantee: RoleId,
1766        privileges: Option<AclMode>,
1767    ) -> Result<(), CatalogError> {
1768        self.default_privileges.set(
1769            DefaultPrivilegesKey {
1770                role_id,
1771                database_id,
1772                schema_id,
1773                object_type,
1774                grantee,
1775            },
1776            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1777            self.op_id,
1778        )?;
1779        Ok(())
1780    }
1781
1782    /// Set persisted default privileges.
1783    pub fn set_default_privileges(
1784        &mut self,
1785        default_privileges: Vec<DefaultPrivilege>,
1786    ) -> Result<(), CatalogError> {
1787        if default_privileges.is_empty() {
1788            return Ok(());
1789        }
1790
1791        let default_privileges = default_privileges
1792            .into_iter()
1793            .map(DurableType::into_key_value)
1794            .map(|(k, v)| (k, Some(v)))
1795            .collect();
1796        self.default_privileges
1797            .set_many(default_privileges, self.op_id)?;
1798        Ok(())
1799    }
1800
1801    /// Set persisted system privilege.
1802    ///
1803    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1804    pub fn set_system_privilege(
1805        &mut self,
1806        grantee: RoleId,
1807        grantor: RoleId,
1808        acl_mode: Option<AclMode>,
1809    ) -> Result<(), CatalogError> {
1810        self.system_privileges.set(
1811            SystemPrivilegesKey { grantee, grantor },
1812            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1813            self.op_id,
1814        )?;
1815        Ok(())
1816    }
1817
1818    /// Set persisted system privileges.
1819    pub fn set_system_privileges(
1820        &mut self,
1821        system_privileges: Vec<MzAclItem>,
1822    ) -> Result<(), CatalogError> {
1823        if system_privileges.is_empty() {
1824            return Ok(());
1825        }
1826
1827        let system_privileges = system_privileges
1828            .into_iter()
1829            .map(DurableType::into_key_value)
1830            .map(|(k, v)| (k, Some(v)))
1831            .collect();
1832        self.system_privileges
1833            .set_many(system_privileges, self.op_id)?;
1834        Ok(())
1835    }
1836
1837    /// Set persisted setting.
1838    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1839        self.settings.set(
1840            SettingKey { name },
1841            value.map(|value| SettingValue { value }),
1842            self.op_id,
1843        )?;
1844        Ok(())
1845    }
1846
1847    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1848        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1849    }
1850
1851    /// Insert persisted introspection source indexes.
1852    pub fn insert_introspection_source_indexes(
1853        &mut self,
1854        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1855        temporary_oids: &HashSet<u32>,
1856    ) -> Result<(), CatalogError> {
1857        if introspection_source_indexes.is_empty() {
1858            return Ok(());
1859        }
1860
1861        let amount = usize_to_u64(introspection_source_indexes.len());
1862        let oids = self.allocate_oids(amount, temporary_oids)?;
1863        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1864            .into_iter()
1865            .zip_eq(oids)
1866            .map(
1867                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1868                    cluster_id,
1869                    name,
1870                    item_id,
1871                    index_id,
1872                    oid,
1873                },
1874            )
1875            .collect();
1876
1877        for introspection_source_index in introspection_source_indexes {
1878            let (key, value) = introspection_source_index.into_key_value();
1879            self.introspection_sources.insert(key, value, self.op_id)?;
1880        }
1881
1882        Ok(())
1883    }
1884
1885    /// Set persisted system object mappings.
1886    pub fn set_system_object_mappings(
1887        &mut self,
1888        mappings: Vec<SystemObjectMapping>,
1889    ) -> Result<(), CatalogError> {
1890        if mappings.is_empty() {
1891            return Ok(());
1892        }
1893
1894        let mappings = mappings
1895            .into_iter()
1896            .map(DurableType::into_key_value)
1897            .map(|(k, v)| (k, Some(v)))
1898            .collect();
1899        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1900        Ok(())
1901    }
1902
1903    /// Set persisted replicas.
1904    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1905        if replicas.is_empty() {
1906            return Ok(());
1907        }
1908
1909        let replicas = replicas
1910            .into_iter()
1911            .map(DurableType::into_key_value)
1912            .map(|(k, v)| (k, Some(v)))
1913            .collect();
1914        self.cluster_replicas.set_many(replicas, self.op_id)?;
1915        Ok(())
1916    }
1917
1918    /// Set persisted configuration.
1919    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1920        match value {
1921            Some(value) => {
1922                let config = Config { key, value };
1923                let (key, value) = config.into_key_value();
1924                self.configs.set(key, Some(value), self.op_id)?;
1925            }
1926            None => {
1927                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1928            }
1929        }
1930        Ok(())
1931    }
1932
1933    /// Get the value of a persisted config.
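    /// # Example
    ///
    /// An illustrative round-trip with [`Self::set_config`] (not compiled here;
    /// `txn` and the key are assumptions):
    ///
    /// ```ignore
    /// txn.set_config("my_config".to_string(), Some(42))?;
    /// assert_eq!(txn.get_config("my_config".to_string()), Some(42));
    /// ```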
1934    pub fn get_config(&self, key: String) -> Option<u64> {
1935        self.configs
1936            .get(&ConfigKey { key })
1937            .map(|entry| entry.value)
1938    }
1939
1940    /// Get the value of a persisted setting.
1941    pub fn get_setting(&self, name: String) -> Option<&str> {
1942        self.settings
1943            .get(&SettingKey { name })
1944            .map(|entry| &*entry.value)
1945    }
1946
1947    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1948        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1949            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1950    }
1951
1952    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1953        self.set_setting(
1954            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1955            Some(shard_id.to_string()),
1956        )
1957    }
1958
1959    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1960        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1961            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1962    }
1963
1964    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1965        self.set_setting(
1966            EXPRESSION_CACHE_SHARD_KEY.to_string(),
1967            Some(shard_id.to_string()),
1968        )
1969    }
1970
1971    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
1972    /// match the `with_0dt_deployment_max_wait` "system var" value.
1973    ///
1974    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1975    /// but use it in boot before LaunchDarkly is available.
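    /// # Example
    ///
    /// An illustrative call (not compiled here); the value is stored as
    /// milliseconds via [`Self::set_config`]:
    ///
    /// ```ignore
    /// txn.set_0dt_deployment_max_wait(Duration::from_secs(900))?;
    /// ```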
1976    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1977        self.set_config(
1978            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1979            Some(
1980                value
1981                    .as_millis()
1982                    .try_into()
1983                    .expect("max wait fits into u64"),
1984            ),
1985        )
1986    }
1987
1988    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
1989    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
1990    /// value.
1991    ///
1992    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1993    /// but use it in boot before LaunchDarkly is available.
1994    pub fn set_0dt_deployment_ddl_check_interval(
1995        &mut self,
1996        value: Duration,
1997    ) -> Result<(), CatalogError> {
1998        self.set_config(
1999            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2000            Some(
2001                value
2002                    .as_millis()
2003                    .try_into()
2004                    .expect("ddl check interval fits into u64"),
2005            ),
2006        )
2007    }
2008
2009    /// Updates the catalog `enable_0dt_deployment_panic_after_timeout` "config" value to
2010    /// match the `enable_0dt_deployment_panic_after_timeout` "system var" value.
2011    ///
2012    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2013    /// but use it in boot before LaunchDarkly is available.
2014    pub fn set_enable_0dt_deployment_panic_after_timeout(
2015        &mut self,
2016        value: bool,
2017    ) -> Result<(), CatalogError> {
2018        self.set_config(
2019            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2020            Some(u64::from(value)),
2021        )
2022    }
2023
2024    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2025    /// match the `with_0dt_deployment_max_wait` "system var" value.
2026    ///
2027    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2028    /// but use it in boot before LaunchDarkly is available.
2029    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2030        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2031    }
2032
2033    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2034    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2035    /// value.
2036    ///
2037    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2038    /// use it in boot before LaunchDarkly is available.
2039    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2040        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2041    }
2042
2043    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2044    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2045    /// var" value.
2046    ///
2047    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2048    /// use it in boot before LaunchDarkly is available.
2049    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2050        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2051    }
2052
2053    /// Updates the catalog `system_config_synced` "config" value to true.
2054    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2055        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2056    }
2057
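    /// Upserts the comment on `object_id` (and the optional `sub_component`);
    /// passing `comment: None` removes any existing comment.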
2058    pub fn update_comment(
2059        &mut self,
2060        object_id: CommentObjectId,
2061        sub_component: Option<usize>,
2062        comment: Option<String>,
2063    ) -> Result<(), CatalogError> {
2064        let key = CommentKey {
2065            object_id,
2066            sub_component,
2067        };
2068        let value = comment.map(|c| CommentValue { comment: c });
2069        self.comments.set(key, value, self.op_id)?;
2070
2071        Ok(())
2072    }
2073
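    /// Removes all comments, including sub-component comments, for every
    /// object in `object_ids`.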
2074    pub fn drop_comments(
2075        &mut self,
2076        object_ids: &BTreeSet<CommentObjectId>,
2077    ) -> Result<(), CatalogError> {
2078        if object_ids.is_empty() {
2079            return Ok(());
2080        }
2081
2082        self.comments
2083            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2084        Ok(())
2085    }
2086
2087    pub fn update_source_references(
2088        &mut self,
2089        source_id: CatalogItemId,
2090        references: Vec<SourceReference>,
2091        updated_at: u64,
2092    ) -> Result<(), CatalogError> {
2093        let key = SourceReferencesKey { source_id };
2094        let value = SourceReferencesValue {
2095            references,
2096            updated_at,
2097        };
2098        self.source_references.set(key, Some(value), self.op_id)?;
2099        Ok(())
2100    }
2101
2102    /// Upserts persisted system configuration `name` to `value`.
2103    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2104        let key = ServerConfigurationKey {
2105            name: name.to_string(),
2106        };
2107        let value = ServerConfigurationValue { value };
2108        self.system_configurations
2109            .set(key, Some(value), self.op_id)?;
2110        Ok(())
2111    }
2112
2113    /// Removes persisted system configuration `name`.
2114    pub fn remove_system_config(&mut self, name: &str) {
2115        let key = ServerConfigurationKey {
2116            name: name.to_string(),
2117        };
2118        self.system_configurations
2119            .set(key, None, self.op_id)
2120            .expect("cannot have uniqueness violation");
2121    }
2122
2123    /// Removes all persisted system configurations.
2124    pub fn clear_system_configs(&mut self) {
2125        self.system_configurations.delete(|_k, _v| true, self.op_id);
2126    }
2127
2128    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2129        match self.configs.insert(
2130            ConfigKey { key: key.clone() },
2131            ConfigValue { value },
2132            self.op_id,
2133        ) {
2134            Ok(_) => Ok(()),
2135            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2136        }
2137    }
2138
2139    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2140        self.clusters
2141            .items()
2142            .into_iter()
2143            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2144    }
2145
2146    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2147        self.cluster_replicas
2148            .items()
2149            .into_iter()
2150            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2151    }
2152
2153    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2154        self.roles
2155            .items()
2156            .into_iter()
2157            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2158    }
2159
2160    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2161        self.network_policies
2162            .items()
2163            .into_iter()
2164            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2165    }
2166
2167    pub fn get_system_object_mappings(
2168        &self,
2169    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2170        self.system_gid_mapping
2171            .items()
2172            .into_iter()
2173            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2174    }
2175
2176    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2177        self.schemas
2178            .items()
2179            .into_iter()
2180            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2181    }
2182
2183    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2184        self.system_configurations
2185            .items()
2186            .into_iter()
2187            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2188    }
2189
2190    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2191        let key = SchemaKey { id: *id };
2192        self.schemas
2193            .get(&key)
2194            .map(|v| DurableType::from_key_value(key, v.clone()))
2195    }
2196
2197    pub fn get_introspection_source_indexes(
2198        &self,
2199        cluster_id: ClusterId,
2200    ) -> BTreeMap<&str, (GlobalId, u32)> {
2201        self.introspection_sources
2202            .items()
2203            .into_iter()
2204            .filter(|(k, _v)| k.cluster_id == cluster_id)
2205            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2206            .collect()
2207    }
2208
2209    pub fn get_catalog_content_version(&self) -> Option<&str> {
2210        self.settings
2211            .get(&SettingKey {
2212                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2213            })
2214            .map(|value| &*value.value)
2215    }
2216
2217    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2218        self.settings
2219            .get(&SettingKey {
2220                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2221            })
2222            .map(|value| value.value.clone())
2223    }
2224
2225    /// Commit the current operation within the transaction. This does not cause anything to be
2226    /// written durably, but signals to the current transaction that we are moving on to the next
2227    /// operation.
2228    ///
2229    /// Returns the updates of the committed operation.
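    /// # Example
    ///
    /// An illustrative sketch of the per-operation pattern (names are
    /// assumptions):
    ///
    /// ```ignore
    /// txn.remove_item(id)?;
    /// let updates = txn.get_and_commit_op_updates();
    /// // Apply `updates` to in-memory state before starting the next operation.
    /// ```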
2230    #[must_use]
2231    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2232        let updates = self.get_op_updates();
2233        self.commit_op();
2234        updates
2235    }
2236
2237    fn get_op_updates(&self) -> Vec<StateUpdate> {
2238        fn get_collection_op_updates<'a, T>(
2239            table_txn: &'a TableTransaction<T::Key, T::Value>,
2240            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2241            op: Timestamp,
2242        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2243        where
2244            T::Key: Ord + Eq + Clone + Debug,
2245            T::Value: Ord + Clone + Debug,
2246            T: DurableType,
2247        {
2248            table_txn
2249                .pending
2250                .iter()
2251                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2252                .filter_map(move |(k, v)| {
2253                    if v.ts == op {
2254                        let key = k.clone();
2255                        let value = v.value.clone();
2256                        let diff = v.diff.clone().try_into().expect("invalid diff");
2257                        let update = DurableType::from_key_value(key, value);
2258                        let kind = kind_fn(update);
2259                        Some((kind, diff))
2260                    } else {
2261                        None
2262                    }
2263                })
2264        }
2265
2266        fn get_large_collection_op_updates<'a, T>(
2267            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2268            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2269            op: Timestamp,
2270        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2271        where
2272            T::Key: Ord + Eq + Clone + Debug,
2273            T: DurableType<Value = ()>,
2274        {
2275            collection.iter().filter_map(move |(k, diff, ts)| {
2276                if *ts == op {
2277                    let key = k.clone();
2278                    let diff = diff.clone().try_into().expect("invalid diff");
2279                    let update = DurableType::from_key_value(key, ());
2280                    let kind = kind_fn(update);
2281                    Some((kind, diff))
2282                } else {
2283                    None
2284                }
2285            })
2286        }
2287
2288        let Transaction {
2289            durable_catalog: _,
2290            databases,
2291            schemas,
2292            items,
2293            comments,
2294            roles,
2295            role_auth,
2296            clusters,
2297            network_policies,
2298            cluster_replicas,
2299            introspection_sources,
2300            system_gid_mapping,
2301            system_configurations,
2302            default_privileges,
2303            source_references,
2304            system_privileges,
2305            audit_log_updates,
2306            storage_collection_metadata,
2307            unfinalized_shards,
2308            // Not representable as a `StateUpdate`.
2309            id_allocator: _,
2310            configs: _,
2311            settings: _,
2312            txn_wal_shard: _,
2313            upper,
2314            op_id: _,
2315        } = &self;
2316
2317        let updates = std::iter::empty()
2318            .chain(get_collection_op_updates(
2319                roles,
2320                StateUpdateKind::Role,
2321                self.op_id,
2322            ))
2323            .chain(get_collection_op_updates(
2324                role_auth,
2325                StateUpdateKind::RoleAuth,
2326                self.op_id,
2327            ))
2328            .chain(get_collection_op_updates(
2329                databases,
2330                StateUpdateKind::Database,
2331                self.op_id,
2332            ))
2333            .chain(get_collection_op_updates(
2334                schemas,
2335                StateUpdateKind::Schema,
2336                self.op_id,
2337            ))
2338            .chain(get_collection_op_updates(
2339                default_privileges,
2340                StateUpdateKind::DefaultPrivilege,
2341                self.op_id,
2342            ))
2343            .chain(get_collection_op_updates(
2344                system_privileges,
2345                StateUpdateKind::SystemPrivilege,
2346                self.op_id,
2347            ))
2348            .chain(get_collection_op_updates(
2349                system_configurations,
2350                StateUpdateKind::SystemConfiguration,
2351                self.op_id,
2352            ))
2353            .chain(get_collection_op_updates(
2354                clusters,
2355                StateUpdateKind::Cluster,
2356                self.op_id,
2357            ))
2358            .chain(get_collection_op_updates(
2359                network_policies,
2360                StateUpdateKind::NetworkPolicy,
2361                self.op_id,
2362            ))
2363            .chain(get_collection_op_updates(
2364                introspection_sources,
2365                StateUpdateKind::IntrospectionSourceIndex,
2366                self.op_id,
2367            ))
2368            .chain(get_collection_op_updates(
2369                cluster_replicas,
2370                StateUpdateKind::ClusterReplica,
2371                self.op_id,
2372            ))
2373            .chain(get_collection_op_updates(
2374                system_gid_mapping,
2375                StateUpdateKind::SystemObjectMapping,
2376                self.op_id,
2377            ))
2378            .chain(get_collection_op_updates(
2379                items,
2380                StateUpdateKind::Item,
2381                self.op_id,
2382            ))
2383            .chain(get_collection_op_updates(
2384                comments,
2385                StateUpdateKind::Comment,
2386                self.op_id,
2387            ))
2388            .chain(get_collection_op_updates(
2389                source_references,
2390                StateUpdateKind::SourceReferences,
2391                self.op_id,
2392            ))
2393            .chain(get_collection_op_updates(
2394                storage_collection_metadata,
2395                StateUpdateKind::StorageCollectionMetadata,
2396                self.op_id,
2397            ))
2398            .chain(get_collection_op_updates(
2399                unfinalized_shards,
2400                StateUpdateKind::UnfinalizedShard,
2401                self.op_id,
2402            ))
2403            .chain(get_large_collection_op_updates(
2404                audit_log_updates,
2405                StateUpdateKind::AuditLog,
2406                self.op_id,
2407            ))
2408            .map(|(kind, diff)| StateUpdate {
2409                kind,
2410                ts: upper.clone(),
2411                diff,
2412            })
2413            .collect();
2414
2415        updates
2416    }
2417
2418    pub fn is_savepoint(&self) -> bool {
2419        self.durable_catalog.is_savepoint()
2420    }
2421
2422    fn commit_op(&mut self) {
2423        self.op_id += 1;
2424    }
2425
2426    pub fn op_id(&self) -> Timestamp {
2427        self.op_id
2428    }
2429
2430    pub fn upper(&self) -> mz_repr::Timestamp {
2431        self.upper
2432    }
2433
2434    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2435        let audit_log_updates = self
2436            .audit_log_updates
2437            .into_iter()
2438            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2439            .collect();
2440
2441        let txn_batch = TransactionBatch {
2442            databases: self.databases.pending(),
2443            schemas: self.schemas.pending(),
2444            items: self.items.pending(),
2445            comments: self.comments.pending(),
2446            roles: self.roles.pending(),
2447            role_auth: self.role_auth.pending(),
2448            clusters: self.clusters.pending(),
2449            cluster_replicas: self.cluster_replicas.pending(),
2450            network_policies: self.network_policies.pending(),
2451            introspection_sources: self.introspection_sources.pending(),
2452            id_allocator: self.id_allocator.pending(),
2453            configs: self.configs.pending(),
2454            source_references: self.source_references.pending(),
2455            settings: self.settings.pending(),
2456            system_gid_mapping: self.system_gid_mapping.pending(),
2457            system_configurations: self.system_configurations.pending(),
2458            default_privileges: self.default_privileges.pending(),
2459            system_privileges: self.system_privileges.pending(),
2460            storage_collection_metadata: self.storage_collection_metadata.pending(),
2461            unfinalized_shards: self.unfinalized_shards.pending(),
2462            txn_wal_shard: self.txn_wal_shard.pending(),
2463            audit_log_updates,
2464            upper: self.upper,
2465        };
2466        (txn_batch, self.durable_catalog)
2467    }
2468
2469    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2470    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2471    /// before proceeding. In general, this must be fatal to the calling process. We do not
2472    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2473    ///
2474    /// The transaction is committed at `commit_ts`.
2475    ///
2476    /// Returns what the upper was directly after the transaction committed.
2477    ///
2478    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2479    /// catalog is not writeable.
2480    #[mz_ore::instrument(level = "debug")]
2481    pub(crate) async fn commit_internal(
2482        self,
2483        commit_ts: mz_repr::Timestamp,
2484    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2485        let (mut txn_batch, durable_catalog) = self.into_parts();
2486        let TransactionBatch {
2487            databases,
2488            schemas,
2489            items,
2490            comments,
2491            roles,
2492            role_auth,
2493            clusters,
2494            cluster_replicas,
2495            network_policies,
2496            introspection_sources,
2497            id_allocator,
2498            configs,
2499            source_references,
2500            settings,
2501            system_gid_mapping,
2502            system_configurations,
2503            default_privileges,
2504            system_privileges,
2505            storage_collection_metadata,
2506            unfinalized_shards,
2507            txn_wal_shard,
2508            audit_log_updates,
2509            upper,
2510        } = &mut txn_batch;
2511        // Consolidate in memory because it will likely be faster than consolidating after the
2512        // transaction has been made durable.
2513        differential_dataflow::consolidation::consolidate_updates(databases);
2514        differential_dataflow::consolidation::consolidate_updates(schemas);
2515        differential_dataflow::consolidation::consolidate_updates(items);
2516        differential_dataflow::consolidation::consolidate_updates(comments);
2517        differential_dataflow::consolidation::consolidate_updates(roles);
2518        differential_dataflow::consolidation::consolidate_updates(role_auth);
2519        differential_dataflow::consolidation::consolidate_updates(clusters);
2520        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2521        differential_dataflow::consolidation::consolidate_updates(network_policies);
2522        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2523        differential_dataflow::consolidation::consolidate_updates(id_allocator);
2524        differential_dataflow::consolidation::consolidate_updates(configs);
2525        differential_dataflow::consolidation::consolidate_updates(settings);
2526        differential_dataflow::consolidation::consolidate_updates(source_references);
2527        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2528        differential_dataflow::consolidation::consolidate_updates(system_configurations);
2529        differential_dataflow::consolidation::consolidate_updates(default_privileges);
2530        differential_dataflow::consolidation::consolidate_updates(system_privileges);
2531        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2532        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2533        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2534        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2535
2536        assert!(
2537            commit_ts >= *upper,
2538            "expected commit ts, {}, to be greater than or equal to upper, {}",
2539            commit_ts,
2540            upper
2541        );
2542        let upper = durable_catalog
2543            .commit_transaction(txn_batch, commit_ts)
2544            .await?;
2545        Ok((durable_catalog, upper))
2546    }
2547
2548    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2549    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2550    /// before proceeding. In general, this must be fatal to the calling process. We do not
2551    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2552    ///
2553    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2554    /// catalog is not writeable.
2555    ///
2556    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
2557    /// updates from this transaction. Therefore, updates from this transaction will not be returned
2558    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
2559    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
2560    ///
2561    /// An alternative implementation would be for the caller to explicitly consume their updates
2562    /// after committing and only then apply the updates in-memory. While this removes assumptions
2563    /// about the caller in this method, in practice it results in duplicate work on every commit.
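    /// # Example
    ///
    /// An illustrative caller-side sketch (`apply_in_memory` is a hypothetical
    /// stand-in for the caller applying updates to its in-memory state):
    ///
    /// ```ignore
    /// let updates = txn.get_and_commit_op_updates();
    /// apply_in_memory(updates);
    /// txn.commit(commit_ts).await?;
    /// ```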
2564    #[mz_ore::instrument(level = "debug")]
2565    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2566        let op_updates = self.get_op_updates();
2567        assert!(
2568            op_updates.is_empty(),
2569            "unconsumed transaction updates: {op_updates:?}"
2570        );
2571
2572        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2573        // Drain all the updates from the commit since it is assumed that they were already applied.
2574        let updates = durable_storage.sync_updates(upper).await?;
2575        // Writable and savepoint catalogs should have consumed all updates before committing a
2576        // transaction, otherwise the commit was performed with an out of date state.
2577        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
2578        // updates before committing.
2579        soft_assert_no_log!(
2580            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2581            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2582        );
2583        Ok(())
2584    }
2585}
2586
2587use crate::durable::async_trait;
2588
2589use super::objects::{RoleAuthKey, RoleAuthValue};
2590
2591#[async_trait]
2592impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2593    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2594        self.storage_collection_metadata
2595            .items()
2596            .into_iter()
2597            .map(
2598                |(
2599                    StorageCollectionMetadataKey { id },
2600                    StorageCollectionMetadataValue { shard },
2601                )| { (*id, shard.clone()) },
2602            )
2603            .collect()
2604    }
2605
2606    fn insert_collection_metadata(
2607        &mut self,
2608        metadata: BTreeMap<GlobalId, ShardId>,
2609    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2610        for (id, shard) in metadata {
2611            self.storage_collection_metadata
2612                .insert(
2613                    StorageCollectionMetadataKey { id },
2614                    StorageCollectionMetadataValue {
2615                        shard: shard.clone(),
2616                    },
2617                    self.op_id,
2618                )
2619                .map_err(|err| match err {
2620                    DurableCatalogError::DuplicateKey => {
2621                        StorageError::CollectionMetadataAlreadyExists(id)
2622                    }
2623                    DurableCatalogError::UniquenessViolation => {
2624                        StorageError::PersistShardAlreadyInUse(shard)
2625                    }
2626                    err => StorageError::Generic(anyhow::anyhow!(err)),
2627                })?;
2628        }
2629        Ok(())
2630    }
2631
2632    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2633        let ks: Vec<_> = ids
2634            .into_iter()
2635            .map(|id| StorageCollectionMetadataKey { id })
2636            .collect();
2637        self.storage_collection_metadata
2638            .delete_by_keys(ks, self.op_id)
2639            .into_iter()
2640            .map(
2641                |(
2642                    StorageCollectionMetadataKey { id },
2643                    StorageCollectionMetadataValue { shard },
2644                )| (id, shard),
2645            )
2646            .collect()
2647    }
2648
2649    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2650        self.unfinalized_shards
2651            .items()
2652            .into_iter()
2653            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2654            .collect()
2655    }
2656
2657    fn insert_unfinalized_shards(
2658        &mut self,
2659        s: BTreeSet<ShardId>,
2660    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2661        for shard in s {
2662            match self
2663                .unfinalized_shards
2664                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2665            {
2666                // Inserting duplicate keys has no effect.
2667                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2668                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2669            };
2670        }
2671        Ok(())
2672    }
2673
2674    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2675        let ks: Vec<_> = shards
2676            .into_iter()
2677            .map(|shard| UnfinalizedShardKey { shard })
2678            .collect();
2679        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2680    }
2681
2682    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2683        self.txn_wal_shard
2684            .values()
2685            .iter()
2686            .next()
2687            .map(|TxnWalShardValue { shard }| *shard)
2688    }
2689
2690    fn write_txn_wal_shard(
2691        &mut self,
2692        shard: ShardId,
2693    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2694        self.txn_wal_shard
2695            .insert((), TxnWalShardValue { shard }, self.op_id)
2696            .map_err(|err| match err {
2697                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2698                err => StorageError::Generic(anyhow::anyhow!(err)),
2699            })
2700    }
2701}
2702
2703/// Describes a set of changes to apply as the result of a catalog transaction.
2704#[derive(Debug, Clone, Default, PartialEq)]
2705pub struct TransactionBatch {
2706    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
2707    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
2708    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
2709    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
2710    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
2711    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
2712    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
2713    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
2714    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
2715    pub(crate) introspection_sources: Vec<(
2716        proto::ClusterIntrospectionSourceIndexKey,
2717        proto::ClusterIntrospectionSourceIndexValue,
2718        Diff,
2719    )>,
2720    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
2721    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
2722    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
2723    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
2724    pub(crate) system_configurations: Vec<(
2725        proto::ServerConfigurationKey,
2726        proto::ServerConfigurationValue,
2727        Diff,
2728    )>,
2729    pub(crate) default_privileges: Vec<(
2730        proto::DefaultPrivilegesKey,
2731        proto::DefaultPrivilegesValue,
2732        Diff,
2733    )>,
2734    pub(crate) source_references: Vec<(
2735        proto::SourceReferencesKey,
2736        proto::SourceReferencesValue,
2737        Diff,
2738    )>,
2739    pub(crate) system_privileges: Vec<(
2740        proto::SystemPrivilegesKey,
2741        proto::SystemPrivilegesValue,
2742        Diff,
2743    )>,
2744    pub(crate) storage_collection_metadata: Vec<(
2745        proto::StorageCollectionMetadataKey,
2746        proto::StorageCollectionMetadataValue,
2747        Diff,
2748    )>,
2749    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
2750    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
2751    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
2752    /// The upper of the catalog when the transaction started.
2753    pub(crate) upper: mz_repr::Timestamp,
2754}
2755
2756impl TransactionBatch {
2757    pub fn is_empty(&self) -> bool {
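        // Exhaustively destructure (rather than matching with `..`) so that
        // adding a field to `TransactionBatch` is a compile error here until
        // the new field is included in the emptiness check.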
2758        let TransactionBatch {
2759            databases,
2760            schemas,
2761            items,
2762            comments,
2763            roles,
2764            role_auth,
2765            clusters,
2766            cluster_replicas,
2767            network_policies,
2768            introspection_sources,
2769            id_allocator,
2770            configs,
2771            settings,
2772            source_references,
2773            system_gid_mapping,
2774            system_configurations,
2775            default_privileges,
2776            system_privileges,
2777            storage_collection_metadata,
2778            unfinalized_shards,
2779            txn_wal_shard,
2780            audit_log_updates,
2781            upper: _,
2782        } = self;
2783        databases.is_empty()
2784            && schemas.is_empty()
2785            && items.is_empty()
2786            && comments.is_empty()
2787            && roles.is_empty()
2788            && role_auth.is_empty()
2789            && clusters.is_empty()
2790            && cluster_replicas.is_empty()
2791            && network_policies.is_empty()
2792            && introspection_sources.is_empty()
2793            && id_allocator.is_empty()
2794            && configs.is_empty()
2795            && settings.is_empty()
2796            && source_references.is_empty()
2797            && system_gid_mapping.is_empty()
2798            && system_configurations.is_empty()
2799            && default_privileges.is_empty()
2800            && system_privileges.is_empty()
2801            && storage_collection_metadata.is_empty()
2802            && unfinalized_shards.is_empty()
2803            && txn_wal_shard.is_empty()
2804            && audit_log_updates.is_empty()
2805    }
2806}
2807
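/// A single pending change to some value: at transaction-internal time `ts`,
/// `value` is either added (`diff` of 1) or retracted (`diff` of -1).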
2808#[derive(Debug, Clone, PartialEq, Eq)]
2809struct TransactionUpdate<V> {
2810    value: V,
2811    ts: Timestamp,
2812    diff: Diff,
2813}
2814
2815/// Utility trait used to speed up uniqueness checking.
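///
/// A minimal sketch of each side of the trait, for hypothetical value types (the
/// real implementations are generated by the macros in the `unique_name` module
/// below):
///
/// ```ignore
/// struct Named { name: String }
/// impl UniqueName for Named {
///     const HAS_UNIQUE_NAME: bool = true;
///     fn unique_name(&self) -> &str { &self.name }
/// }
///
/// struct Unnamed;
/// impl UniqueName for Unnamed {
///     const HAS_UNIQUE_NAME: bool = false;
///     fn unique_name(&self) -> &str { "" }
/// }
/// ```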
2816trait UniqueName {
2817    /// Does the item have a unique name? If yes, uniqueness checking can compare
2818    /// names for equality instead of comparing every pair of values.
2819    const HAS_UNIQUE_NAME: bool;
2820    /// The unique name. Only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`.
2821    fn unique_name(&self) -> &str;
2822}
2823
2824mod unique_name {
2825    use crate::durable::objects::*;
2826
2827    macro_rules! impl_unique_name {
2828        ($($t:ty),* $(,)?) => {
2829            $(
2830                impl crate::durable::transaction::UniqueName for $t {
2831                    const HAS_UNIQUE_NAME: bool = true;
2832                    fn unique_name(&self) -> &str {
2833                        &self.name
2834                    }
2835                }
2836            )*
2837        };
2838    }
2839
2840    macro_rules! impl_no_unique_name {
2841        ($($t:ty),* $(,)?) => {
2842            $(
2843                impl crate::durable::transaction::UniqueName for $t {
2844                    const HAS_UNIQUE_NAME: bool = false;
2845                    fn unique_name(&self) -> &str {
2846                        ""
2847                    }
2848                }
2849            )*
2850        };
2851    }
2852
2853    impl_unique_name! {
2854        ClusterReplicaValue,
2855        ClusterValue,
2856        DatabaseValue,
2857        ItemValue,
2858        NetworkPolicyValue,
2859        RoleValue,
2860        SchemaValue,
2861    }
2862
2863    impl_no_unique_name!(
2864        (),
2865        ClusterIntrospectionSourceIndexValue,
2866        CommentValue,
2867        ConfigValue,
2868        DefaultPrivilegesValue,
2869        GidMappingValue,
2870        IdAllocValue,
2871        ServerConfigurationValue,
2872        SettingValue,
2873        SourceReferencesValue,
2874        StorageCollectionMetadataValue,
2875        SystemPrivilegesValue,
2876        TxnWalShardValue,
2877        RoleAuthValue,
2878    );
2879
2880    #[cfg(test)]
2881    mod test {
2882        impl_no_unique_name!(String,);
2883    }
2884}
2885
2886/// TableTransaction emulates some features of a typical SQL transaction over the
2887/// table backing a Collection.
2888///
2889/// It supports:
2890/// - uniqueness constraints
2891/// - transactional reads and writes (including read-your-writes before commit)
2892///
2893/// `K` is the primary key type. Multiple entries with the same key are disallowed.
2894/// `V` is an arbitrary value type.
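///
/// An illustrative sketch of the lifecycle, modeled on the tests at the bottom of
/// this file (`String` implements `UniqueName` only under `#[cfg(test)]`):
///
/// ```ignore
/// // Seed the transaction with the durable state, treating equal values as a
/// // uniqueness violation.
/// let mut table = TableTransaction::new_with_uniqueness_fn(
///     BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
///     |a: &String, b: &String| a == b,
/// )?;
/// table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0)?;
/// // `pending` consolidates all updates into (key, value, diff) triples for the
/// // caller to append to the durable collection.
/// let pending: Vec<(Vec<u8>, String, Diff)> = table.pending();
/// ```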
2895#[derive(Debug)]
2896struct TableTransaction<K, V> {
2897    initial: BTreeMap<K, V>,
2898    // The desired updates to each key, to be applied at commit.
2899    // Invariant: each `Vec` is sorted by `ts`.
2900    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
2901    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
2902}
2903
2904impl<K, V> TableTransaction<K, V>
2905where
2906    K: Ord + Eq + Clone + Debug,
2907    V: Ord + Clone + Debug + UniqueName,
2908{
2909    /// Create a new TableTransaction with initial data.
2910    ///
2911    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
2912    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP`
2913    /// are the protobuf types that deserialize to the `K` and `V` types a
2914    /// [`TableTransaction`] is generic over.
2915    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2916    where
2917        K: RustType<KP>,
2918        V: RustType<VP>,
2919    {
2920        let initial = initial
2921            .into_iter()
2922            .map(RustType::from_proto)
2923            .collect::<Result<_, _>>()?;
2924
2925        Ok(Self {
2926            initial,
2927            pending: BTreeMap::new(),
2928            uniqueness_violation: None,
2929        })
2930    }
2931
2932    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, a function
2933    /// that determines whether there is a uniqueness violation between two values.
2934    fn new_with_uniqueness_fn<KP, VP>(
2935        initial: BTreeMap<KP, VP>,
2936        uniqueness_violation: fn(a: &V, b: &V) -> bool,
2937    ) -> Result<Self, TryFromProtoError>
2938    where
2939        K: RustType<KP>,
2940        V: RustType<VP>,
2941    {
2942        let initial = initial
2943            .into_iter()
2944            .map(RustType::from_proto)
2945            .collect::<Result<_, _>>()?;
2946
2947        Ok(Self {
2948            initial,
2949            pending: BTreeMap::new(),
2950            uniqueness_violation: Some(uniqueness_violation),
2951        })
2952    }
2953
2954    /// Consumes and returns the pending changes and their diffs. `Diff` is
2955    /// guaranteed to be 1 or -1.
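    ///
    /// A sketch of the output's shape: updating the value at key `k` from `v1` to
    /// `v2` within the transaction yields, after consolidation,
    ///
    /// ```ignore
    /// vec![(k, v1, Diff::MINUS_ONE), (k, v2, Diff::ONE)]
    /// ```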
2956    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2957    where
2958        K: RustType<KP>,
2959        V: RustType<VP>,
2960    {
2961        soft_assert_no_log!(self.verify().is_ok());
2962        // Pending describes the desired final state for some keys. K, V pairs are
2963        // retracted if they already existed and were deleted or updated.
2964        self.pending
2965            .into_iter()
2966            .flat_map(|(k, v)| {
2967                let mut v: Vec<_> = v
2968                    .into_iter()
2969                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
2970                    .collect();
2971                differential_dataflow::consolidation::consolidate(&mut v);
2972                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
2973            })
2974            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
2975            .collect()
2976    }
2977
2978    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
2979    ///
2980    /// Runtime is O(n^2) in the number of items in `self` if [`UniqueName::HAS_UNIQUE_NAME`]
2981    /// is false for `V`, and O(n log n) otherwise. Prefer using [`Self::verify_keys`].
2982    fn verify(&self) -> Result<(), DurableCatalogError> {
2983        if let Some(uniqueness_violation) = self.uniqueness_violation {
2984            // Compare each value to each other value and ensure they are unique.
2985            let items = self.values();
2986            if V::HAS_UNIQUE_NAME {
2987                let by_name: BTreeMap<_, _> = items
2988                    .iter()
2989                    .enumerate()
2990                    .map(|(i, vi)| (vi.unique_name(), (i, vi)))
2991                    .collect();
2992                for (i, vi) in items.iter().enumerate() {
2993                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
2994                        if i != *j && uniqueness_violation(vi, *vj) {
2995                            return Err(DurableCatalogError::UniquenessViolation);
2996                        }
2997                    }
2998                }
2999            } else {
3000                for (i, vi) in items.iter().enumerate() {
3001                    for (j, vj) in items.iter().enumerate() {
3002                        if i != j && uniqueness_violation(vi, vj) {
3003                            return Err(DurableCatalogError::UniquenessViolation);
3004                        }
3005                    }
3006                }
3007            }
3008        }
3009        soft_assert_no_log!(
3010            self.pending
3011                .values()
3012                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3013            "pending should be sorted by timestamp: {:?}",
3014            self.pending
3015        );
3016        Ok(())
3017    }
3018
3019    /// Verifies that the values at `keys` don't violate `self.uniqueness_violation` with any item in `self`.
3020    ///
3021    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3022    /// items in `keys`.
3023    fn verify_keys<'a>(
3024        &self,
3025        keys: impl IntoIterator<Item = &'a K>,
3026    ) -> Result<(), DurableCatalogError>
3027    where
3028        K: 'a,
3029    {
3030        if let Some(uniqueness_violation) = self.uniqueness_violation {
3031            let entries: Vec<_> = keys
3032                .into_iter()
3033                .filter_map(|key| self.get(key).map(|value| (key, value)))
3034                .collect();
3035            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3036            for (ki, vi) in self.items() {
3037                for (kj, vj) in &entries {
3038                    if ki != *kj && uniqueness_violation(vi, vj) {
3039                        return Err(DurableCatalogError::UniquenessViolation);
3040                    }
3041                }
3042            }
3043        }
3044        soft_assert_no_log!(self.verify().is_ok());
3045        Ok(())
3046    }
3047
3048    /// Iterates over the items viewable in the current transaction in arbitrary
3049    /// order and applies `f` to each key-value pair.
3050    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3051        let mut seen = BTreeSet::new();
3052        for k in self.pending.keys() {
3053            seen.insert(k);
3054            let v = self.get(k);
3055            // Deleted items don't exist, so they shouldn't be visited, but recording
3056            // the key still suppresses visiting its initial value below.
3057            if let Some(v) = v {
3058                f(k, v);
3059            }
3060        }
3061        for (k, v) in self.initial.iter() {
3062            // Add on initial items that don't have updates.
3063            if !seen.contains(k) {
3064                f(k, v);
3065            }
3066        }
3067    }
3068
3069    /// Returns the current value of `k`.
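    ///
    /// A sketch of the consolidation this relies on: with initial value `v0` and
    /// pending updates `[(v0, -1), (v1, +1)]`, the updates consolidate to
    /// `[(v1, +1)]`, so `get` returns `Some(&v1)`; a deleted key consolidates to
    /// nothing and returns `None`.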
3070    fn get(&self, k: &K) -> Option<&V> {
3071        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3072        let mut updates = Vec::with_capacity(pending.len() + 1);
3073        if let Some(initial) = self.initial.get(k) {
3074            updates.push((initial, Diff::ONE));
3075        }
3076        updates.extend(
3077            pending
3078                .iter()
3079                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3080        );
3081
3082        differential_dataflow::consolidation::consolidate(&mut updates);
3083        assert!(updates.len() <= 1);
3084        updates.into_iter().next().map(|(v, _)| v)
3085    }
3086
3087    /// Returns the items viewable in the current transaction. The items are
3088    /// cloned, so this is an expensive operation. Prefer using [`Self::items`] or
3089    /// [`Self::for_values`].
3090    // Used by tests.
3091    #[cfg(test)]
3092    fn items_cloned(&self) -> BTreeMap<K, V> {
3093        let mut items = BTreeMap::new();
3094        self.for_values(|k, v| {
3095            items.insert(k.clone(), v.clone());
3096        });
3097        items
3098    }
3099
3100    /// Returns the items viewable in the current transaction as a map of
3101    /// references.
3102    fn items(&self) -> BTreeMap<&K, &V> {
3103        let mut items = BTreeMap::new();
3104        self.for_values(|k, v| {
3105            items.insert(k, v);
3106        });
3107        items
3108    }
3109
3110    /// Returns the values viewable in the current transaction as references.
3111    fn values(&self) -> BTreeSet<&V> {
3112        let mut items = BTreeSet::new();
3113        self.for_values(|_, v| {
3114            items.insert(v);
3115        });
3116        items
3117    }
3118
3119    /// Returns the number of items viewable in the current transaction.
3120    fn len(&self) -> usize {
3121        let mut count = 0;
3122        self.for_values(|_, _| {
3123            count += 1;
3124        });
3125        count
3126    }
3127
3128    /// Iterates over the items viewable in the current transaction and passes `f` a
3129    /// staging map; any updates inserted into that map are appended to the current
3130    /// pending items after iteration. Does not verify uniqueness.
3131    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3132        &mut self,
3133        mut f: F,
3134    ) {
3135        let mut pending = BTreeMap::new();
3136        self.for_values(|k, v| f(&mut pending, k, v));
3137        for (k, updates) in pending {
3138            self.pending.entry(k).or_default().extend(updates);
3139        }
3140    }
3141
3142    /// Inserts a new k,v pair.
3143    ///
3144    /// Returns an error if the uniqueness check failed or the key already exists.
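    ///
    /// A sketch of the failure modes, mirroring the tests below (`k1`, `k2`, `v1`,
    /// and `v2` are illustrative bindings):
    ///
    /// ```ignore
    /// txn.insert(k1.clone(), v1.clone(), ts)?;  // ok
    /// assert!(txn.insert(k1, v2, ts).is_err()); // duplicate key
    /// assert!(txn.insert(k2, v1, ts).is_err()); // uniqueness violation with v1
    /// ```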
3145    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3146        let mut violation = None;
3147        self.for_values(|for_k, for_v| {
3148            if &k == for_k {
3149                violation = Some(DurableCatalogError::DuplicateKey);
3150            }
3151            if let Some(uniqueness_violation) = self.uniqueness_violation {
3152                if uniqueness_violation(for_v, &v) {
3153                    violation = Some(DurableCatalogError::UniquenessViolation);
3154                }
3155            }
3156        });
3157        if let Some(violation) = violation {
3158            return Err(violation);
3159        }
3160        self.pending.entry(k).or_default().push(TransactionUpdate {
3161            value: v,
3162            ts,
3163            diff: Diff::ONE,
3164        });
3165        soft_assert_no_log!(self.verify().is_ok());
3166        Ok(())
3167    }
3168
3169    /// Updates k, v pairs. `f` returns `Some(new_value)` for entries whose value
3170    /// should be updated and `None` for entries that should be left unchanged.
3171    /// Returns the number of changed entries.
3172    ///
3173    /// Returns an error if the uniqueness check failed.
3174    ///
3175    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
3176    /// better performance.
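    ///
    /// A sketch of the closure contract, assuming `String` values as in the tests
    /// below:
    ///
    /// ```ignore
    /// // Rename every "old" value to "new"; returning None leaves a value alone.
    /// let changed = txn.update(|_k, v| (v == "old").then(|| "new".to_string()), ts)?;
    /// ```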
3177    fn update<F: Fn(&K, &V) -> Option<V>>(
3178        &mut self,
3179        f: F,
3180        ts: Timestamp,
3181    ) -> Result<Diff, DurableCatalogError> {
3182        let mut changed = Diff::ZERO;
3183        let mut keys = BTreeSet::new();
3184        // Keep a copy of pending in case of uniqueness violation.
3185        let pending = self.pending.clone();
3186        self.for_values_mut(|p, k, v| {
3187            if let Some(next) = f(k, v) {
3188                changed += Diff::ONE;
3189                keys.insert(k.clone());
3190                let updates = p.entry(k.clone()).or_default();
3191                updates.push(TransactionUpdate {
3192                    value: v.clone(),
3193                    ts,
3194                    diff: Diff::MINUS_ONE,
3195                });
3196                updates.push(TransactionUpdate {
3197                    value: next,
3198                    ts,
3199                    diff: Diff::ONE,
3200                });
3201            }
3202        });
3203        // Check for uniqueness violation.
3204        if let Err(err) = self.verify_keys(&keys) {
3205            self.pending = pending;
3206            Err(err)
3207        } else {
3208            Ok(changed)
3209        }
3210    }
3211
3212    /// Updates `k`, `v` pair if `k` already exists in `self`.
3213    ///
3214    /// Returns `true` if `k` was updated, `false` otherwise.
3215    /// Returns an error if the uniqueness check failed.
3216    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3217        if let Some(cur_v) = self.get(&k) {
3218            if v != *cur_v {
3219                self.set(k, Some(v), ts)?;
3220            }
3221            Ok(true)
3222        } else {
3223            Ok(false)
3224        }
3225    }
3226
3227    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
3228    ///
3229    /// Returns the number of changed entries.
3230    /// Returns an error if the uniqueness check failed.
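    ///
    /// A sketch, mirroring the tests below: keys missing from `self` are ignored
    /// and do not count toward the returned total.
    ///
    /// ```ignore
    /// // Only k1 exists, so exactly one entry counts as changed.
    /// let n = txn.update_by_keys([(k1, v1), (missing_key, v2)], ts)?;
    /// assert_eq!(n, Diff::ONE);
    /// ```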
3231    fn update_by_keys(
3232        &mut self,
3233        kvs: impl IntoIterator<Item = (K, V)>,
3234        ts: Timestamp,
3235    ) -> Result<Diff, DurableCatalogError> {
3236        let kvs: Vec<_> = kvs
3237            .into_iter()
3238            .filter_map(|(k, v)| match self.get(&k) {
3239                // Record if updating this entry would be a no-op.
3240                Some(cur_v) => Some((*cur_v == v, k, v)),
3241                None => None,
3242            })
3243            .collect();
3244        let changed = kvs.len();
3245        let changed =
3246            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3247        let kvs = kvs
3248            .into_iter()
3249            // Filter out no-ops to save some work.
3250            .filter(|(no_op, _, _)| !no_op)
3251            .map(|(_, k, v)| (k, Some(v)))
3252            .collect();
3253        self.set_many(kvs, ts)?;
3254        Ok(changed)
3255    }
3256
3257    /// Set the value for a key. Returns the previous entry if the key existed,
3258    /// otherwise None.
3259    ///
3260    /// Returns an error if the uniqueness check failed.
3261    ///
3262    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
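    ///
    /// A sketch of the preferred batched form (`k1`, `k2`, and `v1` are
    /// illustrative bindings):
    ///
    /// ```ignore
    /// // One call verifies uniqueness once for the whole batch; `Some` inserts or
    /// // updates, `None` deletes.
    /// txn.set_many(BTreeMap::from([(k1, Some(v1)), (k2, None)]), ts)?;
    /// ```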
3263    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3264        let prev = self.get(&k).cloned();
3265        let entry = self.pending.entry(k.clone()).or_default();
3266        let restore_len = entry.len();
3267
3268        match (v, prev.clone()) {
3269            (Some(v), Some(prev)) => {
3270                entry.push(TransactionUpdate {
3271                    value: prev,
3272                    ts,
3273                    diff: Diff::MINUS_ONE,
3274                });
3275                entry.push(TransactionUpdate {
3276                    value: v,
3277                    ts,
3278                    diff: Diff::ONE,
3279                });
3280            }
3281            (Some(v), None) => {
3282                entry.push(TransactionUpdate {
3283                    value: v,
3284                    ts,
3285                    diff: Diff::ONE,
3286                });
3287            }
3288            (None, Some(prev)) => {
3289                entry.push(TransactionUpdate {
3290                    value: prev,
3291                    ts,
3292                    diff: Diff::MINUS_ONE,
3293                });
3294            }
3295            (None, None) => {}
3296        }
3297
3298        // Check for uniqueness violation.
3299        if let Err(err) = self.verify_keys([&k]) {
3300            // Revert self.pending to the state it was in before calling this
3301            // function.
3302            let pending = self.pending.get_mut(&k).expect("inserted above");
3303            pending.truncate(restore_len);
3304            Err(err)
3305        } else {
3306            Ok(prev)
3307        }
3308    }
3309
3310    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
3311    /// otherwise None.
3312    ///
3313    /// Returns an error if any uniqueness check failed.
3314    fn set_many(
3315        &mut self,
3316        kvs: BTreeMap<K, Option<V>>,
3317        ts: Timestamp,
3318    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3319        if kvs.is_empty() {
3320            return Ok(BTreeMap::new());
3321        }
3322
3323        let mut prevs = BTreeMap::new();
3324        let mut restores = BTreeMap::new();
3325
3326        for (k, v) in kvs {
3327            let prev = self.get(&k).cloned();
3328            let entry = self.pending.entry(k.clone()).or_default();
3329            restores.insert(k.clone(), entry.len());
3330
3331            match (v, prev.clone()) {
3332                (Some(v), Some(prev)) => {
3333                    entry.push(TransactionUpdate {
3334                        value: prev,
3335                        ts,
3336                        diff: Diff::MINUS_ONE,
3337                    });
3338                    entry.push(TransactionUpdate {
3339                        value: v,
3340                        ts,
3341                        diff: Diff::ONE,
3342                    });
3343                }
3344                (Some(v), None) => {
3345                    entry.push(TransactionUpdate {
3346                        value: v,
3347                        ts,
3348                        diff: Diff::ONE,
3349                    });
3350                }
3351                (None, Some(prev)) => {
3352                    entry.push(TransactionUpdate {
3353                        value: prev,
3354                        ts,
3355                        diff: Diff::MINUS_ONE,
3356                    });
3357                }
3358                (None, None) => {}
3359            }
3360
3361            prevs.insert(k, prev);
3362        }
3363
3364        // Check for uniqueness violation.
3365        if let Err(err) = self.verify_keys(prevs.keys()) {
3366            for (k, restore_len) in restores {
3367                // Revert self.pending to the state it was in before calling this
3368                // function.
3369                let pending = self.pending.get_mut(&k).expect("inserted above");
3370                pending.truncate(restore_len);
3371            }
3372            Err(err)
3373        } else {
3374            Ok(prevs)
3375        }
3376    }
3377
3378    /// Deletes items for which `f` returns true. Returns the keys and values of
3379    /// the deleted entries.
3380    ///
3381    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3382    /// better performance.
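    ///
    /// A sketch, assuming `String` values as in the tests below:
    ///
    /// ```ignore
    /// // Delete every entry whose value is "obsolete", collecting what was removed.
    /// let deleted: Vec<(Vec<u8>, String)> = txn.delete(|_k, v| v == "obsolete", ts);
    /// ```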
3383    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3384        let mut deleted = Vec::new();
3385        self.for_values_mut(|p, k, v| {
3386            if f(k, v) {
3387                deleted.push((k.clone(), v.clone()));
3388                p.entry(k.clone()).or_default().push(TransactionUpdate {
3389                    value: v.clone(),
3390                    ts,
3391                    diff: Diff::MINUS_ONE,
3392                });
3393            }
3394        });
3395        soft_assert_no_log!(self.verify().is_ok());
3396        deleted
3397    }
3398
3399    /// Deletes item with key `k`.
3400    ///
3401    /// Returns the value of the deleted entry, if it existed.
3402    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3403        self.set(k, None, ts)
3404            .expect("deleting an entry cannot violate uniqueness")
3405    }
3406
3407    /// Deletes items with key in `ks`.
3408    ///
3409    /// Returns the keys and values of the deleted entries.
3410    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3411        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3412        let prevs = self
3413            .set_many(kvs, ts)
3414            .expect("deleting entries cannot violate uniqueness");
3415        prevs
3416            .into_iter()
3417            .filter_map(|(k, v)| v.map(|v| (k, v)))
3418            .collect()
3419    }
3420}
3421
3422#[cfg(test)]
3423#[allow(clippy::unwrap_used)]
3424mod tests {
3425    use super::*;
3426
3427    use mz_ore::now::SYSTEM_TIME;
3428    use mz_ore::{assert_none, assert_ok};
3429    use mz_persist_client::cache::PersistClientCache;
3430    use mz_persist_types::PersistLocation;
3431    use semver::Version;
3432
3433    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3434    use crate::memory;
3435
3436    #[mz_ore::test]
3437    fn test_table_transaction_simple() {
3438        fn uniqueness_violation(a: &String, b: &String) -> bool {
3439            a == b
3440        }
3441        let mut table = TableTransaction::new_with_uniqueness_fn(
3442            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3443            uniqueness_violation,
3444        )
3445        .unwrap();
3446
3447        // Ideally we'd assert on the specific errors here, but it's hard/impossible to
3448        // implement PartialEq for DurableCatalogError, so we only check that an error occurred.
3449        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3450        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3451        assert!(
3452            table
3453                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3454                .is_err()
3455        );
3456        assert!(
3457            table
3458                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3459                .is_err()
3460        );
3461    }
3462
3463    #[mz_ore::test]
3464    fn test_table_transaction() {
3465        fn uniqueness_violation(a: &String, b: &String) -> bool {
3466            a == b
3467        }
3468        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3469
3470        fn commit(
3471            table: &mut BTreeMap<Vec<u8>, String>,
3472            mut pending: Vec<(Vec<u8>, String, Diff)>,
3473        ) {
3474            // Sort by diff so that we process retractions first.
3475            pending.sort_by(|a, b| a.2.cmp(&b.2));
3476            for (k, v, diff) in pending {
3477                if diff == Diff::MINUS_ONE {
3478                    let prev = table.remove(&k);
3479                    assert_eq!(prev, Some(v));
3480                } else if diff == Diff::ONE {
3481                    let prev = table.insert(k, v);
3482                    assert_eq!(prev, None);
3483                } else {
3484                    panic!("unexpected diff: {diff}");
3485                }
3486            }
3487        }
3488
3489        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3490        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3491        let mut table_txn =
3492            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3493        assert_eq!(table_txn.items_cloned(), table);
3494        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3495        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3496        assert_eq!(
3497            table_txn.items_cloned(),
3498            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3499        );
3500        assert_eq!(
3501            table_txn
3502                .update(|_k, _v| Some("v3".to_string()), 2)
3503                .unwrap(),
3504            Diff::ONE
3505        );
3506
3507        // Uniqueness violation.
3508        table_txn
3509            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3510            .unwrap_err();
3511
3512        table_txn
3513            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3514            .unwrap();
3515        assert_eq!(
3516            table_txn.items_cloned(),
3517            BTreeMap::from([
3518                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3519                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3520            ])
3521        );
3522        let err = table_txn
3523            .update(|_k, _v| Some("v1".to_string()), 5)
3524            .unwrap_err();
3525        assert!(
3526            matches!(err, DurableCatalogError::UniquenessViolation),
3527            "unexpected err: {err:?}"
3528        );
3529        let pending = table_txn.pending();
3530        assert_eq!(
3531            pending,
3532            vec![
3533                (
3534                    1i64.to_le_bytes().to_vec(),
3535                    "v1".to_string(),
3536                    Diff::MINUS_ONE
3537                ),
3538                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3539                (
3540                    2i64.to_le_bytes().to_vec(),
3541                    "v2".to_string(),
3542                    Diff::MINUS_ONE
3543                ),
3544                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3545            ]
3546        );
3547        commit(&mut table, pending);
3548        assert_eq!(
3549            table,
3550            BTreeMap::from([
3551                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3552                (3i64.to_le_bytes().to_vec(), "v4".to_string())
3553            ])
3554        );
3555
3556        let mut table_txn =
3557            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3558        // Deleting then creating an item that has a uniqueness violation should work.
3559        assert_eq!(
3560            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3561            1
3562        );
3563        table_txn
3564            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3565            .unwrap();
3566        // Uniqueness violation in value.
3567        table_txn
3568            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3569            .unwrap_err();
3570        // Key already exists, expect error.
3571        table_txn
3572            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3573            .unwrap_err();
3574        assert_eq!(
3575            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3576            1
3577        );
3578        // Both inserts succeed now because the duplicate key and the uniqueness violation are gone.
3579        table_txn
3580            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3581            .unwrap();
3582        table_txn
3583            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3584            .unwrap();
3585        let pending = table_txn.pending();
3586        assert_eq!(
3587            pending,
3588            vec![
3589                (
3590                    1i64.to_le_bytes().to_vec(),
3591                    "v3".to_string(),
3592                    Diff::MINUS_ONE
3593                ),
3594                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3595                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3596            ]
3597        );
3598        commit(&mut table, pending);
3599        assert_eq!(
3600            table,
3601            BTreeMap::from([
3602                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3603                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3604                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3605            ])
3606        );
3607
3608        let mut table_txn =
3609            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3610        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3611        table_txn
3612            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3613            .unwrap();
3614
3615        commit(&mut table, table_txn.pending());
3616        assert_eq!(
3617            table,
3618            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3619        );
3620
3621        let mut table_txn =
3622            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3623        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3624        table_txn
3625            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3626            .unwrap();
3627        commit(&mut table, table_txn.pending());
3628        assert_eq!(
3629            table,
3630            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3631        );
3632
3633        // Verify we don't try to delete v3 or v4 during commit.
3634        let mut table_txn =
3635            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3636        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3637        table_txn
3638            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3639            .unwrap();
3640        table_txn
3641            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3642            .unwrap_err();
3643        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3644        table_txn
3645            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3646            .unwrap();
3647        commit(&mut table, table_txn.pending());
3648        assert_eq!(
3649            table.clone().into_iter().collect::<Vec<_>>(),
3650            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3651        );
3652
3653        // Test `set`.
3654        let mut table_txn =
3655            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3656        // Uniqueness violation.
3657        table_txn
3658            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3659            .unwrap_err();
3660        table_txn
3661            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3662            .unwrap();
3663        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3664        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3665        let pending = table_txn.pending();
3666        assert_eq!(
3667            pending,
3668            vec![
3669                (
3670                    1i64.to_le_bytes().to_vec(),
3671                    "v5".to_string(),
3672                    Diff::MINUS_ONE
3673                ),
3674                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3675            ]
3676        );
3677        commit(&mut table, pending);
3678        assert_eq!(
3679            table,
3680            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3681        );
3682
3683        // Duplicate `set`.
3684        let mut table_txn =
3685            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3686        table_txn
3687            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3688            .unwrap();
3689        let pending = table_txn.pending::<Vec<u8>, String>();
3690        assert!(pending.is_empty());
3691
3692        // Test `set_many`.
3693        let mut table_txn =
3694            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3695        // Uniqueness violation.
3696        table_txn
3697            .set_many(
3698                BTreeMap::from([
3699                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3700                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3701                ]),
3702                0,
3703            )
3704            .unwrap_err();
3705        table_txn
3706            .set_many(
3707                BTreeMap::from([
3708                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3709                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3710                ]),
3711                1,
3712            )
3713            .unwrap();
3714        table_txn
3715            .set_many(
3716                BTreeMap::from([
3717                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3718                    (3i64.to_le_bytes().to_vec(), None),
3719                ]),
3720                2,
3721            )
3722            .unwrap();
3723        let pending = table_txn.pending();
3724        assert_eq!(
3725            pending,
3726            vec![
3727                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3728                (
3729                    3i64.to_le_bytes().to_vec(),
3730                    "v6".to_string(),
3731                    Diff::MINUS_ONE
3732                ),
3733                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3734            ]
3735        );
3736        commit(&mut table, pending);
3737        assert_eq!(
3738            table,
3739            BTreeMap::from([
3740                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3741                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3742            ])
3743        );
3744
3745        // Duplicate `set_many`.
3746        let mut table_txn =
3747            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3748        table_txn
3749            .set_many(
3750                BTreeMap::from([
3751                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3752                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3753                ]),
3754                0,
3755            )
3756            .unwrap();
3757        let pending = table_txn.pending::<Vec<u8>, String>();
3758        assert!(pending.is_empty());
3759        commit(&mut table, pending);
3760        assert_eq!(
3761            table,
3762            BTreeMap::from([
3763                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3764                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3765            ])
3766        );
3767
3768        // Test `update_by_key`
3769        let mut table_txn =
3770            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3771        // Uniqueness violation.
3772        table_txn
3773            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3774            .unwrap_err();
3775        assert!(
3776            table_txn
3777                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3778                .unwrap()
3779        );
3780        assert!(
3781            !table_txn
3782                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3783                .unwrap()
3784        );
3785        let pending = table_txn.pending();
3786        assert_eq!(
3787            pending,
3788            vec![
3789                (
3790                    1i64.to_le_bytes().to_vec(),
3791                    "v6".to_string(),
3792                    Diff::MINUS_ONE
3793                ),
3794                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3795            ]
3796        );
3797        commit(&mut table, pending);
3798        assert_eq!(
3799            table,
3800            BTreeMap::from([
3801                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3802                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3803            ])
3804        );
3805
3806        // Duplicate `update_by_key`.
3807        let mut table_txn =
3808            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3809        assert!(
3810            table_txn
3811                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3812                .unwrap()
3813        );
3814        let pending = table_txn.pending::<Vec<u8>, String>();
3815        assert!(pending.is_empty());
3816        commit(&mut table, pending);
3817        assert_eq!(
3818            table,
3819            BTreeMap::from([
3820                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3821                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3822            ])
3823        );
3824
3825        // Test `update_by_keys`
3826        let mut table_txn =
3827            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3828        // Uniqueness violation.
3829        table_txn
3830            .update_by_keys(
3831                [
3832                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3833                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3834                ],
3835                0,
3836            )
3837            .unwrap_err();
3838        let n = table_txn
3839            .update_by_keys(
3840                [
3841                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3842                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3843                ],
3844                1,
3845            )
3846            .unwrap();
3847        assert_eq!(n, Diff::ONE);
3848        let n = table_txn
3849            .update_by_keys(
3850                [
3851                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3852                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3853                ],
3854                2,
3855            )
3856            .unwrap();
3857        assert_eq!(n, Diff::ZERO);
3858        let pending = table_txn.pending();
3859        assert_eq!(
3860            pending,
3861            vec![
3862                (
3863                    1i64.to_le_bytes().to_vec(),
3864                    "v8".to_string(),
3865                    Diff::MINUS_ONE
3866                ),
3867                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3868            ]
3869        );
3870        commit(&mut table, pending);
3871        assert_eq!(
3872            table,
3873            BTreeMap::from([
3874                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3875                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3876            ])
3877        );
3878
3879        // Duplicate `update_by_keys`.
3880        let mut table_txn =
3881            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3882        let n = table_txn
3883            .update_by_keys(
3884                [
3885                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3886                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3887                ],
3888                0,
3889            )
3890            .unwrap();
3891        assert_eq!(n, Diff::from(2));
3892        let pending = table_txn.pending::<Vec<u8>, String>();
3893        assert!(pending.is_empty());
3894        commit(&mut table, pending);
3895        assert_eq!(
3896            table,
3897            BTreeMap::from([
3898                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3899                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3900            ])
3901        );
3902
3903        // Test `delete_by_key`
3904        let mut table_txn =
3905            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3906        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3907        assert_eq!(prev, Some("v9".to_string()));
3908        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3909        assert_none!(prev);
3910        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3911        assert_none!(prev);
3912        let pending = table_txn.pending();
3913        assert_eq!(
3914            pending,
3915            vec![(
3916                1i64.to_le_bytes().to_vec(),
3917                "v9".to_string(),
3918                Diff::MINUS_ONE
3919            ),]
3920        );
3921        commit(&mut table, pending);
3922        assert_eq!(
3923            table,
3924            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3925        );
3926
3927        // Test `delete_by_keys`
3928        let mut table_txn =
3929            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3930        let prevs = table_txn.delete_by_keys(
3931            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3932            0,
3933        );
3934        assert_eq!(
3935            prevs,
3936            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3937        );
3938        let prevs = table_txn.delete_by_keys(
3939            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3940            1,
3941        );
3942        assert_eq!(prevs, vec![]);
3943        let prevs = table_txn.delete_by_keys(
3944            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3945            2,
3946        );
3947        assert_eq!(prevs, vec![]);
3948        let pending = table_txn.pending();
3949        assert_eq!(
3950            pending,
3951            vec![(
3952                42i64.to_le_bytes().to_vec(),
3953                "v7".to_string(),
3954                Diff::MINUS_ONE
3955            ),]
3956        );
3957        commit(&mut table, pending);
3958        assert_eq!(table, BTreeMap::new());
3959    }
3960
3961    #[mz_ore::test(tokio::test)]
3962    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3963    async fn test_savepoint() {
3964        const VERSION: Version = Version::new(26, 0, 0);
3965        let mut persist_cache = PersistClientCache::new_no_metrics();
3966        persist_cache.cfg.build_version = VERSION;
3967        let persist_client = persist_cache
3968            .open(PersistLocation::new_in_mem())
3969            .await
3970            .unwrap();
3971        let state_builder = TestCatalogStateBuilder::new(persist_client)
3972            .with_default_deploy_generation()
3973            .with_version(VERSION);
3974
3975        // Initialize catalog.
3976        let _ = state_builder
3977            .clone()
3978            .unwrap_build()
3979            .await
3980            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
3981            .await
3982            .unwrap()
3983            .0;
3984        let mut savepoint_state = state_builder
3985            .unwrap_build()
3986            .await
3987            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
3988            .await
3989            .unwrap()
3990            .0;
3991
3992        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
3993        assert!(!initial_snapshot.is_empty());
3994
3995        let db_name = "db";
3996        let db_owner = RoleId::User(42);
3997        let db_privileges = Vec::new();
3998        let mut txn = savepoint_state.transaction().await.unwrap();
3999        let (db_id, db_oid) = txn
4000            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4001            .unwrap();
4002        let commit_ts = txn.upper();
4003        txn.commit_internal(commit_ts).await.unwrap();
4004        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4005        let update = updates.into_element();
4006
4007        assert_eq!(update.diff, StateDiff::Addition);
4008
4009        let db = match update.kind {
4010            memory::objects::StateUpdateKind::Database(db) => db,
4011            update => panic!("unexpected update: {update:?}"),
4012        };
4013
4014        assert_eq!(db_id, db.id);
4015        assert_eq!(db_oid, db.oid);
4016        assert_eq!(db_name, db.name);
4017        assert_eq!(db_owner, db.owner_id);
4018        assert_eq!(db_privileges, db.privileges);
4019    }
4020
4021    #[mz_ore::test]
4022    fn test_allocate_introspection_source_index_id() {
4023        let cluster_variant: u8 = 0b0000_0001;
4024        let cluster_id_inner: u64 =
4025            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4026        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4027
4028        let cluster_id = ClusterId::System(cluster_id_inner);
4029        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4030
4031        let introspection_source_index_id: u64 =
4032            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4033
4034        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4035        {
4036            let mut cluster_variant_mask = 0xFF << 56;
4037            cluster_variant_mask &= introspection_source_index_id;
4038            cluster_variant_mask >>= 56;
4039            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4040        }
4041
4042        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4043        {
4044            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4045            cluster_id_inner_mask &= introspection_source_index_id;
4046            cluster_id_inner_mask >>= 8;
4047            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4048        }
4049
4050        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4051        {
4052            let mut log_variant_mask = 0xFF;
4053            log_variant_mask &= introspection_source_index_id;
4054            assert_eq!(
4055                log_variant_mask,
4056                u64::from(timely_messages_received_log_variant)
4057            );
4058        }
4059
4060        let (catalog_item_id, global_id) =
4061            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4062
4063        assert_eq!(
4064            catalog_item_id,
4065            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4066        );
4067        assert_eq!(
4068            global_id,
4069            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4070        );
4071    }
4072}