mz_catalog/durable/transaction.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::time::Duration;

use anyhow::anyhow;
use derivative::Derivative;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::{u64_to_usize, usize_to_u64};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::now::SYSTEM_TIME;
use mz_ore::vec::VecExt;
use mz_ore::{soft_assert_no_log, soft_assert_or_log};
use mz_persist_types::ShardId;
use mz_pgrepr::oid::FIRST_USER_OID;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
use mz_sql::catalog::{
    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
    RoleAttributesRaw, RoleMembership, RoleVars,
};
use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
use mz_sql::plan::NetworkPolicyRule;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::controller::StorageError;
use tracing::warn;

use crate::builtin::BuiltinLog;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::objects::serialization::proto;
use crate::durable::objects::{
    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
    ReplicaConfig, Role, RoleAuthKey, RoleAuthValue, RoleKey, RoleValue, Schema, SchemaKey,
    SchemaValue, ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue,
    SourceReference, SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
};
use crate::durable::{
    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
    SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
    USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
    USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

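/// Logical timestamp of a single operation within a [`Transaction`]; see
/// `op_id` below.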
type Timestamp = u64;

/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
/// An operation also logically groups multiple catalog updates together.
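///
/// A minimal usage sketch (hedged: `owner_id`, `commit_ts`, and the
/// surrounding setup are placeholders, not part of this module):
///
/// ```ignore
/// let mut tx = durable_catalog.transaction().await?;
/// let (db_id, db_oid) =
///     tx.insert_user_database("db", owner_id, vec![], &Default::default())?;
/// tx.commit(commit_ts).await?;
/// ```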
#[derive(Derivative)]
#[derivative(Debug, PartialEq)]
pub struct Transaction<'a> {
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Don't make this a table transaction so that it's not read into the
    // in-memory cache.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of `durable_catalog` at the start of the transaction.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation of this transaction.
    op_id: Timestamp,
}

impl<'a> Transaction<'a> {
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
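            // Items conflict when they share a schema and a name, with one
            // carve-out: a type only conflicts with item types for which
            // `conflicts_with_type` returns true.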
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    // `item_type` is slow, only compute if needed.
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            // Uniqueness violations for this value occur at the key rather than
            // the value (the key is the unit struct `()` so this is a singleton
            // value).
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }

    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
        let key = ItemKey { id: *id };
        self.items
            .get(&key)
            .map(|v| DurableType::from_key_value(key, v.clone()))
    }

    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
        self.items
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
            .sorted_by_key(|Item { id, .. }| *id)
    }

    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
        self.insert_audit_log_events([event]);
    }

    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
        let events = events
            .into_iter()
            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
        self.audit_log_updates.extend(events);
    }

    pub fn insert_user_database(
        &mut self,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(DatabaseId, u32), CatalogError> {
        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
        let id = DatabaseId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_database(id, database_name, owner_id, privileges, oid)?;
        Ok((id, oid))
    }

    pub(crate) fn insert_database(
        &mut self,
        id: DatabaseId,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<u32, CatalogError> {
        match self.databases.insert(
            DatabaseKey { id },
            DatabaseValue {
                name: database_name.to_string(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(oid),
            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
        }
    }

    pub fn insert_user_schema(
        &mut self,
        database_id: DatabaseId,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(SchemaId, u32), CatalogError> {
        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
        let id = SchemaId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_schema(
            id,
            Some(database_id),
            schema_name.to_string(),
            owner_id,
            privileges,
            oid,
        )?;
        Ok((id, oid))
    }

    pub fn insert_system_schema(
        &mut self,
        schema_id: u64,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        let id = SchemaId::System(schema_id);
        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
    }

    pub(crate) fn insert_schema(
        &mut self,
        schema_id: SchemaId,
        database_id: Option<DatabaseId>,
        schema_name: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        match self.schemas.insert(
            SchemaKey { id: schema_id },
            SchemaValue {
                database_id,
                name: schema_name.clone(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
        }
    }

    pub fn insert_builtin_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<RoleId, CatalogError> {
        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok(id)
    }

    pub fn insert_user_role(
        &mut self,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(RoleId, u32), CatalogError> {
        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
        let id = RoleId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok((id, oid))
    }

    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash =
                mz_auth::hash::scram256_hash(password).expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }

    /// Panics if any introspection source id is not a system id
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    /// Panics if any introspection source id is not a system id
    pub fn insert_system_cluster(
        &mut self,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    fn insert_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.clusters.insert(
            ClusterKey { id: cluster_id },
            ClusterValue {
                name: cluster_name.to_string(),
                owner_id,
                privileges,
                config,
            },
            self.op_id,
        ) {
            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
        };

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip_eq(oids)
            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
            .collect();
        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
            let introspection_source_index = IntrospectionSourceIndex {
                cluster_id,
                name: builtin.name.to_string(),
                item_id,
                index_id,
                oid,
            };
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources
                .insert(key, value, self.op_id)
                .expect("no uniqueness violation");
        }

        Ok(())
    }

    pub fn rename_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        cluster_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterKey { id: cluster_id };

        match self.clusters.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = cluster_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
            ),
        }
    }

    pub fn rename_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica_name: &QualifiedReplica,
        replica_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterReplicaKey { id: replica_id };

        match self.cluster_replicas.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = replica_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => {
                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
            }
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
            ),
        }
    }

    pub fn insert_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<ReplicaId, CatalogError> {
        let replica_id = match cluster_id {
            ClusterId::System(_) => self.allocate_system_replica_id()?,
            ClusterId::User(_) => self.allocate_user_replica_id()?,
        };
        self.insert_cluster_replica_with_id(
            cluster_id,
            replica_id,
            replica_name,
            config,
            owner_id,
        )?;
        Ok(replica_id)
    }

    pub(crate) fn insert_cluster_replica_with_id(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.cluster_replicas.insert(
            ClusterReplicaKey { id: replica_id },
            ClusterReplicaValue {
                cluster_id,
                name: replica_name.into(),
                config,
                owner_id,
            },
            self.op_id,
        ) {
            let cluster = self
                .clusters
                .get(&ClusterKey { id: cluster_id })
                .expect("cluster exists");
            return Err(SqlCatalogError::DuplicateReplica(
                replica_name.to_string(),
                cluster.name.to_string(),
            )
            .into());
        };
        Ok(())
    }

    pub fn insert_user_network_policy(
        &mut self,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        temporary_oids: &HashSet<u32>,
    ) -> Result<NetworkPolicyId, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
        let id = NetworkPolicyId::User(id);
        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
    }

    pub fn insert_network_policy(
        &mut self,
        id: NetworkPolicyId,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        oid: u32,
    ) -> Result<NetworkPolicyId, CatalogError> {
        match self.network_policies.insert(
            NetworkPolicyKey { id },
            NetworkPolicyValue {
                name: name.clone(),
                rules,
                privileges,
                owner_id,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(id),
            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
        }
    }

    /// Updates persisted information about persisted introspection source
    /// indexes.
    ///
    /// Panics if provided id is not a system id.
    pub fn update_introspection_source_index_gids(
        &mut self,
        mappings: impl Iterator<
            Item = (
                ClusterId,
                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
            ),
        >,
    ) -> Result<(), CatalogError> {
        for (cluster_id, updates) in mappings {
            for (name, item_id, index_id, oid) in updates {
                let introspection_source_index = IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                };
                let (key, value) = introspection_source_index.into_key_value();

                let prev = self
                    .introspection_sources
                    .set(key, Some(value), self.op_id)?;
                if prev.is_none() {
                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
                        "{index_id}"
                    ))
                    .into());
                }
            }
        }
        Ok(())
    }

    pub fn insert_user_item(
        &mut self,
        id: CatalogItemId,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
        versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<u32, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_item(
            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
        )?;
        Ok(oid)
    }

    pub fn insert_item(
        &mut self,
        id: CatalogItemId,
        oid: u32,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        extra_versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<(), CatalogError> {
        match self.items.insert(
            ItemKey { id },
            ItemValue {
                schema_id,
                name: item_name.to_string(),
                create_sql,
                owner_id,
                privileges,
                oid,
                global_id,
                extra_versions,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
        }
    }

    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
    }

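    /// Returns `amount` consecutive IDs starting at the allocator's current
    /// `next_id` for `key`, and durably advances the allocator past the
    /// returned range.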
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }

    pub fn allocate_system_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        assert!(
            !self.durable_catalog.is_bootstrap_complete(),
            "we can only allocate system item IDs during bootstrap"
        );
        Ok(self
            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            // TODO(alter_table): Use separate ID allocators.
            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
            .collect())
    }

    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system items that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
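    ///
    /// For example (illustrative values, not allocated from the catalog): a
    /// user cluster with inner value 1 (cluster variant 2) and
    /// `LogVariant::Timely(TimelyLog::Operates)` (log variant 1) yields
    /// `(2 << 56) | (1 << 8) | 1 = 0x0200_0000_0000_0101`:
    ///
    /// ```ignore
    /// let (item_id, global_id) = Transaction::allocate_introspection_source_index_id(
    ///     &ClusterId::User(1),
    ///     LogVariant::Timely(TimelyLog::Operates),
    /// );
    /// // item_id == CatalogItemId::IntrospectionSourceIndex(0x0200_0000_0000_0101)
    /// ```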
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ShutdownDuration) => 27,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }

    pub fn allocate_user_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        Ok(self
            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            // TODO(alter_table): Use separate ID allocators.
            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
            .collect())
    }

    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::User(id))
    }

    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::System(id))
    }

    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
    }

    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
    }

    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }
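        // Illustrative wraparound example: if `self.0 == u32::MAX` and
        // `rhs == 1`, `overflowing_add` returns `(0, true)`, so the next OID
        // becomes `FIRST_USER_OID` rather than 0.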

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do every time we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated every time an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference and the other
        // approach requires making sure that the allocator always stays in-sync, which can be
        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }

    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
        self.allocate_oids(1, temporary_oids)
            .map(|oids| oids.into_element())
    }

    pub(crate) fn insert_id_allocator(
        &mut self,
        name: String,
        next_id: u64,
    ) -> Result<(), CatalogError> {
        match self.id_allocator.insert(
            IdAllocKey { name: name.clone() },
            IdAllocValue { next_id },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
        }
    }

    /// Removes the database `id` from the transaction.
    ///
    /// Returns an error if `id` is not found.
    ///
    /// Runtime is linear with respect to the total number of databases in the catalog.
    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
        let prev = self
            .databases
            .set(DatabaseKey { id: *id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    /// Removes all databases in `databases` from the transaction.
    ///
    /// Returns an error if any id in `databases` is not found.
    ///
    /// NOTE: On error, there still may be some databases removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_databases(
        &mut self,
        databases: &BTreeSet<DatabaseId>,
    ) -> Result<(), CatalogError> {
        if databases.is_empty() {
            return Ok(());
        }

        let to_remove = databases
            .iter()
            .map(|id| (DatabaseKey { id: *id }, None))
            .collect();
        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, val| val.is_none());

        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownDatabase(err).into());
        }

        Ok(())
    }

    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
    ///
    /// Returns an error if `(database_id, schema_id)` is not found.
    ///
    /// Runtime is linear with respect to the total number of schemas in the catalog.
    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
    pub fn remove_schema(
        &mut self,
        database_id: &Option<DatabaseId>,
        schema_id: &SchemaId,
    ) -> Result<(), CatalogError> {
        let prev = self
            .schemas
            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            let database_name = match database_id {
                Some(id) => format!("{id}."),
                None => "".to_string(),
            };
            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
        }
    }

    /// Removes all schemas in `schemas` from the transaction.
    ///
    /// Returns an error if any id in `schemas` is not found.
    ///
    /// NOTE: On error, there still may be some schemas removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_schemas(
        &mut self,
        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
    ) -> Result<(), CatalogError> {
        if schemas.is_empty() {
            return Ok(());
        }

        let to_remove = schemas
            .iter()
            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
            .collect();
        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, v| v.is_none());

        if !prev.is_empty() {
            let err = prev
                .keys()
                .map(|k| {
                    let db_spec = schemas.get(&k.id).expect("should exist");
                    let db_name = match db_spec {
                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    };
                    format!("{}.{}", db_name, k.id)
                })
                .join(", ");

            return Err(SqlCatalogError::UnknownSchema(err).into());
        }

        Ok(())
    }

    pub fn remove_source_references(
        &mut self,
        source_id: CatalogItemId,
    ) -> Result<(), CatalogError> {
        let deleted = self
            .source_references
            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
        }
    }

    /// Removes all user roles in `roles` from the transaction.
    ///
    /// Returns an error if any id in `roles` is not found.
    ///
    /// NOTE: On error, there still may be some roles removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        assert!(
            roles.iter().all(|id| id.is_user()),
            "cannot delete non-user roles"
        );
        self.remove_roles(roles)
    }

    /// Removes all roles in `roles` from the transaction.
    ///
    /// Returns an error if any id in `roles` is not found.
    ///
    /// NOTE: On error, there still may be some roles removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        role_auth_prev.retain(|_k, v| v.is_none());
        // The reason we don't do the same check as above is that the role auth
        // table is not required to have an entry for every role in the role
        // table.

        Ok(())
    }

1217    /// Removes all cluster in `clusters` from the transaction.
1218    ///
1219    /// Returns an error if any id in `clusters` is not found.
1220    ///
1221    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1222    /// the caller to either abort the transaction or commit.
1223    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1224        if clusters.is_empty() {
1225            return Ok(());
1226        }
1227
1228        let to_remove = clusters
1229            .iter()
1230            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1231            .collect();
1232        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1233
1234        prev.retain(|_k, v| v.is_none());
1235        if !prev.is_empty() {
1236            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1237            return Err(SqlCatalogError::UnknownCluster(err).into());
1238        }
1239
1240        // Cascade delete introspection sources and cluster replicas.
1241        //
1242        // TODO(benesch): this doesn't seem right. Cascade deletions should
1243        // be entirely the domain of the higher catalog layer, not the
1244        // storage layer.
1245        self.cluster_replicas
1246            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1247        self.introspection_sources
1248            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1249
1250        Ok(())
1251    }
1252
1253    /// Removes the cluster replica `id` from the transaction.
1254    ///
1255    /// Returns an error if `id` is not found.
1256    ///
1257    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1258    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1259    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1260        let deleted = self
1261            .cluster_replicas
1262            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1263            .is_some();
1264        if deleted {
1265            Ok(())
1266        } else {
1267            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1268        }
1269    }
1270
1271    /// Removes all cluster replicas in `replicas` from the transaction.
1272    ///
1273    /// Returns an error if any id in `replicas` is not found.
1274    ///
1275    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1276    /// is up to the caller to either abort the transaction or commit.
1277    pub fn remove_cluster_replicas(
1278        &mut self,
1279        replicas: &BTreeSet<ReplicaId>,
1280    ) -> Result<(), CatalogError> {
1281        if replicas.is_empty() {
1282            return Ok(());
1283        }
1284
1285        let to_remove = replicas
1286            .iter()
1287            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1288            .collect();
1289        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1290
1291        prev.retain(|_k, v| v.is_none());
1292        if !prev.is_empty() {
1293            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1294            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1295        }
1296
1297        Ok(())
1298    }
1299
1300    /// Removes item `id` from the transaction.
1301    ///
1302    /// Returns an error if `id` is not found.
1303    ///
1304    /// Runtime is linear with respect to the total number of items in the catalog.
1305    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1306    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1307        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1308        if prev.is_some() {
1309            Ok(())
1310        } else {
1311            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1312        }
1313    }
1314
1315    /// Removes all items in `ids` from the transaction.
1316    ///
1317    /// Returns an error if any id in `ids` is not found.
1318    ///
1319    /// NOTE: On error, there still may be some items removed from the transaction. It is
1320    /// up to the caller to either abort the transaction or commit.
1321    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1322        if ids.is_empty() {
1323            return Ok(());
1324        }
1325
1326        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1327        let n = self.items.delete_by_keys(ks, self.op_id).len();
1328        if n == ids.len() {
1329            Ok(())
1330        } else {
1331            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1332            let mut unknown = ids.difference(&item_ids);
1333            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1334        }
1335    }
1336
1337    /// Removes all system object mappings in `descriptions` from the transaction.
1338    ///
1339    /// Returns an error if any description in `descriptions` is not found.
1340    ///
1341    /// NOTE: On error, there still may be some items removed from the transaction. It is
1342    /// up to the caller to either abort the transaction or commit.
1343    pub fn remove_system_object_mappings(
1344        &mut self,
1345        descriptions: BTreeSet<SystemObjectDescription>,
1346    ) -> Result<(), CatalogError> {
1347        if descriptions.is_empty() {
1348            return Ok(());
1349        }
1350
1351        let ks: Vec<_> = descriptions
1352            .clone()
1353            .into_iter()
1354            .map(|desc| GidMappingKey {
1355                schema_name: desc.schema_name,
1356                object_type: desc.object_type,
1357                object_name: desc.object_name,
1358            })
1359            .collect();
1360        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1361
1362        if n == descriptions.len() {
1363            Ok(())
1364        } else {
1365            let item_descriptions = self
1366                .system_gid_mapping
1367                .items()
1368                .keys()
1369                .map(|k| SystemObjectDescription {
1370                    schema_name: k.schema_name.clone(),
1371                    object_type: k.object_type.clone(),
1372                    object_name: k.object_name.clone(),
1373                })
1374                .collect();
1375            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1376                format!(
1377                    "{} {}.{}",
1378                    desc.object_type, desc.schema_name, desc.object_name
1379                )
1380            });
1381            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1382        }
1383    }
1384
1385    /// Removes all introspection source indexes in `indexes` from the transaction.
1386    ///
1387    /// Returns an error if any index in `indexes` is not found.
1388    ///
1389    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1390    /// up to the caller to either abort the transaction or commit.
1391    pub fn remove_introspection_source_indexes(
1392        &mut self,
1393        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1394    ) -> Result<(), CatalogError> {
1395        if introspection_source_indexes.is_empty() {
1396            return Ok(());
1397        }
1398
1399        let ks: Vec<_> = introspection_source_indexes
1400            .clone()
1401            .into_iter()
1402            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1403            .collect();
1404        let n = self
1405            .introspection_sources
1406            .delete_by_keys(ks, self.op_id)
1407            .len();
1408        if n == introspection_source_indexes.len() {
1409            Ok(())
1410        } else {
1411            let txn_indexes = self
1412                .introspection_sources
1413                .items()
1414                .keys()
1415                .map(|k| (k.cluster_id, k.name.clone()))
1416                .collect();
1417            let mut unknown = introspection_source_indexes
1418                .difference(&txn_indexes)
1419                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1420            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1421        }
1422    }
1423
1424    /// Updates item `id` in the transaction to `item`.
1425    ///
1426    /// Returns an error if `id` is not found.
1427    ///
1428    /// Runtime is linear with respect to the total number of items in the catalog.
1429    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1430    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1431        let updated =
1432            self.items
1433                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1434        if updated {
1435            Ok(())
1436        } else {
1437            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1438        }
1439    }
1440
1441    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1442    /// corresponding value in `items`.
1443    ///
1444    /// Returns an error if any id in `items` is not found.
1445    ///
1446    /// NOTE: On error, there still may be some items updated in the transaction. It is
1447    /// up to the caller to either abort the transaction or commit.
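    ///
    /// A minimal usage sketch (illustrative, not a doctest; `txn`, `id`, and `renamed_item`
    /// are hypothetical values already in scope):
    /// ```ignore
    /// let updates: BTreeMap<CatalogItemId, Item> = [(id, renamed_item)].into_iter().collect();
    /// txn.update_items(updates)?;
    /// ```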
1448    pub fn update_items(
1449        &mut self,
1450        items: BTreeMap<CatalogItemId, Item>,
1451    ) -> Result<(), CatalogError> {
1452        if items.is_empty() {
1453            return Ok(());
1454        }
1455
1456        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1457        // `items` is owned and not used again below, so no clone is needed.
1458        let kvs: Vec<_> = items
1459            .into_iter()
1460            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1461            .collect();
1462        let n = self.items.update_by_keys(kvs, self.op_id)?;
1463        let n = usize::try_from(n.into_inner()).expect("must be non-negative and fit in usize");
1464        if n == update_ids.len() {
1465            Ok(())
1466        } else {
1467            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1468            let mut unknown = update_ids.difference(&item_ids);
1469            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1470        }
1471    }
1472
1473    /// Updates role `id` in the transaction to `role`.
1474    ///
1475    /// Returns an error if `id` is not found.
1476    ///
1477    /// Runtime is linear with respect to the total number of roles in the catalog.
1478    /// DO NOT call this function in a loop; for bulk role updates that do not touch
1479    /// authentication, use [`Self::update_roles_without_auth`] instead.
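    ///
    /// A minimal usage sketch (illustrative, not a doctest; `txn`, `role_id`, `role`, and
    /// `new_password` are hypothetical values already in scope). `PasswordAction::Clear`
    /// drops any stored hash, and `PasswordAction::NoChange` leaves authentication untouched:
    /// ```ignore
    /// txn.update_role(role_id, role, PasswordAction::Set(new_password))?;
    /// ```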
1480    pub fn update_role(
1481        &mut self,
1482        id: RoleId,
1483        role: Role,
1484        password: PasswordAction,
1485    ) -> Result<(), CatalogError> {
1486        let key = RoleKey { id };
1487        if self.roles.get(&key).is_some() {
1488            let auth_key = RoleAuthKey { role_id: id };
1489
1490            match password {
1491                PasswordAction::Set(new_password) => {
1492                    let hash = mz_auth::hash::scram256_hash(&new_password)
1493                        .expect("password hash should be valid");
1494                    let value = RoleAuthValue {
1495                        password_hash: Some(hash),
1496                        updated_at: SYSTEM_TIME(),
1497                    };
1498
1499                    if self.role_auth.get(&auth_key).is_some() {
1500                        self.role_auth
1501                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1502                    } else {
1503                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1504                    }
1505                }
1506                PasswordAction::Clear => {
1507                    let value = RoleAuthValue {
1508                        password_hash: None,
1509                        updated_at: SYSTEM_TIME(),
1510                    };
1511                    if self.role_auth.get(&auth_key).is_some() {
1512                        self.role_auth
1513                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1514                    }
1515                }
1516                PasswordAction::NoChange => {}
1517            }
1518
1519            self.roles
1520                .update_by_key(key, role.into_key_value().1, self.op_id)?;
1521
1522            Ok(())
1523        } else {
1524            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1525        }
1526    }
1527
1528    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1529    /// corresponding value in `roles`.
1530    ///
1531    /// This function does *not* write role authentication information to the catalog.
1532    /// It is purely for updating the role itself.
1533    ///
1534    /// Returns an error if any id in `roles` is not found.
1535    ///
1536    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1537    /// up to the caller to either abort the transaction or commit.
1538    pub fn update_roles_without_auth(
1539        &mut self,
1540        roles: BTreeMap<RoleId, Role>,
1541    ) -> Result<(), CatalogError> {
1542        if roles.is_empty() {
1543            return Ok(());
1544        }
1545
1546        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1547        let kvs: Vec<_> = roles
1548            .into_iter()
1549            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1550            .collect();
1551        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1552        let n = usize::try_from(n.into_inner()).expect("must be non-negative and fit in usize");
1553
1554        if n == update_role_ids.len() {
1555            Ok(())
1556        } else {
1557            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1558            let mut unknown = update_role_ids.difference(&role_ids);
1559            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1560        }
1561    }
1562
1563    /// Updates the persisted mapping from system objects to IDs and fingerprints. Each element
1564    /// of `mappings` should be (existing catalog item ID, new system object mapping).
1565    ///
1566    /// Panics if any provided id is not a system id.
1567    pub fn update_system_object_mappings(
1568        &mut self,
1569        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1570    ) -> Result<(), CatalogError> {
1571        if mappings.is_empty() {
1572            return Ok(());
1573        }
1574
1575        let n = self.system_gid_mapping.update(
1576            |_k, v| {
1577                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1578                    let (_, new_value) = mapping.clone().into_key_value();
1579                    Some(new_value)
1580                } else {
1581                    None
1582                }
1583            },
1584            self.op_id,
1585        )?;
1586
1587        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1588            != mappings.len()
1589        {
1590            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1591            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1592        }
1593
1594        Ok(())
1595    }
1596
1597    /// Updates cluster `id` in the transaction to `cluster`.
1598    ///
1599    /// Returns an error if `id` is not found.
1600    ///
1601    /// Runtime is linear with respect to the total number of clusters in the catalog.
1602    /// DO NOT call this function in a loop.
1603    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1604        let updated = self.clusters.update_by_key(
1605            ClusterKey { id },
1606            cluster.into_key_value().1,
1607            self.op_id,
1608        )?;
1609        if updated {
1610            Ok(())
1611        } else {
1612            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1613        }
1614    }
1615
1616    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1617    ///
1618    /// Returns an error if `replica_id` is not found.
1619    ///
1620    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1621    /// DO NOT call this function in a loop.
1622    pub fn update_cluster_replica(
1623        &mut self,
1624        replica_id: ReplicaId,
1625        replica: ClusterReplica,
1626    ) -> Result<(), CatalogError> {
1627        let updated = self.cluster_replicas.update_by_key(
1628            ClusterReplicaKey { id: replica_id },
1629            replica.into_key_value().1,
1630            self.op_id,
1631        )?;
1632        if updated {
1633            Ok(())
1634        } else {
1635            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1636        }
1637    }
1638
1639    /// Updates database `id` in the transaction to `database`.
1640    ///
1641    /// Returns an error if `id` is not found.
1642    ///
1643    /// Runtime is linear with respect to the total number of databases in the catalog.
1644    /// DO NOT call this function in a loop.
1645    pub fn update_database(
1646        &mut self,
1647        id: DatabaseId,
1648        database: Database,
1649    ) -> Result<(), CatalogError> {
1650        let updated = self.databases.update_by_key(
1651            DatabaseKey { id },
1652            database.into_key_value().1,
1653            self.op_id,
1654        )?;
1655        if updated {
1656            Ok(())
1657        } else {
1658            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1659        }
1660    }
1661
1662    /// Updates schema `schema_id` in the transaction to `schema`.
1663    ///
1664    /// Returns an error if `schema_id` is not found.
1665    ///
1666    /// Runtime is linear with respect to the total number of schemas in the catalog.
1667    /// DO NOT call this function in a loop.
1668    pub fn update_schema(
1669        &mut self,
1670        schema_id: SchemaId,
1671        schema: Schema,
1672    ) -> Result<(), CatalogError> {
1673        let updated = self.schemas.update_by_key(
1674            SchemaKey { id: schema_id },
1675            schema.into_key_value().1,
1676            self.op_id,
1677        )?;
1678        if updated {
1679            Ok(())
1680        } else {
1681            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1682        }
1683    }
1684
1685    /// Updates network policy `id` in the transaction to `network_policy`.
1686    ///
1687    /// Returns an error if `id` is not found.
1688    ///
1689    /// Runtime is linear with respect to the total number of network policies in the catalog.
1690    /// DO NOT call this function in a loop.
1691    pub fn update_network_policy(
1692        &mut self,
1693        id: NetworkPolicyId,
1694        network_policy: NetworkPolicy,
1695    ) -> Result<(), CatalogError> {
1696        let updated = self.network_policies.update_by_key(
1697            NetworkPolicyKey { id },
1698            network_policy.into_key_value().1,
1699            self.op_id,
1700        )?;
1701        if updated {
1702            Ok(())
1703        } else {
1704            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1705        }
1706    }

1707    /// Removes all network policies in `network_policies` from the transaction.
1708    ///
1709    /// Returns an error if any id in `network_policies` is not found.
1710    ///
1711    /// NOTE: On error, there still may be some network policies removed from the transaction.
1712    /// It is up to the caller to either abort the transaction or commit.
1713    pub fn remove_network_policies(
1714        &mut self,
1715        network_policies: &BTreeSet<NetworkPolicyId>,
1716    ) -> Result<(), CatalogError> {
1717        if network_policies.is_empty() {
1718            return Ok(());
1719        }
1720
1721        let to_remove = network_policies
1722            .iter()
1723            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1724            .collect();
1725        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1726        assert!(
1727            prev.iter().all(|(k, _)| k.id.is_user()),
1728            "cannot delete non-user network policy"
1729        );
1730
1731        prev.retain(|_k, v| v.is_none());
1732        if !prev.is_empty() {
1733            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1734            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1735        }
1736
1737        Ok(())
1738    }

1739    /// Set persisted default privilege.
1740    ///
1741    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
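    ///
    /// A minimal usage sketch (illustrative, not a doctest; the role ids are hypothetical).
    /// `Some(privileges)` upserts the default privilege, while `None` removes it:
    /// ```ignore
    /// txn.set_default_privilege(
    ///     owner_role_id,
    ///     None, // not scoped to a particular database
    ///     None, // not scoped to a particular schema
    ///     ObjectType::Table,
    ///     grantee_role_id,
    ///     Some(AclMode::SELECT),
    /// )?;
    /// ```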
1742    pub fn set_default_privilege(
1743        &mut self,
1744        role_id: RoleId,
1745        database_id: Option<DatabaseId>,
1746        schema_id: Option<SchemaId>,
1747        object_type: ObjectType,
1748        grantee: RoleId,
1749        privileges: Option<AclMode>,
1750    ) -> Result<(), CatalogError> {
1751        self.default_privileges.set(
1752            DefaultPrivilegesKey {
1753                role_id,
1754                database_id,
1755                schema_id,
1756                object_type,
1757                grantee,
1758            },
1759            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1760            self.op_id,
1761        )?;
1762        Ok(())
1763    }
1764
1765    /// Set persisted default privileges.
1766    pub fn set_default_privileges(
1767        &mut self,
1768        default_privileges: Vec<DefaultPrivilege>,
1769    ) -> Result<(), CatalogError> {
1770        if default_privileges.is_empty() {
1771            return Ok(());
1772        }
1773
1774        let default_privileges = default_privileges
1775            .into_iter()
1776            .map(DurableType::into_key_value)
1777            .map(|(k, v)| (k, Some(v)))
1778            .collect();
1779        self.default_privileges
1780            .set_many(default_privileges, self.op_id)?;
1781        Ok(())
1782    }
1783
1784    /// Set persisted system privilege.
1785    ///
1786    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1787    pub fn set_system_privilege(
1788        &mut self,
1789        grantee: RoleId,
1790        grantor: RoleId,
1791        acl_mode: Option<AclMode>,
1792    ) -> Result<(), CatalogError> {
1793        self.system_privileges.set(
1794            SystemPrivilegesKey { grantee, grantor },
1795            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1796            self.op_id,
1797        )?;
1798        Ok(())
1799    }
1800
1801    /// Set persisted system privileges.
1802    pub fn set_system_privileges(
1803        &mut self,
1804        system_privileges: Vec<MzAclItem>,
1805    ) -> Result<(), CatalogError> {
1806        if system_privileges.is_empty() {
1807            return Ok(());
1808        }
1809
1810        let system_privileges = system_privileges
1811            .into_iter()
1812            .map(DurableType::into_key_value)
1813            .map(|(k, v)| (k, Some(v)))
1814            .collect();
1815        self.system_privileges
1816            .set_many(system_privileges, self.op_id)?;
1817        Ok(())
1818    }
1819
1820    /// Set persisted setting.
1821    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1822        self.settings.set(
1823            SettingKey { name },
1824            value.map(|value| SettingValue { value }),
1825            self.op_id,
1826        )?;
1827        Ok(())
1828    }
1829
1830    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1831        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1832    }
1833
1834    /// Insert persisted introspection source indexes.
1835    pub fn insert_introspection_source_indexes(
1836        &mut self,
1837        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1838        temporary_oids: &HashSet<u32>,
1839    ) -> Result<(), CatalogError> {
1840        if introspection_source_indexes.is_empty() {
1841            return Ok(());
1842        }
1843
1844        let amount = usize_to_u64(introspection_source_indexes.len());
1845        let oids = self.allocate_oids(amount, temporary_oids)?;
1846        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1847            .into_iter()
1848            .zip_eq(oids)
1849            .map(
1850                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1851                    cluster_id,
1852                    name,
1853                    item_id,
1854                    index_id,
1855                    oid,
1856                },
1857            )
1858            .collect();
1859
1860        for introspection_source_index in introspection_source_indexes {
1861            let (key, value) = introspection_source_index.into_key_value();
1862            self.introspection_sources.insert(key, value, self.op_id)?;
1863        }
1864
1865        Ok(())
1866    }
1867
1868    /// Set persisted system object mappings.
1869    pub fn set_system_object_mappings(
1870        &mut self,
1871        mappings: Vec<SystemObjectMapping>,
1872    ) -> Result<(), CatalogError> {
1873        if mappings.is_empty() {
1874            return Ok(());
1875        }
1876
1877        let mappings = mappings
1878            .into_iter()
1879            .map(DurableType::into_key_value)
1880            .map(|(k, v)| (k, Some(v)))
1881            .collect();
1882        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1883        Ok(())
1884    }
1885
1886    /// Set persisted replicas.
1887    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1888        if replicas.is_empty() {
1889            return Ok(());
1890        }
1891
1892        let replicas = replicas
1893            .into_iter()
1894            .map(DurableType::into_key_value)
1895            .map(|(k, v)| (k, Some(v)))
1896            .collect();
1897        self.cluster_replicas.set_many(replicas, self.op_id)?;
1898        Ok(())
1899    }
1900
1901    /// Set persisted configuration.
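    ///
    /// A minimal usage sketch (illustrative, not a doctest; the key name is hypothetical).
    /// `Some(v)` upserts the config, while `None` deletes it:
    /// ```ignore
    /// txn.set_config("my_config".into(), Some(42))?;
    /// assert_eq!(txn.get_config("my_config".into()), Some(42));
    /// txn.set_config("my_config".into(), None)?;
    /// assert_eq!(txn.get_config("my_config".into()), None);
    /// ```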
1902    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1903        match value {
1904            Some(value) => {
1905                let config = Config { key, value };
1906                let (key, value) = config.into_key_value();
1907                self.configs.set(key, Some(value), self.op_id)?;
1908            }
1909            None => {
1910                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1911            }
1912        }
1913        Ok(())
1914    }
1915
1916    /// Get the value of a persisted config.
1917    pub fn get_config(&self, key: String) -> Option<u64> {
1918        self.configs
1919            .get(&ConfigKey { key })
1920            .map(|entry| entry.value)
1921    }
1922
1923    /// Get the value of a persisted setting.
1924    pub fn get_setting(&self, name: String) -> Option<&str> {
1925        self.settings
1926            .get(&SettingKey { name })
1927            .map(|entry| &*entry.value)
1928    }
1929
1930    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1931        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1932            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1933    }
1934
1935    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1936        self.set_setting(
1937            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1938            Some(shard_id.to_string()),
1939        )
1940    }
1941
1942    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1943        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1944            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1945    }
1946
1947    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1948        self.set_setting(
1949            EXPRESSION_CACHE_SHARD_KEY.to_string(),
1950            Some(shard_id.to_string()),
1951        )
1952    }
1953
1954    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
1955    /// match the `with_0dt_deployment_max_wait` "system var" value.
1956    ///
1957    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1958    /// but use it in boot before LaunchDarkly is available.
1959    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1960        self.set_config(
1961            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1962            Some(
1963                value
1964                    .as_millis()
1965                    .try_into()
1966                    .expect("max wait fits into u64"),
1967            ),
1968        )
1969    }
1970
1971    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
1972    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
1973    /// value.
1974    ///
1975    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1976    /// but use it in boot before LaunchDarkly is available.
1977    pub fn set_0dt_deployment_ddl_check_interval(
1978        &mut self,
1979        value: Duration,
1980    ) -> Result<(), CatalogError> {
1981        self.set_config(
1982            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
1983            Some(
1984                value
1985                    .as_millis()
1986                    .try_into()
1987                    .expect("ddl check interval fits into u64"),
1988            ),
1989        )
1990    }
1991
1992    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
1993    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
1994    ///
1995    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1996    /// but use it in boot before LaunchDarkly is available.
1997    pub fn set_enable_0dt_deployment_panic_after_timeout(
1998        &mut self,
1999        value: bool,
2000    ) -> Result<(), CatalogError> {
2001        self.set_config(
2002            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2003            Some(u64::from(value)),
2004        )
2005    }
2006
2007    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2008    /// match the `with_0dt_deployment_max_wait` "system var" value.
2009    ///
2010    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2011    /// but use it in boot before LaunchDarkly is available.
2012    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2013        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2014    }
2015
2016    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2017    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2018    /// value.
2019    ///
2020    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2021    /// use it in boot before LaunchDarkly is available.
2022    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2023        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2024    }
2025
2026    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2027    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2028    /// var" value.
2029    ///
2030    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2031    /// use it in boot before LaunchDarkly is available.
2032    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2033        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2034    }
2035
2036    /// Updates the catalog `system_config_synced` "config" value to true.
2037    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2038        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2039    }
2040
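    /// Sets or clears the comment on `object_id`, optionally scoped to a `sub_component`
    /// (e.g. a column). Passing `None` for `comment` removes any existing comment.
    ///
    /// A minimal usage sketch (illustrative, not a doctest; `comment_object_id` is
    /// hypothetical):
    /// ```ignore
    /// txn.update_comment(comment_object_id, None, Some("my comment".to_string()))?;
    /// txn.update_comment(comment_object_id, None, None)?; // clear it again
    /// ```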
2041    pub fn update_comment(
2042        &mut self,
2043        object_id: CommentObjectId,
2044        sub_component: Option<usize>,
2045        comment: Option<String>,
2046    ) -> Result<(), CatalogError> {
2047        let key = CommentKey {
2048            object_id,
2049            sub_component,
2050        };
2051        let value = comment.map(|c| CommentValue { comment: c });
2052        self.comments.set(key, value, self.op_id)?;
2053
2054        Ok(())
2055    }
2056
2057    pub fn drop_comments(
2058        &mut self,
2059        object_ids: &BTreeSet<CommentObjectId>,
2060    ) -> Result<(), CatalogError> {
2061        if object_ids.is_empty() {
2062            return Ok(());
2063        }
2064
2065        self.comments
2066            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2067        Ok(())
2068    }
2069
2070    pub fn update_source_references(
2071        &mut self,
2072        source_id: CatalogItemId,
2073        references: Vec<SourceReference>,
2074        updated_at: u64,
2075    ) -> Result<(), CatalogError> {
2076        let key = SourceReferencesKey { source_id };
2077        let value = SourceReferencesValue {
2078            references,
2079            updated_at,
2080        };
2081        self.source_references.set(key, Some(value), self.op_id)?;
2082        Ok(())
2083    }
2084
2085    /// Upserts persisted system configuration `name` to `value`.
2086    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2087        let key = ServerConfigurationKey {
2088            name: name.to_string(),
2089        };
2090        let value = ServerConfigurationValue { value };
2091        self.system_configurations
2092            .set(key, Some(value), self.op_id)?;
2093        Ok(())
2094    }
2095
2096    /// Removes persisted system configuration `name`.
2097    pub fn remove_system_config(&mut self, name: &str) {
2098        let key = ServerConfigurationKey {
2099            name: name.to_string(),
2100        };
2101        self.system_configurations
2102            .set(key, None, self.op_id)
2103            .expect("cannot have uniqueness violation");
2104    }
2105
2106    /// Removes all persisted system configurations.
2107    pub fn clear_system_configs(&mut self) {
2108        self.system_configurations.delete(|_k, _v| true, self.op_id);
2109    }
2110
2111    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2112        match self.configs.insert(
2113            ConfigKey { key: key.clone() },
2114            ConfigValue { value },
2115            self.op_id,
2116        ) {
2117            Ok(_) => Ok(()),
2118            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2119        }
2120    }
2121
2122    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2123        self.clusters
2124            .items()
2125            .into_iter()
2126            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2127    }
2128
2129    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2130        self.cluster_replicas
2131            .items()
2132            .into_iter()
2133            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2134    }
2135
2136    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2137        self.roles
2138            .items()
2139            .into_iter()
2140            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2141    }
2142
2143    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2144        self.network_policies
2145            .items()
2146            .into_iter()
2147            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2148    }
2149
2150    pub fn get_system_object_mappings(
2151        &self,
2152    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2153        self.system_gid_mapping
2154            .items()
2155            .into_iter()
2156            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2157    }
2158
2159    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2160        self.schemas
2161            .items()
2162            .into_iter()
2163            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2164    }
2165
2166    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2167        self.system_configurations
2168            .items()
2169            .into_iter()
2170            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2171    }
2172
2173    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2174        let key = SchemaKey { id: *id };
2175        self.schemas
2176            .get(&key)
2177            .map(|v| DurableType::from_key_value(key, v.clone()))
2178    }
2179
2180    pub fn get_introspection_source_indexes(
2181        &self,
2182        cluster_id: ClusterId,
2183    ) -> BTreeMap<&str, (GlobalId, u32)> {
2184        self.introspection_sources
2185            .items()
2186            .into_iter()
2187            .filter(|(k, _v)| k.cluster_id == cluster_id)
2188            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2189            .collect()
2190    }
2191
2192    pub fn get_catalog_content_version(&self) -> Option<&str> {
2193        self.settings
2194            .get(&SettingKey {
2195                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2196            })
2197            .map(|value| &*value.value)
2198    }
2199
2200    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2201        self.settings
2202            .get(&SettingKey {
2203                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2204            })
2205            .map(|value| value.value.clone())
2206    }
2207
2208    /// Commit the current operation within the transaction. This does not cause anything to be
2209    /// written durably, but signals to the current transaction that we are moving on to the next
2210    /// operation.
2211    ///
2212    /// Returns the updates of the committed operation.
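    ///
    /// A minimal usage sketch (illustrative, not a doctest; `item_id`, `new_item`, and
    /// `apply_to_in_memory_state` are hypothetical):
    /// ```ignore
    /// txn.update_item(item_id, new_item)?; // part of the current op
    /// let updates = txn.get_and_commit_op_updates();
    /// apply_to_in_memory_state(updates); // caller-defined
    /// ```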
2213    #[must_use]
2214    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2215        let updates = self.get_op_updates();
2216        self.commit_op();
2217        updates
2218    }
2219
2220    fn get_op_updates(&self) -> Vec<StateUpdate> {
2221        fn get_collection_op_updates<'a, T>(
2222            table_txn: &'a TableTransaction<T::Key, T::Value>,
2223            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2224            op: Timestamp,
2225        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2226        where
2227            T::Key: Ord + Eq + Clone + Debug,
2228            T::Value: Ord + Clone + Debug,
2229            T: DurableType,
2230        {
2231            table_txn
2232                .pending
2233                .iter()
2234                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2235                .filter_map(move |(k, v)| {
2236                    if v.ts == op {
2237                        let key = k.clone();
2238                        let value = v.value.clone();
2239                        let diff = v.diff.clone().try_into().expect("invalid diff");
2240                        let update = DurableType::from_key_value(key, value);
2241                        let kind = kind_fn(update);
2242                        Some((kind, diff))
2243                    } else {
2244                        None
2245                    }
2246                })
2247        }
2248
2249        fn get_large_collection_op_updates<'a, T>(
2250            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2251            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2252            op: Timestamp,
2253        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2254        where
2255            T::Key: Ord + Eq + Clone + Debug,
2256            T: DurableType<Value = ()>,
2257        {
2258            collection.iter().filter_map(move |(k, diff, ts)| {
2259                if *ts == op {
2260                    let key = k.clone();
2261                    let diff = diff.clone().try_into().expect("invalid diff");
2262                    let update = DurableType::from_key_value(key, ());
2263                    let kind = kind_fn(update);
2264                    Some((kind, diff))
2265                } else {
2266                    None
2267                }
2268            })
2269        }
2270
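        // Exhaustively destructure the transaction (no `..`) so that adding a new collection
        // produces a compile error here until it is considered for update generation.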
2271        let Transaction {
2272            durable_catalog: _,
2273            databases,
2274            schemas,
2275            items,
2276            comments,
2277            roles,
2278            role_auth,
2279            clusters,
2280            network_policies,
2281            cluster_replicas,
2282            introspection_sources,
2283            system_gid_mapping,
2284            system_configurations,
2285            default_privileges,
2286            source_references,
2287            system_privileges,
2288            audit_log_updates,
2289            storage_collection_metadata,
2290            unfinalized_shards,
2291            // Not representable as a `StateUpdate`.
2292            id_allocator: _,
2293            configs: _,
2294            settings: _,
2295            txn_wal_shard: _,
2296            upper,
2297            op_id: _,
2298        } = &self;
2299
2300        let updates = std::iter::empty()
2301            .chain(get_collection_op_updates(
2302                roles,
2303                StateUpdateKind::Role,
2304                self.op_id,
2305            ))
2306            .chain(get_collection_op_updates(
2307                role_auth,
2308                StateUpdateKind::RoleAuth,
2309                self.op_id,
2310            ))
2311            .chain(get_collection_op_updates(
2312                databases,
2313                StateUpdateKind::Database,
2314                self.op_id,
2315            ))
2316            .chain(get_collection_op_updates(
2317                schemas,
2318                StateUpdateKind::Schema,
2319                self.op_id,
2320            ))
2321            .chain(get_collection_op_updates(
2322                default_privileges,
2323                StateUpdateKind::DefaultPrivilege,
2324                self.op_id,
2325            ))
2326            .chain(get_collection_op_updates(
2327                system_privileges,
2328                StateUpdateKind::SystemPrivilege,
2329                self.op_id,
2330            ))
2331            .chain(get_collection_op_updates(
2332                system_configurations,
2333                StateUpdateKind::SystemConfiguration,
2334                self.op_id,
2335            ))
2336            .chain(get_collection_op_updates(
2337                clusters,
2338                StateUpdateKind::Cluster,
2339                self.op_id,
2340            ))
2341            .chain(get_collection_op_updates(
2342                network_policies,
2343                StateUpdateKind::NetworkPolicy,
2344                self.op_id,
2345            ))
2346            .chain(get_collection_op_updates(
2347                introspection_sources,
2348                StateUpdateKind::IntrospectionSourceIndex,
2349                self.op_id,
2350            ))
2351            .chain(get_collection_op_updates(
2352                cluster_replicas,
2353                StateUpdateKind::ClusterReplica,
2354                self.op_id,
2355            ))
2356            .chain(get_collection_op_updates(
2357                system_gid_mapping,
2358                StateUpdateKind::SystemObjectMapping,
2359                self.op_id,
2360            ))
2361            .chain(get_collection_op_updates(
2362                items,
2363                StateUpdateKind::Item,
2364                self.op_id,
2365            ))
2366            .chain(get_collection_op_updates(
2367                comments,
2368                StateUpdateKind::Comment,
2369                self.op_id,
2370            ))
2371            .chain(get_collection_op_updates(
2372                source_references,
2373                StateUpdateKind::SourceReferences,
2374                self.op_id,
2375            ))
2376            .chain(get_collection_op_updates(
2377                storage_collection_metadata,
2378                StateUpdateKind::StorageCollectionMetadata,
2379                self.op_id,
2380            ))
2381            .chain(get_collection_op_updates(
2382                unfinalized_shards,
2383                StateUpdateKind::UnfinalizedShard,
2384                self.op_id,
2385            ))
2386            .chain(get_large_collection_op_updates(
2387                audit_log_updates,
2388                StateUpdateKind::AuditLog,
2389                self.op_id,
2390            ))
2391            .map(|(kind, diff)| StateUpdate {
2392                kind,
2393                ts: upper.clone(),
2394                diff,
2395            })
2396            .collect();
2397
2398        updates
2399    }
2400
2401    pub fn is_savepoint(&self) -> bool {
2402        self.durable_catalog.is_savepoint()
2403    }
2404
2405    fn commit_op(&mut self) {
2406        self.op_id += 1;
2407    }
2408
2409    pub fn op_id(&self) -> Timestamp {
2410        self.op_id
2411    }
2412
2413    pub fn upper(&self) -> mz_repr::Timestamp {
2414        self.upper
2415    }
2416
2417    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2418        let audit_log_updates = self
2419            .audit_log_updates
2420            .into_iter()
2421            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2422            .collect();
2423
2424        let txn_batch = TransactionBatch {
2425            databases: self.databases.pending(),
2426            schemas: self.schemas.pending(),
2427            items: self.items.pending(),
2428            comments: self.comments.pending(),
2429            roles: self.roles.pending(),
2430            role_auth: self.role_auth.pending(),
2431            clusters: self.clusters.pending(),
2432            cluster_replicas: self.cluster_replicas.pending(),
2433            network_policies: self.network_policies.pending(),
2434            introspection_sources: self.introspection_sources.pending(),
2435            id_allocator: self.id_allocator.pending(),
2436            configs: self.configs.pending(),
2437            source_references: self.source_references.pending(),
2438            settings: self.settings.pending(),
2439            system_gid_mapping: self.system_gid_mapping.pending(),
2440            system_configurations: self.system_configurations.pending(),
2441            default_privileges: self.default_privileges.pending(),
2442            system_privileges: self.system_privileges.pending(),
2443            storage_collection_metadata: self.storage_collection_metadata.pending(),
2444            unfinalized_shards: self.unfinalized_shards.pending(),
2445            txn_wal_shard: self.txn_wal_shard.pending(),
2446            audit_log_updates,
2447            upper: self.upper,
2448        };
2449        (txn_batch, self.durable_catalog)
2450    }
2451
2452    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2453    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2454    /// before proceeding. In general, this must be fatal to the calling process. We do not
2455    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2456    ///
2457    /// The transaction is committed at `commit_ts`.
2458    ///
2459    /// Returns what the upper was directly after the transaction committed.
2460    ///
2461    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2462    /// catalog is not writeable.
2463    #[mz_ore::instrument(level = "debug")]
2464    pub(crate) async fn commit_internal(
2465        self,
2466        commit_ts: mz_repr::Timestamp,
2467    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2468        let (mut txn_batch, durable_catalog) = self.into_parts();
2469        let TransactionBatch {
2470            databases,
2471            schemas,
2472            items,
2473            comments,
2474            roles,
2475            role_auth,
2476            clusters,
2477            cluster_replicas,
2478            network_policies,
2479            introspection_sources,
2480            id_allocator,
2481            configs,
2482            source_references,
2483            settings,
2484            system_gid_mapping,
2485            system_configurations,
2486            default_privileges,
2487            system_privileges,
2488            storage_collection_metadata,
2489            unfinalized_shards,
2490            txn_wal_shard,
2491            audit_log_updates,
2492            upper,
2493        } = &mut txn_batch;
2494        // Consolidate in memory because it will likely be faster than consolidating after the
2495        // transaction has been made durable.
2496        differential_dataflow::consolidation::consolidate_updates(databases);
2497        differential_dataflow::consolidation::consolidate_updates(schemas);
2498        differential_dataflow::consolidation::consolidate_updates(items);
2499        differential_dataflow::consolidation::consolidate_updates(comments);
2500        differential_dataflow::consolidation::consolidate_updates(roles);
2501        differential_dataflow::consolidation::consolidate_updates(role_auth);
2502        differential_dataflow::consolidation::consolidate_updates(clusters);
2503        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2504        differential_dataflow::consolidation::consolidate_updates(network_policies);
2505        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2506        differential_dataflow::consolidation::consolidate_updates(id_allocator);
2507        differential_dataflow::consolidation::consolidate_updates(configs);
2508        differential_dataflow::consolidation::consolidate_updates(settings);
2509        differential_dataflow::consolidation::consolidate_updates(source_references);
2510        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2511        differential_dataflow::consolidation::consolidate_updates(system_configurations);
2512        differential_dataflow::consolidation::consolidate_updates(default_privileges);
2513        differential_dataflow::consolidation::consolidate_updates(system_privileges);
2514        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2515        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2516        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2517        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2518
2519        assert!(
2520            commit_ts >= *upper,
2521            "expected commit ts, {}, to be greater than or equal to upper, {}",
2522            commit_ts,
2523            upper
2524        );
2525        let upper = durable_catalog
2526            .commit_transaction(txn_batch, commit_ts)
2527            .await?;
2528        Ok((durable_catalog, upper))
2529    }
2530
2531    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2532    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2533    /// before proceeding. In general, this must be fatal to the calling process. We do not
2534    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2535    ///
2536    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2537    /// catalog is not writeable.
2538    ///
2539    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
2540    /// updates from this transaction. Therefore, updates from this transaction will not be returned
2541    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
2542    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
2543    ///
2544    /// An alternative implementation would be for the caller to explicitly consume their updates
2545    /// after committing and only then apply the updates in-memory. While this removes assumptions
2546    /// about the caller in this method, in practice it results in duplicate work on every commit.
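    ///
    /// A minimal end-to-end sketch of the expected calling pattern (illustrative, not a
    /// doctest; `item_id`, `new_item`, `commit_ts`, and `apply_to_in_memory_state` are
    /// hypothetical, caller-supplied names):
    /// ```ignore
    /// txn.update_item(item_id, new_item)?;
    /// let updates = txn.get_and_commit_op_updates();
    /// apply_to_in_memory_state(updates); // must happen before `commit`, per the note above
    /// txn.commit(commit_ts).await?;
    /// ```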
2547    #[mz_ore::instrument(level = "debug")]
2548    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2549        let op_updates = self.get_op_updates();
2550        assert!(
2551            op_updates.is_empty(),
2552            "unconsumed transaction updates: {op_updates:?}"
2553        );
2554
2555        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2556        // Drain all the updates from the commit since it is assumed that they were already applied.
2557        let updates = durable_storage.sync_updates(upper).await?;
2558        // Writable and savepoint catalogs should have consumed all updates before committing a
2559        // transaction, otherwise the commit was performed with an out of date state.
2560        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
2561        // updates before committing.
2562        soft_assert_no_log!(
2563            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2564            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2565        );
2566        Ok(())
2567    }
2568}
2569
2570use crate::durable::async_trait;
2571
2572use super::objects::{RoleAuthKey, RoleAuthValue};
2573
2574#[async_trait]
2575impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2576    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2577        self.storage_collection_metadata
2578            .items()
2579            .into_iter()
2580            .map(
2581                |(
2582                    StorageCollectionMetadataKey { id },
2583                    StorageCollectionMetadataValue { shard },
2584                )| { (*id, shard.clone()) },
2585            )
2586            .collect()
2587    }
2588
2589    fn insert_collection_metadata(
2590        &mut self,
2591        metadata: BTreeMap<GlobalId, ShardId>,
2592    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2593        for (id, shard) in metadata {
2594            self.storage_collection_metadata
2595                .insert(
2596                    StorageCollectionMetadataKey { id },
2597                    StorageCollectionMetadataValue {
2598                        shard: shard.clone(),
2599                    },
2600                    self.op_id,
2601                )
2602                .map_err(|err| match err {
2603                    DurableCatalogError::DuplicateKey => {
2604                        StorageError::CollectionMetadataAlreadyExists(id)
2605                    }
2606                    DurableCatalogError::UniquenessViolation => {
2607                        StorageError::PersistShardAlreadyInUse(shard)
2608                    }
2609                    err => StorageError::Generic(anyhow::anyhow!(err)),
2610                })?;
2611        }
2612        Ok(())
2613    }
2614
2615    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2616        let ks: Vec<_> = ids
2617            .into_iter()
2618            .map(|id| StorageCollectionMetadataKey { id })
2619            .collect();
2620        self.storage_collection_metadata
2621            .delete_by_keys(ks, self.op_id)
2622            .into_iter()
2623            .map(
2624                |(
2625                    StorageCollectionMetadataKey { id },
2626                    StorageCollectionMetadataValue { shard },
2627                )| (id, shard),
2628            )
2629            .collect()
2630    }
2631
2632    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2633        self.unfinalized_shards
2634            .items()
2635            .into_iter()
2636            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2637            .collect()
2638    }
2639
2640    fn insert_unfinalized_shards(
2641        &mut self,
2642        s: BTreeSet<ShardId>,
2643    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2644        for shard in s {
2645            match self
2646                .unfinalized_shards
2647                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2648            {
2649                // Inserting duplicate keys has no effect.
2650                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2651                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2652            };
2653        }
2654        Ok(())
2655    }
2656
2657    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2658        let ks: Vec<_> = shards
2659            .into_iter()
2660            .map(|shard| UnfinalizedShardKey { shard })
2661            .collect();
2662        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2663    }
2664
2665    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2666        self.txn_wal_shard
2667            .values()
2668            .iter()
2669            .next()
2670            .map(|TxnWalShardValue { shard }| *shard)
2671    }
2672
2673    fn write_txn_wal_shard(
2674        &mut self,
2675        shard: ShardId,
2676    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2677        self.txn_wal_shard
2678            .insert((), TxnWalShardValue { shard }, self.op_id)
2679            .map_err(|err| match err {
2680                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2681                err => StorageError::Generic(anyhow::anyhow!(err)),
2682            })
2683    }
2684}
2685
2686/// Describes a set of changes to apply as the result of a catalog transaction.
2687#[derive(Debug, Clone, Default, PartialEq)]
2688pub struct TransactionBatch {
2689    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
2690    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
2691    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
2692    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
2693    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
2694    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
2695    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
2696    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
2697    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
2698    pub(crate) introspection_sources: Vec<(
2699        proto::ClusterIntrospectionSourceIndexKey,
2700        proto::ClusterIntrospectionSourceIndexValue,
2701        Diff,
2702    )>,
2703    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
2704    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
2705    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
2706    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
2707    pub(crate) system_configurations: Vec<(
2708        proto::ServerConfigurationKey,
2709        proto::ServerConfigurationValue,
2710        Diff,
2711    )>,
2712    pub(crate) default_privileges: Vec<(
2713        proto::DefaultPrivilegesKey,
2714        proto::DefaultPrivilegesValue,
2715        Diff,
2716    )>,
2717    pub(crate) source_references: Vec<(
2718        proto::SourceReferencesKey,
2719        proto::SourceReferencesValue,
2720        Diff,
2721    )>,
2722    pub(crate) system_privileges: Vec<(
2723        proto::SystemPrivilegesKey,
2724        proto::SystemPrivilegesValue,
2725        Diff,
2726    )>,
2727    pub(crate) storage_collection_metadata: Vec<(
2728        proto::StorageCollectionMetadataKey,
2729        proto::StorageCollectionMetadataValue,
2730        Diff,
2731    )>,
2732    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
2733    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
2734    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
2735    /// The upper of the catalog when the transaction started.
2736    pub(crate) upper: mz_repr::Timestamp,
2737}
2738
2739impl TransactionBatch {
2740    pub fn is_empty(&self) -> bool {
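        // Destructure every field (without `..`) so that a newly added collection must also
        // be added to the emptiness checks below before this compiles.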
2741        let TransactionBatch {
2742            databases,
2743            schemas,
2744            items,
2745            comments,
2746            roles,
2747            role_auth,
2748            clusters,
2749            cluster_replicas,
2750            network_policies,
2751            introspection_sources,
2752            id_allocator,
2753            configs,
2754            settings,
2755            source_references,
2756            system_gid_mapping,
2757            system_configurations,
2758            default_privileges,
2759            system_privileges,
2760            storage_collection_metadata,
2761            unfinalized_shards,
2762            txn_wal_shard,
2763            audit_log_updates,
2764            upper: _,
2765        } = self;
2766        databases.is_empty()
2767            && schemas.is_empty()
2768            && items.is_empty()
2769            && comments.is_empty()
2770            && roles.is_empty()
2771            && role_auth.is_empty()
2772            && clusters.is_empty()
2773            && cluster_replicas.is_empty()
2774            && network_policies.is_empty()
2775            && introspection_sources.is_empty()
2776            && id_allocator.is_empty()
2777            && configs.is_empty()
2778            && settings.is_empty()
2779            && source_references.is_empty()
2780            && system_gid_mapping.is_empty()
2781            && system_configurations.is_empty()
2782            && default_privileges.is_empty()
2783            && system_privileges.is_empty()
2784            && storage_collection_metadata.is_empty()
2785            && unfinalized_shards.is_empty()
2786            && txn_wal_shard.is_empty()
2787            && audit_log_updates.is_empty()
2788    }
2789}
2790
2791#[derive(Debug, Clone, PartialEq, Eq)]
2792struct TransactionUpdate<V> {
2793    value: V,
2794    ts: Timestamp,
2795    diff: Diff,
2796}
2797
2798/// Utility trait to check for plan validity.
2799trait UniqueName {
2800    /// Does the item have a unique name? If yes, we can check for name equality in validity
2801    /// checking.
2802    const HAS_UNIQUE_NAME: bool;
2803    /// The unique name. Only meaningful if [`Self::HAS_UNIQUE_NAME`] is `true`.
2804    fn unique_name(&self) -> &str;
2805}

mod unique_name {
    use crate::durable::objects::*;

    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                        ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}

/// TableTransaction emulates some features of a typical SQL transaction over a
/// table for a Collection.
///
/// It supports:
/// - uniqueness constraints
/// - transactional reads and writes (including read-your-writes before commit)
///
/// `K` is the primary key type. Multiple entries with the same key are disallowed.
/// `V` is an arbitrary value type.
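///
/// # Example
///
/// A minimal sketch, mirroring the tests at the bottom of this file (the
/// `Vec<u8>`/`String` key and value types are illustrative, not required):
///
/// ```ignore
/// let mut table = TableTransaction::new_with_uniqueness_fn(
///     BTreeMap::from([(b"k1".to_vec(), "v1".to_string())]),
///     |a: &String, b: &String| a == b,
/// )?;
/// table.insert(b"k2".to_vec(), "v2".to_string(), 0)?;
/// // Read-your-writes: the uncommitted insert is already visible.
/// assert_eq!(table.get(&b"k2".to_vec()), Some(&"v2".to_string()));
/// // Consuming the transaction yields consolidated (key, value, diff) triples.
/// let pending: Vec<(Vec<u8>, String, Diff)> = table.pending();
/// ```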
#[derive(Debug)]
struct TableTransaction<K, V> {
    initial: BTreeMap<K, V>,
    // The desired updates to keys after commit.
    // Invariant: each key's `Vec` is sorted by `ts`.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}

impl<K, V> TableTransaction<K, V>
where
    K: Ord + Eq + Clone + Debug,
    V: Ord + Clone + Debug + UniqueName,
{
    /// Create a new TableTransaction with initial data.
    ///
    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
    /// protobuf types which deserialize to `K` and `V` that a [`TableTransaction`] is generic
    /// over.
    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: None,
        })
    }

    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
    /// that determines whether there is a uniqueness violation between two values.
    fn new_with_uniqueness_fn<KP, VP>(
        initial: BTreeMap<KP, VP>,
        uniqueness_violation: fn(a: &V, b: &V) -> bool,
    ) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: Some(uniqueness_violation),
        })
    }

    /// Consumes and returns the pending changes and their diffs. `Diff` is
    /// guaranteed to be 1 or -1.
    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        soft_assert_no_log!(self.verify().is_ok());
        // Pending describes the desired final state for some keys. K, V pairs are
        // retracted if they already existed and were deleted or updated.
        self.pending
            .into_iter()
            .flat_map(|(k, v)| {
                let mut v: Vec<_> = v
                    .into_iter()
                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
                    .collect();
                differential_dataflow::consolidation::consolidate(&mut v);
                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
            })
            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
            .collect()
    }
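
    // For intuition (illustrative, not an API): within one transaction the
    // per-key update stream
    //     (v1, +1), (v1, -1), (v2, +1)
    // consolidates to just (v2, +1), so `pending` reports the net effect of
    // the transaction rather than every intermediate write.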

    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
    ///
    /// Runtime is O(n^2), where n is the number of items in `self`, if
    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            // Compare each value to each other value and ensure they are unique.
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    .map(|(i, vi)| (vi.unique_name(), (i, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }

    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
    ///
    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
    /// items in `keys`.
    fn verify_keys<'a>(
        &self,
        keys: impl IntoIterator<Item = &'a K>,
    ) -> Result<(), DurableCatalogError>
    where
        K: 'a,
    {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let entries: Vec<_> = keys
                .into_iter()
                .filter_map(|key| self.get(key).map(|value| (key, value)))
                .collect();
            // Compare each value in `entries` to each value in `self` and ensure they are unique.
            for (ki, vi) in self.items() {
                for (kj, vj) in &entries {
                    if ki != *kj && uniqueness_violation(vi, vj) {
                        return Err(DurableCatalogError::UniquenessViolation);
                    }
                }
            }
        }
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Iterates over the items viewable in the current transaction in arbitrary
    /// order and applies `f` to each key-value pair.
    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
        let mut seen = BTreeSet::new();
        for k in self.pending.keys() {
            seen.insert(k);
            let v = self.get(k);
            // Deleted items no longer exist, so they aren't visited, but recording
            // the key still suppresses visiting its initial entry below.
            if let Some(v) = v {
                f(k, v);
            }
        }
        for (k, v) in self.initial.iter() {
            // Add on initial items that don't have updates.
            if !seen.contains(k) {
                f(k, v);
            }
        }
    }

    /// Returns the current value of `k`.
    fn get(&self, k: &K) -> Option<&V> {
        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
        let mut updates = Vec::with_capacity(pending.len() + 1);
        if let Some(initial) = self.initial.get(k) {
            updates.push((initial, Diff::ONE));
        }
        updates.extend(
            pending
                .iter()
                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
        );

        differential_dataflow::consolidation::consolidate(&mut updates);
        assert!(updates.len() <= 1);
        updates.into_iter().next().map(|(v, _)| v)
    }

    /// Returns the items viewable in the current transaction. The items are
    /// cloned, so this is an expensive operation. Prefer using [`Self::items`] or
    /// [`Self::for_values`].
    // Used by tests.
    #[cfg(test)]
    fn items_cloned(&self) -> BTreeMap<K, V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.clone(), v.clone());
        });
        items
    }

    /// Returns the items viewable in the current transaction, as a map of
    /// references.
    fn items(&self) -> BTreeMap<&K, &V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k, v);
        });
        items
    }

    /// Returns the values viewable in the current transaction as references.
    fn values(&self) -> BTreeSet<&V> {
        let mut items = BTreeSet::new();
        self.for_values(|_, v| {
            items.insert(v);
        });
        items
    }

    /// Returns the number of items viewable in the current transaction.
    fn len(&self) -> usize {
        let mut count = 0;
        self.for_values(|_, _| {
            count += 1;
        });
        count
    }

    /// Iterates over the items viewable in the current transaction and provides a
    /// scratch map into which additional pending updates can be inserted; these are
    /// appended to the current pending updates afterwards. Does not verify uniqueness.
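    ///
    /// (Design note: updates are buffered in a scratch map because the iteration
    /// via [`Self::for_values`] immutably borrows `self.pending`, so `f` cannot
    /// append to `self.pending` directly.)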
    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
        &mut self,
        mut f: F,
    ) {
        let mut pending = BTreeMap::new();
        self.for_values(|k, v| f(&mut pending, k, v));
        for (k, updates) in pending {
            self.pending.entry(k).or_default().extend(updates);
        }
    }

    /// Inserts a new k,v pair.
    ///
    /// Returns an error if the uniqueness check failed or the key already exists.
    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
        let mut violation = None;
        self.for_values(|for_k, for_v| {
            if &k == for_k {
                violation = Some(DurableCatalogError::DuplicateKey);
            }
            if let Some(uniqueness_violation) = self.uniqueness_violation {
                if uniqueness_violation(for_v, &v) {
                    violation = Some(DurableCatalogError::UniquenessViolation);
                }
            }
        });
        if let Some(violation) = violation {
            return Err(violation);
        }
        self.pending.entry(k).or_default().push(TransactionUpdate {
            value: v,
            ts,
            diff: Diff::ONE,
        });
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
    /// value should be updated, otherwise `None`. Returns the number of changed
    /// entries.
    ///
    /// Returns an error if the uniqueness check failed.
    ///
    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
    /// better performance.
    fn update<F: Fn(&K, &V) -> Option<V>>(
        &mut self,
        f: F,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let mut changed = Diff::ZERO;
        let mut keys = BTreeSet::new();
        // Keep a copy of pending in case of uniqueness violation.
        let pending = self.pending.clone();
        self.for_values_mut(|p, k, v| {
            if let Some(next) = f(k, v) {
                changed += Diff::ONE;
                keys.insert(k.clone());
                let updates = p.entry(k.clone()).or_default();
                updates.push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                updates.push(TransactionUpdate {
                    value: next,
                    ts,
                    diff: Diff::ONE,
                });
            }
        });
        // Check for uniqueness violation.
        if let Err(err) = self.verify_keys(&keys) {
            self.pending = pending;
            Err(err)
        } else {
            Ok(changed)
        }
    }
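
    // Design note: `update` may touch arbitrarily many keys, so it snapshots
    // all of `self.pending` up front and restores it wholesale on a uniqueness
    // violation. By contrast, `set` and `set_many` below record per-key
    // truncation points, which avoids cloning the whole pending map.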

    /// Updates `k`, `v` pair if `k` already exists in `self`.
    ///
    /// Returns `true` if `k` was updated, `false` otherwise.
    /// Returns an error if the uniqueness check failed.
    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
        if let Some(cur_v) = self.get(&k) {
            if v != *cur_v {
                self.set(k, Some(v), ts)?;
            }
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
    ///
    /// Returns the number of changed entries.
    /// Returns an error if the uniqueness check failed.
    fn update_by_keys(
        &mut self,
        kvs: impl IntoIterator<Item = (K, V)>,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let kvs: Vec<_> = kvs
            .into_iter()
            .filter_map(|(k, v)| match self.get(&k) {
                // Record if updating this entry would be a no-op.
                Some(cur_v) => Some((*cur_v == v, k, v)),
                None => None,
            })
            .collect();
        let changed = kvs.len();
        let changed =
            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
        let kvs = kvs
            .into_iter()
            // Filter out no-ops to save some work.
            .filter(|(no_op, _, _)| !no_op)
            .map(|(_, k, v)| (k, Some(v)))
            .collect();
        self.set_many(kvs, ts)?;
        Ok(changed)
    }

    /// Set the value for a key. Returns the previous entry if the key existed,
    /// otherwise None.
    ///
    /// Returns an error if the uniqueness check failed.
    ///
    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
        let prev = self.get(&k).cloned();
        let entry = self.pending.entry(k.clone()).or_default();
        let restore_len = entry.len();

        match (v, prev.clone()) {
            (Some(v), Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            (Some(v), None) => {
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            (None, Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
            (None, None) => {}
        }

        // Check for uniqueness violation.
        if let Err(err) = self.verify_keys([&k]) {
            // Revert self.pending to the state it was in before calling this
            // function.
            let pending = self.pending.get_mut(&k).expect("inserted above");
            pending.truncate(restore_len);
            Err(err)
        } else {
            Ok(prev)
        }
    }

    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
    /// otherwise None.
    ///
    /// Returns an error if any uniqueness check failed.
    fn set_many(
        &mut self,
        kvs: BTreeMap<K, Option<V>>,
        ts: Timestamp,
    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
        if kvs.is_empty() {
            return Ok(BTreeMap::new());
        }

        let mut prevs = BTreeMap::new();
        let mut restores = BTreeMap::new();

        for (k, v) in kvs {
            let prev = self.get(&k).cloned();
            let entry = self.pending.entry(k.clone()).or_default();
            restores.insert(k.clone(), entry.len());

            match (v, prev.clone()) {
                (Some(v), Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                (Some(v), None) => {
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                (None, Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                }
                (None, None) => {}
            }

            prevs.insert(k, prev);
        }

        // Check for uniqueness violation.
        if let Err(err) = self.verify_keys(prevs.keys()) {
            for (k, restore_len) in restores {
                // Revert self.pending to the state it was in before calling this
                // function.
                let pending = self.pending.get_mut(&k).expect("inserted above");
                pending.truncate(restore_len);
            }
            Err(err)
        } else {
            Ok(prevs)
        }
    }

    /// Deletes items for which `f` returns true. Returns the keys and values of
    /// the deleted entries.
    ///
    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
    /// better performance.
    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
        let mut deleted = Vec::new();
        self.for_values_mut(|p, k, v| {
            if f(k, v) {
                deleted.push((k.clone(), v.clone()));
                p.entry(k.clone()).or_default().push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
        });
        soft_assert_no_log!(self.verify().is_ok());
        deleted
    }

    /// Deletes item with key `k`.
    ///
    /// Returns the value of the deleted entry, if it existed.
    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
        self.set(k, None, ts)
            .expect("deleting an entry cannot violate uniqueness")
    }

    /// Deletes items with key in `ks`.
    ///
    /// Returns the keys and values of the deleted entries.
    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
        let kvs = ks.into_iter().map(|k| (k, None)).collect();
        let prevs = self
            .set_many(kvs, ts)
            .expect("deleting entries cannot violate uniqueness");
        prevs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use mz_ore::{assert_none, assert_ok};

    use mz_ore::now::SYSTEM_TIME;
    use mz_persist_client::PersistClient;

    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
    use crate::memory;

    #[mz_ore::test]
    fn test_table_transaction_simple() {
        fn uniqueness_violation(a: &String, b: &String) -> bool {
            a == b
        }
        let mut table = TableTransaction::new_with_uniqueness_fn(
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
            uniqueness_violation,
        )
        .unwrap();

        // Ideally we would compare the specific errors here, but it's
        // hard/impossible to implement PartialEq for DurableCatalogError.
        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
        assert!(
            table
                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
                .is_err()
        );
        assert!(
            table
                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
                .is_err()
        );
    }
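
    // A small illustrative sketch (same assumptions as the test above):
    // uncommitted writes are visible to reads in the same transaction.
    #[mz_ore::test]
    fn test_table_transaction_read_your_writes() {
        fn uniqueness_violation(a: &String, b: &String) -> bool {
            a == b
        }
        let mut table = TableTransaction::new_with_uniqueness_fn(
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
            uniqueness_violation,
        )
        .unwrap();

        // An uncommitted insert is immediately visible via `get`.
        table
            .insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0)
            .unwrap();
        assert_eq!(
            table.get(&2i64.to_le_bytes().to_vec()),
            Some(&"b".to_string())
        );

        // An uncommitted delete hides the initial entry.
        let prev = table.delete_by_key(1i64.to_le_bytes().to_vec(), 1);
        assert_eq!(prev, Some("a".to_string()));
        assert_none!(table.get(&1i64.to_le_bytes().to_vec()));
        assert_eq!(table.len(), 1);
    }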

    #[mz_ore::test]
    fn test_table_transaction() {
        fn uniqueness_violation(a: &String, b: &String) -> bool {
            a == b
        }
        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();

        fn commit(
            table: &mut BTreeMap<Vec<u8>, String>,
            mut pending: Vec<(Vec<u8>, String, Diff)>,
        ) {
            // Sort by diff so that we process retractions first.
            pending.sort_by(|a, b| a.2.cmp(&b.2));
            for (k, v, diff) in pending {
                if diff == Diff::MINUS_ONE {
                    let prev = table.remove(&k);
                    assert_eq!(prev, Some(v));
                } else if diff == Diff::ONE {
                    let prev = table.insert(k, v);
                    assert_eq!(prev, None);
                } else {
                    panic!("unexpected diff: {diff}");
                }
            }
        }

        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.items_cloned(), table);
        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
        );
        assert_eq!(
            table_txn
                .update(|_k, _v| Some("v3".to_string()), 2)
                .unwrap(),
            Diff::ONE
        );

        // Uniqueness violation.
        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
            .unwrap_err();

        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
            .unwrap();
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
            ])
        );
        let err = table_txn
            .update(|_k, _v| Some("v1".to_string()), 5)
            .unwrap_err();
        assert!(
            matches!(err, DurableCatalogError::UniquenessViolation),
            "unexpected err: {err:?}"
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v1".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
                (
                    2i64.to_le_bytes().to_vec(),
                    "v2".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Deleting then creating an item that has a uniqueness violation should work.
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        // Uniqueness violation in value.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap_err();
        // Key already exists, expect error.
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap_err();
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        // Both the inserts work now because the key and uniqueness violation are gone.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v3".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
            .unwrap();

        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
        );

        // Verify we don't try to delete v3 or v4 during commit.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
            .unwrap_err();
        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table.clone().into_iter().collect::<Vec<_>>(),
            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
        );

        // Test `set`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
            .unwrap_err();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
            .unwrap();
        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v5".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
        );

        // Duplicate `set`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());

        // Test `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                0,
            )
            .unwrap_err();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                1,
            )
            .unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                    (3i64.to_le_bytes().to_vec(), None),
                ]),
                2,
            )
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
                (
                    3i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                ]),
                0,
            )
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_key`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
            .unwrap_err();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
                .unwrap()
        );
        assert!(
            !table_txn
                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
                .unwrap()
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_key`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
                .unwrap()
        );
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_keys`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap_err();
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                1,
            )
            .unwrap();
        assert_eq!(n, Diff::ONE);
        let n = table_txn
            .update_by_keys(
                [
                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                2,
            )
            .unwrap();
        assert_eq!(n, Diff::ZERO);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v8".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_keys`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap();
        assert_eq!(n, Diff::from(2));
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `delete_by_key`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
        assert_eq!(prev, Some("v9".to_string()));
        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
        assert_none!(prev);
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
        assert_none!(prev);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                1i64.to_le_bytes().to_vec(),
                "v9".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
        );

        // Test `delete_by_keys`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            0,
        );
        assert_eq!(
            prevs,
            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
        );
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            1,
        );
        assert_eq!(prevs, vec![]);
        let prevs = table_txn.delete_by_keys(
            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            2,
        );
        assert_eq!(prevs, vec![]);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                42i64.to_le_bytes().to_vec(),
                "v7".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(table, BTreeMap::new());
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_savepoint() {
        let persist_client = PersistClient::new_for_tests().await;
        let state_builder =
            TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();

        // Initialize catalog.
        let _ = state_builder
            .clone()
            .unwrap_build()
            .await
            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;
        let mut savepoint_state = state_builder
            .unwrap_build()
            .await
            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;

        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
        assert!(!initial_snapshot.is_empty());

        let db_name = "db";
        let db_owner = RoleId::User(42);
        let db_privileges = Vec::new();
        let mut txn = savepoint_state.transaction().await.unwrap();
        let (db_id, db_oid) = txn
            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
            .unwrap();
        let commit_ts = txn.upper();
        txn.commit_internal(commit_ts).await.unwrap();
        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
        let update = updates.into_element();

        assert_eq!(update.diff, StateDiff::Addition);

        let db = match update.kind {
            memory::objects::StateUpdateKind::Database(db) => db,
            update => panic!("unexpected update: {update:?}"),
        };

        assert_eq!(db_id, db.id);
        assert_eq!(db_oid, db.oid);
        assert_eq!(db_name, db.name);
        assert_eq!(db_owner, db.owner_id);
        assert_eq!(db_privileges, db.privileges);
    }

    #[mz_ore::test]
    fn test_allocate_introspection_source_index_id() {
        let cluster_variant: u8 = 0b0000_0001;
        let cluster_id_inner: u64 =
            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
        let timely_messages_received_log_variant: u8 = 0b0000_1000;

        let cluster_id = ClusterId::System(cluster_id_inner);
        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);

        let introspection_source_index_id: u64 =
            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
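
        // The expected packing (inferred from the masks below): the top 8 bits
        // carry the cluster variant, the middle 48 bits the cluster ID, and the
        // low 8 bits the log variant:
        //
        //     [63..56: cluster variant][55..8: cluster id][7..0: log variant]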

        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
        {
            let mut cluster_variant_mask = 0xFF << 56;
            cluster_variant_mask &= introspection_source_index_id;
            cluster_variant_mask >>= 56;
            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
        }

        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
        {
            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
            cluster_id_inner_mask &= introspection_source_index_id;
            cluster_id_inner_mask >>= 8;
            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
        }

        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
        {
            let mut log_variant_mask = 0xFF;
            log_variant_mask &= introspection_source_index_id;
            assert_eq!(
                log_variant_mask,
                u64::from(timely_messages_received_log_variant)
            );
        }

        let (catalog_item_id, global_id) =
            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);

        assert_eq!(
            catalog_item_id,
            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
        );
        assert_eq!(
            global_id,
            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
        );
    }
}