Skip to main content

mz_catalog/durable/
transaction.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35    RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57    ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58    ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59    SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67    SCHEMA_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
68    SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
69    USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
70};
71use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
72
/// Integer type used in this file for a transaction's operation ID (`Transaction::op_id`) and
/// for the per-update tag stored alongside pending audit log updates.
type Timestamp = u64;
74
75/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
76/// An operation also logically groups multiple catalog updates together.
77#[derive(Derivative)]
78#[derivative(Debug)]
79pub struct Transaction<'a> {
80    #[derivative(Debug = "ignore")]
81    #[derivative(PartialEq = "ignore")]
82    durable_catalog: &'a mut dyn DurableCatalogState,
83    databases: TableTransaction<DatabaseKey, DatabaseValue>,
84    schemas: TableTransaction<SchemaKey, SchemaValue>,
85    items: TableTransaction<ItemKey, ItemValue>,
86    comments: TableTransaction<CommentKey, CommentValue>,
87    roles: TableTransaction<RoleKey, RoleValue>,
88    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
89    clusters: TableTransaction<ClusterKey, ClusterValue>,
90    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
91    introspection_sources:
92        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
93    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
94    configs: TableTransaction<ConfigKey, ConfigValue>,
95    settings: TableTransaction<SettingKey, SettingValue>,
96    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
97    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
98    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
99    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
100    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
101    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
102    storage_collection_metadata:
103        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
104    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
105    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
106    // Don't make this a table transaction so that it's not read into the
107    // in-memory cache.
108    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
109    /// The upper of `durable_catalog` at the start of the transaction.
110    upper: mz_repr::Timestamp,
111    /// The ID of the current operation of this transaction.
112    op_id: Timestamp,
113}
114
115impl<'a> Transaction<'a> {
116    pub fn new(
117        durable_catalog: &'a mut dyn DurableCatalogState,
118        Snapshot {
119            databases,
120            schemas,
121            roles,
122            role_auth,
123            items,
124            comments,
125            clusters,
126            network_policies,
127            cluster_replicas,
128            introspection_sources,
129            id_allocator,
130            configs,
131            settings,
132            source_references,
133            system_object_mappings,
134            system_configurations,
135            default_privileges,
136            system_privileges,
137            storage_collection_metadata,
138            unfinalized_shards,
139            txn_wal_shard,
140        }: Snapshot,
141        upper: mz_repr::Timestamp,
142    ) -> Result<Transaction<'a>, CatalogError> {
143        Ok(Transaction {
144            durable_catalog,
145            databases: TableTransaction::new_with_uniqueness_fn(
146                databases,
147                |a: &DatabaseValue, b| a.name == b.name,
148            )?,
149            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
150                a.database_id == b.database_id && a.name == b.name
151            })?,
152            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
153                a.schema_id == b.schema_id && a.name == b.name && {
154                    // `item_type` is slow, only compute if needed.
155                    let a_type = a.item_type();
156                    let b_type = b.item_type();
157                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
158                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
159                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
160                }
161            })?,
162            comments: TableTransaction::new(comments)?,
163            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
164                a.name == b.name
165            })?,
166            role_auth: TableTransaction::new(role_auth)?,
167            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
168                a.name == b.name
169            })?,
170            network_policies: TableTransaction::new_with_uniqueness_fn(
171                network_policies,
172                |a: &NetworkPolicyValue, b| a.name == b.name,
173            )?,
174            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
175                cluster_replicas,
176                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
177            )?,
178            introspection_sources: TableTransaction::new(introspection_sources)?,
179            id_allocator: TableTransaction::new(id_allocator)?,
180            configs: TableTransaction::new(configs)?,
181            settings: TableTransaction::new(settings)?,
182            source_references: TableTransaction::new(source_references)?,
183            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
184            system_configurations: TableTransaction::new(system_configurations)?,
185            default_privileges: TableTransaction::new(default_privileges)?,
186            system_privileges: TableTransaction::new(system_privileges)?,
187            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
188            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
189            // Uniqueness violations for this value occur at the key rather than
190            // the value (the key is the unit struct `()` so this is a singleton
191            // value).
192            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
193            audit_log_updates: Vec::new(),
194            upper,
195            op_id: 0,
196        })
197    }
198
199    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
200        let key = ItemKey { id: *id };
201        self.items
202            .get(&key)
203            .map(|v| DurableType::from_key_value(key, v.clone()))
204    }
205
206    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
207        self.items
208            .items()
209            .into_iter()
210            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
211            .sorted_by_key(|Item { id, .. }| *id)
212    }
213
214    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
215        self.insert_audit_log_events([event]);
216    }
217
218    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
219        let events = events
220            .into_iter()
221            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
222        self.audit_log_updates.extend(events);
223    }
224
225    pub fn insert_user_database(
226        &mut self,
227        database_name: &str,
228        owner_id: RoleId,
229        privileges: Vec<MzAclItem>,
230        temporary_oids: &HashSet<u32>,
231    ) -> Result<(DatabaseId, u32), CatalogError> {
232        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
233        let id = DatabaseId::User(id);
234        let oid = self.allocate_oid(temporary_oids)?;
235        self.insert_database(id, database_name, owner_id, privileges, oid)?;
236        Ok((id, oid))
237    }
238
239    pub(crate) fn insert_database(
240        &mut self,
241        id: DatabaseId,
242        database_name: &str,
243        owner_id: RoleId,
244        privileges: Vec<MzAclItem>,
245        oid: u32,
246    ) -> Result<u32, CatalogError> {
247        match self.databases.insert(
248            DatabaseKey { id },
249            DatabaseValue {
250                name: database_name.to_string(),
251                owner_id,
252                privileges,
253                oid,
254            },
255            self.op_id,
256        ) {
257            Ok(_) => Ok(oid),
258            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
259        }
260    }
261
262    pub fn insert_user_schema(
263        &mut self,
264        database_id: DatabaseId,
265        schema_name: &str,
266        owner_id: RoleId,
267        privileges: Vec<MzAclItem>,
268        temporary_oids: &HashSet<u32>,
269    ) -> Result<(SchemaId, u32), CatalogError> {
270        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
271        let id = SchemaId::User(id);
272        let oid = self.allocate_oid(temporary_oids)?;
273        self.insert_schema(
274            id,
275            Some(database_id),
276            schema_name.to_string(),
277            owner_id,
278            privileges,
279            oid,
280        )?;
281        Ok((id, oid))
282    }
283
284    pub fn insert_system_schema(
285        &mut self,
286        schema_id: u64,
287        schema_name: &str,
288        owner_id: RoleId,
289        privileges: Vec<MzAclItem>,
290        oid: u32,
291    ) -> Result<(), CatalogError> {
292        let id = SchemaId::System(schema_id);
293        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
294    }
295
296    pub(crate) fn insert_schema(
297        &mut self,
298        schema_id: SchemaId,
299        database_id: Option<DatabaseId>,
300        schema_name: String,
301        owner_id: RoleId,
302        privileges: Vec<MzAclItem>,
303        oid: u32,
304    ) -> Result<(), CatalogError> {
305        match self.schemas.insert(
306            SchemaKey { id: schema_id },
307            SchemaValue {
308                database_id,
309                name: schema_name.clone(),
310                owner_id,
311                privileges,
312                oid,
313            },
314            self.op_id,
315        ) {
316            Ok(_) => Ok(()),
317            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
318        }
319    }
320
321    pub fn insert_builtin_role(
322        &mut self,
323        id: RoleId,
324        name: String,
325        attributes: RoleAttributesRaw,
326        membership: RoleMembership,
327        vars: RoleVars,
328        oid: u32,
329    ) -> Result<RoleId, CatalogError> {
330        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
331        self.insert_role(id, name, attributes, membership, vars, oid)?;
332        Ok(id)
333    }
334
335    pub fn insert_user_role(
336        &mut self,
337        name: String,
338        attributes: RoleAttributesRaw,
339        membership: RoleMembership,
340        vars: RoleVars,
341        temporary_oids: &HashSet<u32>,
342    ) -> Result<(RoleId, u32), CatalogError> {
343        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
344        let id = RoleId::User(id);
345        let oid = self.allocate_oid(temporary_oids)?;
346        self.insert_role(id, name, attributes, membership, vars, oid)?;
347        Ok((id, oid))
348    }
349
350    fn insert_role(
351        &mut self,
352        id: RoleId,
353        name: String,
354        attributes: RoleAttributesRaw,
355        membership: RoleMembership,
356        vars: RoleVars,
357        oid: u32,
358    ) -> Result<(), CatalogError> {
359        if let Some(ref password) = attributes.password {
360            let hash = mz_auth::hash::scram256_hash(
361                password,
362                &attributes
363                    .scram_iterations
364                    .or_else(|| {
365                        soft_panic_or_log!(
366                            "Hash iterations must be set if a password is provided."
367                        );
368                        None
369                    })
370                    // This should never happen, but rather than panicking we'll
371                    // set a known secure value as a fallback.
372                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
373            )
374            .expect("password hash should be valid");
375            match self.role_auth.insert(
376                RoleAuthKey { role_id: id },
377                RoleAuthValue {
378                    password_hash: Some(hash),
379                    updated_at: SYSTEM_TIME(),
380                },
381                self.op_id,
382            ) {
383                Ok(_) => {}
384                Err(_) => {
385                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
386                }
387            }
388        }
389
390        match self.roles.insert(
391            RoleKey { id },
392            RoleValue {
393                name: name.clone(),
394                attributes: attributes.into(),
395                membership,
396                vars,
397                oid,
398            },
399            self.op_id,
400        ) {
401            Ok(_) => Ok(()),
402            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
403        }
404    }
405
406    /// Panics if any introspection source id is not a system id
407    pub fn insert_user_cluster(
408        &mut self,
409        cluster_id: ClusterId,
410        cluster_name: &str,
411        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
412        owner_id: RoleId,
413        privileges: Vec<MzAclItem>,
414        config: ClusterConfig,
415        temporary_oids: &HashSet<u32>,
416    ) -> Result<(), CatalogError> {
417        self.insert_cluster(
418            cluster_id,
419            cluster_name,
420            introspection_source_indexes,
421            owner_id,
422            privileges,
423            config,
424            temporary_oids,
425        )
426    }
427
428    /// Panics if any introspection source id is not a system id
429    pub fn insert_system_cluster(
430        &mut self,
431        cluster_name: &str,
432        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
433        privileges: Vec<MzAclItem>,
434        owner_id: RoleId,
435        config: ClusterConfig,
436        temporary_oids: &HashSet<u32>,
437    ) -> Result<ClusterId, CatalogError> {
438        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
439        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
440        self.insert_cluster(
441            cluster_id,
442            cluster_name,
443            introspection_source_indexes,
444            owner_id,
445            privileges,
446            config,
447            temporary_oids,
448        )?;
449        Ok(cluster_id)
450    }
451
452    fn insert_cluster(
453        &mut self,
454        cluster_id: ClusterId,
455        cluster_name: &str,
456        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
457        owner_id: RoleId,
458        privileges: Vec<MzAclItem>,
459        config: ClusterConfig,
460        temporary_oids: &HashSet<u32>,
461    ) -> Result<(), CatalogError> {
462        if let Err(_) = self.clusters.insert(
463            ClusterKey { id: cluster_id },
464            ClusterValue {
465                name: cluster_name.to_string(),
466                owner_id,
467                privileges,
468                config,
469            },
470            self.op_id,
471        ) {
472            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
473        };
474
475        let amount = usize_to_u64(introspection_source_indexes.len());
476        let oids = self.allocate_oids(amount, temporary_oids)?;
477        let introspection_source_indexes: Vec<_> = introspection_source_indexes
478            .into_iter()
479            .zip_eq(oids)
480            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
481            .collect();
482        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
483            let introspection_source_index = IntrospectionSourceIndex {
484                cluster_id,
485                name: builtin.name.to_string(),
486                item_id,
487                index_id,
488                oid,
489            };
490            let (key, value) = introspection_source_index.into_key_value();
491            self.introspection_sources
492                .insert(key, value, self.op_id)
493                .expect("no uniqueness violation");
494        }
495
496        Ok(())
497    }
498
499    pub fn rename_cluster(
500        &mut self,
501        cluster_id: ClusterId,
502        cluster_name: &str,
503        cluster_to_name: &str,
504    ) -> Result<(), CatalogError> {
505        let key = ClusterKey { id: cluster_id };
506
507        match self.clusters.update(
508            |k, v| {
509                if *k == key {
510                    let mut value = v.clone();
511                    value.name = cluster_to_name.to_string();
512                    Some(value)
513                } else {
514                    None
515                }
516            },
517            self.op_id,
518        )? {
519            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
520            Diff::ONE => Ok(()),
521            n => panic!(
522                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
523            ),
524        }
525    }
526
527    pub fn rename_cluster_replica(
528        &mut self,
529        replica_id: ReplicaId,
530        replica_name: &QualifiedReplica,
531        replica_to_name: &str,
532    ) -> Result<(), CatalogError> {
533        let key = ClusterReplicaKey { id: replica_id };
534
535        match self.cluster_replicas.update(
536            |k, v| {
537                if *k == key {
538                    let mut value = v.clone();
539                    value.name = replica_to_name.to_string();
540                    Some(value)
541                } else {
542                    None
543                }
544            },
545            self.op_id,
546        )? {
547            Diff::ZERO => {
548                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
549            }
550            Diff::ONE => Ok(()),
551            n => panic!(
552                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
553            ),
554        }
555    }
556
557    pub fn insert_cluster_replica(
558        &mut self,
559        cluster_id: ClusterId,
560        replica_name: &str,
561        config: ReplicaConfig,
562        owner_id: RoleId,
563    ) -> Result<ReplicaId, CatalogError> {
564        let replica_id = match cluster_id {
565            ClusterId::System(_) => self.allocate_system_replica_id()?,
566            ClusterId::User(_) => self.allocate_user_replica_id()?,
567        };
568        self.insert_cluster_replica_with_id(
569            cluster_id,
570            replica_id,
571            replica_name,
572            config,
573            owner_id,
574        )?;
575        Ok(replica_id)
576    }
577
578    pub(crate) fn insert_cluster_replica_with_id(
579        &mut self,
580        cluster_id: ClusterId,
581        replica_id: ReplicaId,
582        replica_name: &str,
583        config: ReplicaConfig,
584        owner_id: RoleId,
585    ) -> Result<(), CatalogError> {
586        if let Err(_) = self.cluster_replicas.insert(
587            ClusterReplicaKey { id: replica_id },
588            ClusterReplicaValue {
589                cluster_id,
590                name: replica_name.into(),
591                config,
592                owner_id,
593            },
594            self.op_id,
595        ) {
596            let cluster = self
597                .clusters
598                .get(&ClusterKey { id: cluster_id })
599                .expect("cluster exists");
600            return Err(SqlCatalogError::DuplicateReplica(
601                replica_name.to_string(),
602                cluster.name.to_string(),
603            )
604            .into());
605        };
606        Ok(())
607    }
608
609    pub fn insert_user_network_policy(
610        &mut self,
611        name: String,
612        rules: Vec<NetworkPolicyRule>,
613        privileges: Vec<MzAclItem>,
614        owner_id: RoleId,
615        temporary_oids: &HashSet<u32>,
616    ) -> Result<NetworkPolicyId, CatalogError> {
617        let oid = self.allocate_oid(temporary_oids)?;
618        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
619        let id = NetworkPolicyId::User(id);
620        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621    }
622
623    pub fn insert_network_policy(
624        &mut self,
625        id: NetworkPolicyId,
626        name: String,
627        rules: Vec<NetworkPolicyRule>,
628        privileges: Vec<MzAclItem>,
629        owner_id: RoleId,
630        oid: u32,
631    ) -> Result<NetworkPolicyId, CatalogError> {
632        match self.network_policies.insert(
633            NetworkPolicyKey { id },
634            NetworkPolicyValue {
635                name: name.clone(),
636                rules,
637                privileges,
638                owner_id,
639                oid,
640            },
641            self.op_id,
642        ) {
643            Ok(_) => Ok(id),
644            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
645        }
646    }
647
648    /// Updates persisted information about persisted introspection source
649    /// indexes.
650    ///
651    /// Panics if provided id is not a system id.
652    pub fn update_introspection_source_index_gids(
653        &mut self,
654        mappings: impl Iterator<
655            Item = (
656                ClusterId,
657                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
658            ),
659        >,
660    ) -> Result<(), CatalogError> {
661        for (cluster_id, updates) in mappings {
662            for (name, item_id, index_id, oid) in updates {
663                let introspection_source_index = IntrospectionSourceIndex {
664                    cluster_id,
665                    name,
666                    item_id,
667                    index_id,
668                    oid,
669                };
670                let (key, value) = introspection_source_index.into_key_value();
671
672                let prev = self
673                    .introspection_sources
674                    .set(key, Some(value), self.op_id)?;
675                if prev.is_none() {
676                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
677                        "{index_id}"
678                    ))
679                    .into());
680                }
681            }
682        }
683        Ok(())
684    }
685
686    pub fn insert_user_item(
687        &mut self,
688        id: CatalogItemId,
689        global_id: GlobalId,
690        schema_id: SchemaId,
691        item_name: &str,
692        create_sql: String,
693        owner_id: RoleId,
694        privileges: Vec<MzAclItem>,
695        temporary_oids: &HashSet<u32>,
696        versions: BTreeMap<RelationVersion, GlobalId>,
697    ) -> Result<u32, CatalogError> {
698        let oid = self.allocate_oid(temporary_oids)?;
699        self.insert_item(
700            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
701        )?;
702        Ok(oid)
703    }
704
705    pub fn insert_item(
706        &mut self,
707        id: CatalogItemId,
708        oid: u32,
709        global_id: GlobalId,
710        schema_id: SchemaId,
711        item_name: &str,
712        create_sql: String,
713        owner_id: RoleId,
714        privileges: Vec<MzAclItem>,
715        extra_versions: BTreeMap<RelationVersion, GlobalId>,
716    ) -> Result<(), CatalogError> {
717        match self.items.insert(
718            ItemKey { id },
719            ItemValue {
720                schema_id,
721                name: item_name.to_string(),
722                create_sql,
723                owner_id,
724                privileges,
725                oid,
726                global_id,
727                extra_versions,
728            },
729            self.op_id,
730        ) {
731            Ok(_) => Ok(()),
732            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
733        }
734    }
735
736    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
738    }
739
740    pub fn get_and_increment_id_by(
741        &mut self,
742        key: String,
743        amount: u64,
744    ) -> Result<Vec<u64>, CatalogError> {
745        assert!(
746            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
747            "system item IDs cannot be allocated outside of bootstrap"
748        );
749
750        let current_id = self
751            .id_allocator
752            .items()
753            .get(&IdAllocKey { name: key.clone() })
754            .unwrap_or_else(|| panic!("{key} id allocator missing"))
755            .next_id;
756        let next_id = current_id
757            .checked_add(amount)
758            .ok_or(SqlCatalogError::IdExhaustion)?;
759        let prev = self.id_allocator.set(
760            IdAllocKey { name: key },
761            Some(IdAllocValue { next_id }),
762            self.op_id,
763        )?;
764        assert_eq!(
765            prev,
766            Some(IdAllocValue {
767                next_id: current_id
768            })
769        );
770        Ok((current_id..next_id).collect())
771    }
772
773    pub fn allocate_system_item_ids(
774        &mut self,
775        amount: u64,
776    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
777        assert!(
778            !self.durable_catalog.is_bootstrap_complete(),
779            "we can only allocate system item IDs during bootstrap"
780        );
781        Ok(self
782            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
783            .into_iter()
784            // TODO(alter_table): Use separate ID allocators.
785            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
786            .collect())
787    }
788
    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system item that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
    ///
    /// This is an associated function: it takes no `self` and does not touch any transaction
    /// state. The same inputs always produce the same pair of IDs.
    ///
    /// # Panics
    ///
    /// Panics if the inner value of `cluster_id` has any of its upper 16 bits set, i.e. if it does
    /// not fit in the 48 bits reserved for it in the format above.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        // Tag for the most significant byte of the ID.
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // Selects the upper 16 bits, which must be clear so that the inner value fits in 48 bits.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // Tag for the least significant byte of the ID. These numbers are baked into the generated
        // IDs, so changing an existing mapping changes the ID derived for that log variant.
        // NOTE(review): the value 27 is not assigned to any variant; presumably it belonged to a
        // variant that has since been removed and must not be reused. Confirm before assigning it.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
            LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
        };

        // Pack the three fields: cluster variant in bits 56..64, cluster inner value in bits
        // 8..56, and log variant in bits 0..8.
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
879
880    pub fn allocate_user_item_ids(
881        &mut self,
882        amount: u64,
883    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
884        Ok(self
885            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
886            .into_iter()
887            // TODO(alter_table): Use separate ID allocators.
888            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
889            .collect())
890    }
891
892    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
893        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
894        Ok(ReplicaId::User(id))
895    }
896
897    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
898        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
899        Ok(ReplicaId::System(id))
900    }
901
902    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
903        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
904    }
905
    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    ///
    /// The search starts at the `next_id` stored under `OID_ALLOC_KEY` and walks forward one OID
    /// at a time, wrapping from [`u32::MAX`] back to [`FIRST_USER_OID`]. An OID is skipped when it
    /// is assigned to a database, schema, role, item, or introspection source in this
    /// transaction, or when it is contained in `temporary_oids`. After a successful allocation the
    /// stored `next_id` is set to the OID following the last one examined.
    ///
    /// # Errors
    ///
    /// Returns [`SqlCatalogError::OidExhaustion`] if `amount` exceeds [`u32::MAX`], or if the
    /// search wraps all the way around to its starting OID before `amount` free OIDs are found.
    ///
    /// # Panics
    ///
    /// Panics if the `OID_ALLOC_KEY` allocator is missing, if its stored `next_id` does not fit in
    /// a `u32` or is below [`FIRST_USER_OID`], or if the allocator's previous value differs from
    /// the value read at the start of the search.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            /// Wraps `oid`, rejecting values below [`FIRST_USER_OID`].
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            /// Advances the OID by `rhs`. On `u32` overflow the wrapped remainder is offset by
            /// [`FIRST_USER_OID`], so stepping by 1 past [`u32::MAX`] lands on [`FIRST_USER_OID`].
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        // More OIDs than a `u32` can represent can never be satisfied.
        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do every time we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated every time an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference and the other
        // approach requires making sure that allocator always stays in-sync which can be
        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
        //
        // The capacity is an upper bound: it counts `temporary_oids` as well, although only the
        // OIDs from the catalog tables are inserted into the set below.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        // An OID is unavailable if it is in the set collected above or in `temporary_oids`.
        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        // The allocator stores `next_id` in a wider integer type; narrow it to `u32` here.
        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            // Advance before the wrap-around check so that arriving back at `start_oid` means
            // every user OID has been examined exactly once.
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        // Record where the next search should begin: the OID after the last one examined.
        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        // Sanity check: the value being replaced must be the one the search started from.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1014
1015    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
1016    /// object.
1017    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1018        self.allocate_oids(1, temporary_oids)
1019            .map(|oids| oids.into_element())
1020    }
1021
    /// Exports the current state of this transaction as a [`Snapshot`].
    ///
    /// This merges each `TableTransaction`'s initial data with its pending
    /// changes to produce the current view, then converts back to proto types.
    /// Used to persist transaction state between incremental DDL dry runs so
    /// the next dry run's fresh `Transaction` starts in sync with the
    /// accumulated `CatalogState`.
    ///
    /// Takes `&self`, so the transaction itself is left untouched.
    pub fn current_snapshot(&self) -> Snapshot {
        Snapshot {
            databases: self.databases.current_items_proto(),
            schemas: self.schemas.current_items_proto(),
            roles: self.roles.current_items_proto(),
            role_auth: self.role_auth.current_items_proto(),
            items: self.items.current_items_proto(),
            comments: self.comments.current_items_proto(),
            clusters: self.clusters.current_items_proto(),
            network_policies: self.network_policies.current_items_proto(),
            cluster_replicas: self.cluster_replicas.current_items_proto(),
            introspection_sources: self.introspection_sources.current_items_proto(),
            id_allocator: self.id_allocator.current_items_proto(),
            configs: self.configs.current_items_proto(),
            settings: self.settings.current_items_proto(),
            // The snapshot field and the transaction table are named differently here.
            system_object_mappings: self.system_gid_mapping.current_items_proto(),
            system_configurations: self.system_configurations.current_items_proto(),
            default_privileges: self.default_privileges.current_items_proto(),
            source_references: self.source_references.current_items_proto(),
            system_privileges: self.system_privileges.current_items_proto(),
            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
        }
    }
1054
1055    pub(crate) fn insert_id_allocator(
1056        &mut self,
1057        name: String,
1058        next_id: u64,
1059    ) -> Result<(), CatalogError> {
1060        match self.id_allocator.insert(
1061            IdAllocKey { name: name.clone() },
1062            IdAllocValue { next_id },
1063            self.op_id,
1064        ) {
1065            Ok(_) => Ok(()),
1066            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1067        }
1068    }
1069
1070    /// Removes the database `id` from the transaction.
1071    ///
1072    /// Returns an error if `id` is not found.
1073    ///
1074    /// Runtime is linear with respect to the total number of databases in the catalog.
1075    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
1076    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1077        let prev = self
1078            .databases
1079            .set(DatabaseKey { id: *id }, None, self.op_id)?;
1080        if prev.is_some() {
1081            Ok(())
1082        } else {
1083            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1084        }
1085    }
1086
1087    /// Removes all databases in `databases` from the transaction.
1088    ///
1089    /// Returns an error if any id in `databases` is not found.
1090    ///
1091    /// NOTE: On error, there still may be some databases removed from the transaction. It
1092    /// is up to the caller to either abort the transaction or commit.
1093    pub fn remove_databases(
1094        &mut self,
1095        databases: &BTreeSet<DatabaseId>,
1096    ) -> Result<(), CatalogError> {
1097        if databases.is_empty() {
1098            return Ok(());
1099        }
1100
1101        let to_remove = databases
1102            .iter()
1103            .map(|id| (DatabaseKey { id: *id }, None))
1104            .collect();
1105        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1106        prev.retain(|_k, val| val.is_none());
1107
1108        if !prev.is_empty() {
1109            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1110            return Err(SqlCatalogError::UnknownDatabase(err).into());
1111        }
1112
1113        Ok(())
1114    }
1115
1116    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
1117    ///
1118    /// Returns an error if `(database_id, schema_id)` is not found.
1119    ///
1120    /// Runtime is linear with respect to the total number of schemas in the catalog.
1121    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
1122    pub fn remove_schema(
1123        &mut self,
1124        database_id: &Option<DatabaseId>,
1125        schema_id: &SchemaId,
1126    ) -> Result<(), CatalogError> {
1127        let prev = self
1128            .schemas
1129            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1130        if prev.is_some() {
1131            Ok(())
1132        } else {
1133            let database_name = match database_id {
1134                Some(id) => format!("{id}."),
1135                None => "".to_string(),
1136            };
1137            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1138        }
1139    }
1140
1141    /// Removes all schemas in `schemas` from the transaction.
1142    ///
1143    /// Returns an error if any id in `schemas` is not found.
1144    ///
1145    /// NOTE: On error, there still may be some schemas removed from the transaction. It
1146    /// is up to the caller to either abort the transaction or commit.
1147    pub fn remove_schemas(
1148        &mut self,
1149        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1150    ) -> Result<(), CatalogError> {
1151        if schemas.is_empty() {
1152            return Ok(());
1153        }
1154
1155        let to_remove = schemas
1156            .iter()
1157            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1158            .collect();
1159        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1160        prev.retain(|_k, v| v.is_none());
1161
1162        if !prev.is_empty() {
1163            let err = prev
1164                .keys()
1165                .map(|k| {
1166                    let db_spec = schemas.get(&k.id).expect("should_exist");
1167                    let db_name = match db_spec {
1168                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1169                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1170                    };
1171                    format!("{}.{}", db_name, k.id)
1172                })
1173                .join(", ");
1174
1175            return Err(SqlCatalogError::UnknownSchema(err).into());
1176        }
1177
1178        Ok(())
1179    }
1180
1181    pub fn remove_source_references(
1182        &mut self,
1183        source_id: CatalogItemId,
1184    ) -> Result<(), CatalogError> {
1185        let deleted = self
1186            .source_references
1187            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1188            .is_some();
1189        if deleted {
1190            Ok(())
1191        } else {
1192            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1193        }
1194    }
1195
1196    /// Removes all user roles in `roles` from the transaction.
1197    ///
1198    /// Returns an error if any id in `roles` is not found.
1199    ///
1200    /// NOTE: On error, there still may be some roles removed from the transaction. It
1201    /// is up to the caller to either abort the transaction or commit.
1202    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1203        assert!(
1204            roles.iter().all(|id| id.is_user()),
1205            "cannot delete non-user roles"
1206        );
1207        self.remove_roles(roles)
1208    }
1209
1210    /// Removes all roles in `roles` from the transaction.
1211    ///
1212    /// Returns an error if any id in `roles` is not found.
1213    ///
1214    /// NOTE: On error, there still may be some roles removed from the transaction. It
1215    /// is up to the caller to either abort the transaction or commit.
1216    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1217        if roles.is_empty() {
1218            return Ok(());
1219        }
1220
1221        let to_remove_keys = roles
1222            .iter()
1223            .map(|role_id| RoleKey { id: *role_id })
1224            .collect::<Vec<_>>();
1225
1226        let to_remove_roles = to_remove_keys
1227            .iter()
1228            .map(|role_key| (role_key.clone(), None))
1229            .collect();
1230
1231        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1232
1233        let to_remove_role_auth = to_remove_keys
1234            .iter()
1235            .map(|role_key| {
1236                (
1237                    RoleAuthKey {
1238                        role_id: role_key.id,
1239                    },
1240                    None,
1241                )
1242            })
1243            .collect();
1244
1245        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1246
1247        prev.retain(|_k, v| v.is_none());
1248        if !prev.is_empty() {
1249            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1250            return Err(SqlCatalogError::UnknownRole(err).into());
1251        }
1252
1253        role_auth_prev.retain(|_k, v| v.is_none());
1254        // The reason we don't to the same check as above is that the role auth table
1255        // is not required to have all roles in the role table.
1256
1257        Ok(())
1258    }
1259
1260    /// Removes all cluster in `clusters` from the transaction.
1261    ///
1262    /// Returns an error if any id in `clusters` is not found.
1263    ///
1264    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1265    /// the caller to either abort the transaction or commit.
1266    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1267        if clusters.is_empty() {
1268            return Ok(());
1269        }
1270
1271        let to_remove = clusters
1272            .iter()
1273            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1274            .collect();
1275        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1276
1277        prev.retain(|_k, v| v.is_none());
1278        if !prev.is_empty() {
1279            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1280            return Err(SqlCatalogError::UnknownCluster(err).into());
1281        }
1282
1283        // Cascade delete introspection sources and cluster replicas.
1284        //
1285        // TODO(benesch): this doesn't seem right. Cascade deletions should
1286        // be entirely the domain of the higher catalog layer, not the
1287        // storage layer.
1288        self.cluster_replicas
1289            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1290        self.introspection_sources
1291            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1292
1293        Ok(())
1294    }
1295
1296    /// Removes the cluster replica `id` from the transaction.
1297    ///
1298    /// Returns an error if `id` is not found.
1299    ///
1300    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1301    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1302    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1303        let deleted = self
1304            .cluster_replicas
1305            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1306            .is_some();
1307        if deleted {
1308            Ok(())
1309        } else {
1310            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1311        }
1312    }
1313
1314    /// Removes all cluster replicas in `replicas` from the transaction.
1315    ///
1316    /// Returns an error if any id in `replicas` is not found.
1317    ///
1318    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1319    /// is up to the caller to either abort the transaction or commit.
1320    pub fn remove_cluster_replicas(
1321        &mut self,
1322        replicas: &BTreeSet<ReplicaId>,
1323    ) -> Result<(), CatalogError> {
1324        if replicas.is_empty() {
1325            return Ok(());
1326        }
1327
1328        let to_remove = replicas
1329            .iter()
1330            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1331            .collect();
1332        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1333
1334        prev.retain(|_k, v| v.is_none());
1335        if !prev.is_empty() {
1336            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1337            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1338        }
1339
1340        Ok(())
1341    }
1342
1343    /// Removes item `id` from the transaction.
1344    ///
1345    /// Returns an error if `id` is not found.
1346    ///
1347    /// Runtime is linear with respect to the total number of items in the catalog.
1348    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1349    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1350        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1351        if prev.is_some() {
1352            Ok(())
1353        } else {
1354            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1355        }
1356    }
1357
1358    /// Removes all items in `ids` from the transaction.
1359    ///
1360    /// Returns an error if any id in `ids` is not found.
1361    ///
1362    /// NOTE: On error, there still may be some items removed from the transaction. It is
1363    /// up to the caller to either abort the transaction or commit.
1364    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1365        if ids.is_empty() {
1366            return Ok(());
1367        }
1368
1369        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1370        let n = self.items.delete_by_keys(ks, self.op_id).len();
1371        if n == ids.len() {
1372            Ok(())
1373        } else {
1374            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1375            let mut unknown = ids.difference(&item_ids);
1376            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1377        }
1378    }
1379
1380    /// Removes all system object mappings in `descriptions` from the transaction.
1381    ///
1382    /// Returns an error if any description in `descriptions` is not found.
1383    ///
1384    /// NOTE: On error, there still may be some items removed from the transaction. It is
1385    /// up to the caller to either abort the transaction or commit.
1386    pub fn remove_system_object_mappings(
1387        &mut self,
1388        descriptions: BTreeSet<SystemObjectDescription>,
1389    ) -> Result<(), CatalogError> {
1390        if descriptions.is_empty() {
1391            return Ok(());
1392        }
1393
1394        let ks: Vec<_> = descriptions
1395            .clone()
1396            .into_iter()
1397            .map(|desc| GidMappingKey {
1398                schema_name: desc.schema_name,
1399                object_type: desc.object_type,
1400                object_name: desc.object_name,
1401            })
1402            .collect();
1403        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1404
1405        if n == descriptions.len() {
1406            Ok(())
1407        } else {
1408            let item_descriptions = self
1409                .system_gid_mapping
1410                .items()
1411                .keys()
1412                .map(|k| SystemObjectDescription {
1413                    schema_name: k.schema_name.clone(),
1414                    object_type: k.object_type.clone(),
1415                    object_name: k.object_name.clone(),
1416                })
1417                .collect();
1418            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1419                format!(
1420                    "{} {}.{}",
1421                    desc.object_type, desc.schema_name, desc.object_name
1422                )
1423            });
1424            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1425        }
1426    }
1427
1428    /// Removes all introspection source indexes in `indexes` from the transaction.
1429    ///
1430    /// Returns an error if any index in `indexes` is not found.
1431    ///
1432    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1433    /// up to the caller to either abort the transaction or commit.
1434    pub fn remove_introspection_source_indexes(
1435        &mut self,
1436        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1437    ) -> Result<(), CatalogError> {
1438        if introspection_source_indexes.is_empty() {
1439            return Ok(());
1440        }
1441
1442        let ks: Vec<_> = introspection_source_indexes
1443            .clone()
1444            .into_iter()
1445            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1446            .collect();
1447        let n = self
1448            .introspection_sources
1449            .delete_by_keys(ks, self.op_id)
1450            .len();
1451        if n == introspection_source_indexes.len() {
1452            Ok(())
1453        } else {
1454            let txn_indexes = self
1455                .introspection_sources
1456                .items()
1457                .keys()
1458                .map(|k| (k.cluster_id, k.name.clone()))
1459                .collect();
1460            let mut unknown = introspection_source_indexes
1461                .difference(&txn_indexes)
1462                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1463            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1464        }
1465    }
1466
1467    /// Updates item `id` in the transaction to `item_name` and `item`.
1468    ///
1469    /// Returns an error if `id` is not found.
1470    ///
1471    /// Runtime is linear with respect to the total number of items in the catalog.
1472    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1473    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1474        let updated =
1475            self.items
1476                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1477        if updated {
1478            Ok(())
1479        } else {
1480            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1481        }
1482    }
1483
1484    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1485    /// corresponding value in `items`.
1486    ///
1487    /// Returns an error if any id in `items` is not found.
1488    ///
1489    /// NOTE: On error, there still may be some items updated in the transaction. It is
1490    /// up to the caller to either abort the transaction or commit.
1491    pub fn update_items(
1492        &mut self,
1493        items: BTreeMap<CatalogItemId, Item>,
1494    ) -> Result<(), CatalogError> {
1495        if items.is_empty() {
1496            return Ok(());
1497        }
1498
1499        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1500        let kvs: Vec<_> = items
1501            .clone()
1502            .into_iter()
1503            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1504            .collect();
1505        let n = self.items.update_by_keys(kvs, self.op_id)?;
1506        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1507        if n == update_ids.len() {
1508            Ok(())
1509        } else {
1510            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1511            let mut unknown = update_ids.difference(&item_ids);
1512            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1513        }
1514    }
1515
1516    /// Updates role `id` in the transaction to `role`.
1517    ///
1518    /// Returns an error if `id` is not found.
1519    ///
1520    /// Runtime is linear with respect to the total number of items in the catalog.
1521    /// DO NOT call this function in a loop, implement and use some `Self::update_roles` instead.
1522    /// You should model it after [`Self::update_items`].
1523    pub fn update_role(
1524        &mut self,
1525        id: RoleId,
1526        role: Role,
1527        password: PasswordAction,
1528    ) -> Result<(), CatalogError> {
1529        let key = RoleKey { id };
1530        if self.roles.get(&key).is_some() {
1531            let auth_key = RoleAuthKey { role_id: id };
1532
1533            match password {
1534                PasswordAction::Set(new_password) => {
1535                    let hash = mz_auth::hash::scram256_hash(
1536                        &new_password.password,
1537                        &new_password.scram_iterations,
1538                    )
1539                    .expect("password hash should be valid");
1540                    let value = RoleAuthValue {
1541                        password_hash: Some(hash),
1542                        updated_at: SYSTEM_TIME(),
1543                    };
1544
1545                    if self.role_auth.get(&auth_key).is_some() {
1546                        self.role_auth
1547                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1548                    } else {
1549                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1550                    }
1551                }
1552                PasswordAction::Clear => {
1553                    let value = RoleAuthValue {
1554                        password_hash: None,
1555                        updated_at: SYSTEM_TIME(),
1556                    };
1557                    if self.role_auth.get(&auth_key).is_some() {
1558                        self.role_auth
1559                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1560                    }
1561                }
1562                PasswordAction::NoChange => {}
1563            }
1564
1565            self.roles
1566                .update_by_key(key, role.into_key_value().1, self.op_id)?;
1567
1568            Ok(())
1569        } else {
1570            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1571        }
1572    }
1573
1574    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1575    /// corresponding value in `roles`.
1576    ///
1577    /// This function does *not* write role_authentication information to the catalog.
1578    /// It is purely for updating the role itself.
1579    ///
1580    /// Returns an error if any id in `roles` is not found.
1581    ///
1582    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1583    /// up to the caller to either abort the transaction or commit.
1584    pub fn update_roles_without_auth(
1585        &mut self,
1586        roles: BTreeMap<RoleId, Role>,
1587    ) -> Result<(), CatalogError> {
1588        if roles.is_empty() {
1589            return Ok(());
1590        }
1591
1592        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1593        let kvs: Vec<_> = roles
1594            .into_iter()
1595            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1596            .collect();
1597        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1598        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1599
1600        if n == update_role_ids.len() {
1601            Ok(())
1602        } else {
1603            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1604            let mut unknown = update_role_ids.difference(&role_ids);
1605            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1606        }
1607    }
1608
1609    /// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
1610    /// of `mappings` should be (old-global-id, new-system-object-mapping).
1611    ///
1612    /// Panics if provided id is not a system id.
1613    pub fn update_system_object_mappings(
1614        &mut self,
1615        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1616    ) -> Result<(), CatalogError> {
1617        if mappings.is_empty() {
1618            return Ok(());
1619        }
1620
1621        let n = self.system_gid_mapping.update(
1622            |_k, v| {
1623                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1624                    let (_, new_value) = mapping.clone().into_key_value();
1625                    Some(new_value)
1626                } else {
1627                    None
1628                }
1629            },
1630            self.op_id,
1631        )?;
1632
1633        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1634            != mappings.len()
1635        {
1636            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1637            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1638        }
1639
1640        Ok(())
1641    }
1642
1643    /// Updates cluster `id` in the transaction to `cluster`.
1644    ///
1645    /// Returns an error if `id` is not found.
1646    ///
1647    /// Runtime is linear with respect to the total number of clusters in the catalog.
1648    /// DO NOT call this function in a loop.
1649    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1650        let updated = self.clusters.update_by_key(
1651            ClusterKey { id },
1652            cluster.into_key_value().1,
1653            self.op_id,
1654        )?;
1655        if updated {
1656            Ok(())
1657        } else {
1658            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1659        }
1660    }
1661
1662    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1663    ///
1664    /// Returns an error if `replica_id` is not found.
1665    ///
1666    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1667    /// DO NOT call this function in a loop.
1668    pub fn update_cluster_replica(
1669        &mut self,
1670        replica_id: ReplicaId,
1671        replica: ClusterReplica,
1672    ) -> Result<(), CatalogError> {
1673        let updated = self.cluster_replicas.update_by_key(
1674            ClusterReplicaKey { id: replica_id },
1675            replica.into_key_value().1,
1676            self.op_id,
1677        )?;
1678        if updated {
1679            Ok(())
1680        } else {
1681            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1682        }
1683    }
1684
1685    /// Updates database `id` in the transaction to `database`.
1686    ///
1687    /// Returns an error if `id` is not found.
1688    ///
1689    /// Runtime is linear with respect to the total number of databases in the catalog.
1690    /// DO NOT call this function in a loop.
1691    pub fn update_database(
1692        &mut self,
1693        id: DatabaseId,
1694        database: Database,
1695    ) -> Result<(), CatalogError> {
1696        let updated = self.databases.update_by_key(
1697            DatabaseKey { id },
1698            database.into_key_value().1,
1699            self.op_id,
1700        )?;
1701        if updated {
1702            Ok(())
1703        } else {
1704            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1705        }
1706    }
1707
1708    /// Updates schema `schema_id` in the transaction to `schema`.
1709    ///
1710    /// Returns an error if `schema_id` is not found.
1711    ///
1712    /// Runtime is linear with respect to the total number of schemas in the catalog.
1713    /// DO NOT call this function in a loop.
1714    pub fn update_schema(
1715        &mut self,
1716        schema_id: SchemaId,
1717        schema: Schema,
1718    ) -> Result<(), CatalogError> {
1719        let updated = self.schemas.update_by_key(
1720            SchemaKey { id: schema_id },
1721            schema.into_key_value().1,
1722            self.op_id,
1723        )?;
1724        if updated {
1725            Ok(())
1726        } else {
1727            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1728        }
1729    }
1730
1731    /// Updates `network_policy_id` in the transaction to `network policy`.
1732    ///
1733    /// Returns an error if `id` is not found.
1734    ///
1735    /// Runtime is linear with respect to the total number of databases in the catalog.
1736    /// DO NOT call this function in a loop.
1737    pub fn update_network_policy(
1738        &mut self,
1739        id: NetworkPolicyId,
1740        network_policy: NetworkPolicy,
1741    ) -> Result<(), CatalogError> {
1742        let updated = self.network_policies.update_by_key(
1743            NetworkPolicyKey { id },
1744            network_policy.into_key_value().1,
1745            self.op_id,
1746        )?;
1747        if updated {
1748            Ok(())
1749        } else {
1750            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1751        }
1752    }
1753    /// Removes all network policies in `network policies` from the transaction.
1754    ///
1755    /// Returns an error if any id in `network policy` is not found.
1756    ///
1757    /// NOTE: On error, there still may be some roles removed from the transaction. It
1758    /// is up to the caller to either abort the transaction or commit.
1759    pub fn remove_network_policies(
1760        &mut self,
1761        network_policies: &BTreeSet<NetworkPolicyId>,
1762    ) -> Result<(), CatalogError> {
1763        if network_policies.is_empty() {
1764            return Ok(());
1765        }
1766
1767        let to_remove = network_policies
1768            .iter()
1769            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1770            .collect();
1771        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1772        assert!(
1773            prev.iter().all(|(k, _)| k.id.is_user()),
1774            "cannot delete non-user network policy"
1775        );
1776
1777        prev.retain(|_k, v| v.is_none());
1778        if !prev.is_empty() {
1779            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1780            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1781        }
1782
1783        Ok(())
1784    }
1785    /// Set persisted default privilege.
1786    ///
1787    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
1788    pub fn set_default_privilege(
1789        &mut self,
1790        role_id: RoleId,
1791        database_id: Option<DatabaseId>,
1792        schema_id: Option<SchemaId>,
1793        object_type: ObjectType,
1794        grantee: RoleId,
1795        privileges: Option<AclMode>,
1796    ) -> Result<(), CatalogError> {
1797        self.default_privileges.set(
1798            DefaultPrivilegesKey {
1799                role_id,
1800                database_id,
1801                schema_id,
1802                object_type,
1803                grantee,
1804            },
1805            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1806            self.op_id,
1807        )?;
1808        Ok(())
1809    }
1810
1811    /// Set persisted default privileges.
1812    pub fn set_default_privileges(
1813        &mut self,
1814        default_privileges: Vec<DefaultPrivilege>,
1815    ) -> Result<(), CatalogError> {
1816        if default_privileges.is_empty() {
1817            return Ok(());
1818        }
1819
1820        let default_privileges = default_privileges
1821            .into_iter()
1822            .map(DurableType::into_key_value)
1823            .map(|(k, v)| (k, Some(v)))
1824            .collect();
1825        self.default_privileges
1826            .set_many(default_privileges, self.op_id)?;
1827        Ok(())
1828    }
1829
1830    /// Set persisted system privilege.
1831    ///
1832    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1833    pub fn set_system_privilege(
1834        &mut self,
1835        grantee: RoleId,
1836        grantor: RoleId,
1837        acl_mode: Option<AclMode>,
1838    ) -> Result<(), CatalogError> {
1839        self.system_privileges.set(
1840            SystemPrivilegesKey { grantee, grantor },
1841            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1842            self.op_id,
1843        )?;
1844        Ok(())
1845    }
1846
1847    /// Set persisted system privileges.
1848    pub fn set_system_privileges(
1849        &mut self,
1850        system_privileges: Vec<MzAclItem>,
1851    ) -> Result<(), CatalogError> {
1852        if system_privileges.is_empty() {
1853            return Ok(());
1854        }
1855
1856        let system_privileges = system_privileges
1857            .into_iter()
1858            .map(DurableType::into_key_value)
1859            .map(|(k, v)| (k, Some(v)))
1860            .collect();
1861        self.system_privileges
1862            .set_many(system_privileges, self.op_id)?;
1863        Ok(())
1864    }
1865
1866    /// Set persisted setting.
1867    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1868        self.settings.set(
1869            SettingKey { name },
1870            value.map(|value| SettingValue { value }),
1871            self.op_id,
1872        )?;
1873        Ok(())
1874    }
1875
1876    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1877        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1878    }
1879
1880    /// Insert persisted introspection source index.
1881    pub fn insert_introspection_source_indexes(
1882        &mut self,
1883        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1884        temporary_oids: &HashSet<u32>,
1885    ) -> Result<(), CatalogError> {
1886        if introspection_source_indexes.is_empty() {
1887            return Ok(());
1888        }
1889
1890        let amount = usize_to_u64(introspection_source_indexes.len());
1891        let oids = self.allocate_oids(amount, temporary_oids)?;
1892        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1893            .into_iter()
1894            .zip_eq(oids)
1895            .map(
1896                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1897                    cluster_id,
1898                    name,
1899                    item_id,
1900                    index_id,
1901                    oid,
1902                },
1903            )
1904            .collect();
1905
1906        for introspection_source_index in introspection_source_indexes {
1907            let (key, value) = introspection_source_index.into_key_value();
1908            self.introspection_sources.insert(key, value, self.op_id)?;
1909        }
1910
1911        Ok(())
1912    }
1913
1914    /// Set persisted system object mappings.
1915    pub fn set_system_object_mappings(
1916        &mut self,
1917        mappings: Vec<SystemObjectMapping>,
1918    ) -> Result<(), CatalogError> {
1919        if mappings.is_empty() {
1920            return Ok(());
1921        }
1922
1923        let mappings = mappings
1924            .into_iter()
1925            .map(DurableType::into_key_value)
1926            .map(|(k, v)| (k, Some(v)))
1927            .collect();
1928        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1929        Ok(())
1930    }
1931
1932    /// Set persisted replica.
1933    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1934        if replicas.is_empty() {
1935            return Ok(());
1936        }
1937
1938        let replicas = replicas
1939            .into_iter()
1940            .map(DurableType::into_key_value)
1941            .map(|(k, v)| (k, Some(v)))
1942            .collect();
1943        self.cluster_replicas.set_many(replicas, self.op_id)?;
1944        Ok(())
1945    }
1946
1947    /// Set persisted configuration.
1948    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1949        match value {
1950            Some(value) => {
1951                let config = Config { key, value };
1952                let (key, value) = config.into_key_value();
1953                self.configs.set(key, Some(value), self.op_id)?;
1954            }
1955            None => {
1956                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1957            }
1958        }
1959        Ok(())
1960    }
1961
1962    /// Get the value of a persisted config.
1963    pub fn get_config(&self, key: String) -> Option<u64> {
1964        self.configs
1965            .get(&ConfigKey { key })
1966            .map(|entry| entry.value)
1967    }
1968
1969    /// Get the value of a persisted setting.
1970    pub fn get_setting(&self, name: String) -> Option<&str> {
1971        self.settings
1972            .get(&SettingKey { name })
1973            .map(|entry| &*entry.value)
1974    }
1975
1976    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1977        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1978            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1979    }
1980
1981    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1982        self.set_setting(
1983            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1984            Some(shard_id.to_string()),
1985        )
1986    }
1987
1988    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1989        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1990            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1991    }
1992
1993    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1994        self.set_setting(
1995            EXPRESSION_CACHE_SHARD_KEY.to_string(),
1996            Some(shard_id.to_string()),
1997        )
1998    }
1999
2000    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
2001    /// match the `with_0dt_deployment_max_wait` "system var" value.
2002    ///
2003    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2004    /// but use it in boot before Launch Darkly is available.
2005    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
2006        self.set_config(
2007            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
2008            Some(
2009                value
2010                    .as_millis()
2011                    .try_into()
2012                    .expect("max wait fits into u64"),
2013            ),
2014        )
2015    }
2016
2017    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
2018    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2019    /// value.
2020    ///
2021    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2022    /// but use it in boot before Launch Darkly is available.
2023    pub fn set_0dt_deployment_ddl_check_interval(
2024        &mut self,
2025        value: Duration,
2026    ) -> Result<(), CatalogError> {
2027        self.set_config(
2028            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2029            Some(
2030                value
2031                    .as_millis()
2032                    .try_into()
2033                    .expect("ddl check interval fits into u64"),
2034            ),
2035        )
2036    }
2037
2038    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
2039    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
2040    ///
2041    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2042    /// but use it in boot before Launch Darkly is available.
2043    pub fn set_enable_0dt_deployment_panic_after_timeout(
2044        &mut self,
2045        value: bool,
2046    ) -> Result<(), CatalogError> {
2047        self.set_config(
2048            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2049            Some(u64::from(value)),
2050        )
2051    }
2052
2053    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2054    /// match the `with_0dt_deployment_max_wait` "system var" value.
2055    ///
2056    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2057    /// but use it in boot before LaunchDarkly is available.
2058    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2059        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2060    }
2061
2062    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2063    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2064    /// value.
2065    ///
2066    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2067    /// use it in boot before LaunchDarkly is available.
2068    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2069        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2070    }
2071
2072    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2073    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2074    /// var" value.
2075    ///
2076    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2077    /// use it in boot before LaunchDarkly is available.
2078    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2079        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2080    }
2081
2082    /// Updates the catalog `system_config_synced` "config" value to true.
2083    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2084        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2085    }
2086
2087    pub fn update_comment(
2088        &mut self,
2089        object_id: CommentObjectId,
2090        sub_component: Option<usize>,
2091        comment: Option<String>,
2092    ) -> Result<(), CatalogError> {
2093        let key = CommentKey {
2094            object_id,
2095            sub_component,
2096        };
2097        let value = comment.map(|c| CommentValue { comment: c });
2098        self.comments.set(key, value, self.op_id)?;
2099
2100        Ok(())
2101    }
2102
2103    pub fn drop_comments(
2104        &mut self,
2105        object_ids: &BTreeSet<CommentObjectId>,
2106    ) -> Result<(), CatalogError> {
2107        if object_ids.is_empty() {
2108            return Ok(());
2109        }
2110
2111        self.comments
2112            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2113        Ok(())
2114    }
2115
2116    pub fn update_source_references(
2117        &mut self,
2118        source_id: CatalogItemId,
2119        references: Vec<SourceReference>,
2120        updated_at: u64,
2121    ) -> Result<(), CatalogError> {
2122        let key = SourceReferencesKey { source_id };
2123        let value = SourceReferencesValue {
2124            references,
2125            updated_at,
2126        };
2127        self.source_references.set(key, Some(value), self.op_id)?;
2128        Ok(())
2129    }
2130
2131    /// Upserts persisted system configuration `name` to `value`.
2132    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2133        let key = ServerConfigurationKey {
2134            name: name.to_string(),
2135        };
2136        let value = ServerConfigurationValue { value };
2137        self.system_configurations
2138            .set(key, Some(value), self.op_id)?;
2139        Ok(())
2140    }
2141
2142    /// Removes persisted system configuration `name`.
2143    pub fn remove_system_config(&mut self, name: &str) {
2144        let key = ServerConfigurationKey {
2145            name: name.to_string(),
2146        };
2147        self.system_configurations
2148            .set(key, None, self.op_id)
2149            .expect("cannot have uniqueness violation");
2150    }
2151
2152    /// Removes all persisted system configurations.
2153    pub fn clear_system_configs(&mut self) {
2154        self.system_configurations.delete(|_k, _v| true, self.op_id);
2155    }
2156
2157    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2158        match self.configs.insert(
2159            ConfigKey { key: key.clone() },
2160            ConfigValue { value },
2161            self.op_id,
2162        ) {
2163            Ok(_) => Ok(()),
2164            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2165        }
2166    }
2167
2168    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2169        self.clusters
2170            .items()
2171            .into_iter()
2172            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2173    }
2174
2175    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2176        self.cluster_replicas
2177            .items()
2178            .into_iter()
2179            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2180    }
2181
2182    pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2183        self.databases
2184            .items()
2185            .into_iter()
2186            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2187    }
2188
2189    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2190        self.roles
2191            .items()
2192            .into_iter()
2193            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2194    }
2195
2196    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2197        self.network_policies
2198            .items()
2199            .into_iter()
2200            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2201    }
2202
2203    pub fn get_system_object_mappings(
2204        &self,
2205    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2206        self.system_gid_mapping
2207            .items()
2208            .into_iter()
2209            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2210    }
2211
2212    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2213        self.schemas
2214            .items()
2215            .into_iter()
2216            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2217    }
2218
2219    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2220        self.system_configurations
2221            .items()
2222            .into_iter()
2223            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2224    }
2225
2226    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2227        let key = SchemaKey { id: *id };
2228        self.schemas
2229            .get(&key)
2230            .map(|v| DurableType::from_key_value(key, v.clone()))
2231    }
2232
2233    pub fn get_introspection_source_indexes(
2234        &self,
2235        cluster_id: ClusterId,
2236    ) -> BTreeMap<&str, (GlobalId, u32)> {
2237        self.introspection_sources
2238            .items()
2239            .into_iter()
2240            .filter(|(k, _v)| k.cluster_id == cluster_id)
2241            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2242            .collect()
2243    }
2244
2245    pub fn get_catalog_content_version(&self) -> Option<&str> {
2246        self.settings
2247            .get(&SettingKey {
2248                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2249            })
2250            .map(|value| &*value.value)
2251    }
2252
2253    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2254        self.settings
2255            .get(&SettingKey {
2256                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2257            })
2258            .map(|value| value.value.clone())
2259    }
2260
2261    /// Commit the current operation within the transaction. This does not cause anything to be
2262    /// written durably, but signals to the current transaction that we are moving on to the next
2263    /// operation.
2264    ///
2265    /// Returns the updates of the committed operation.
2266    #[must_use]
2267    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2268        let updates = self.get_op_updates();
2269        self.commit_op();
2270        updates
2271    }
2272
2273    fn get_op_updates(&self) -> Vec<StateUpdate> {
2274        fn get_collection_op_updates<'a, T>(
2275            table_txn: &'a TableTransaction<T::Key, T::Value>,
2276            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2277            op: Timestamp,
2278        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2279        where
2280            T::Key: Ord + Eq + Clone + Debug,
2281            T::Value: Ord + Clone + Debug,
2282            T: DurableType,
2283        {
2284            table_txn
2285                .pending
2286                .iter()
2287                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2288                .filter_map(move |(k, v)| {
2289                    if v.ts == op {
2290                        let key = k.clone();
2291                        let value = v.value.clone();
2292                        let diff = v.diff.clone().try_into().expect("invalid diff");
2293                        let update = DurableType::from_key_value(key, value);
2294                        let kind = kind_fn(update);
2295                        Some((kind, diff))
2296                    } else {
2297                        None
2298                    }
2299                })
2300        }
2301
2302        fn get_large_collection_op_updates<'a, T>(
2303            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2304            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2305            op: Timestamp,
2306        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2307        where
2308            T::Key: Ord + Eq + Clone + Debug,
2309            T: DurableType<Value = ()>,
2310        {
2311            collection.iter().filter_map(move |(k, diff, ts)| {
2312                if *ts == op {
2313                    let key = k.clone();
2314                    let diff = diff.clone().try_into().expect("invalid diff");
2315                    let update = DurableType::from_key_value(key, ());
2316                    let kind = kind_fn(update);
2317                    Some((kind, diff))
2318                } else {
2319                    None
2320                }
2321            })
2322        }
2323
2324        let Transaction {
2325            durable_catalog: _,
2326            databases,
2327            schemas,
2328            items,
2329            comments,
2330            roles,
2331            role_auth,
2332            clusters,
2333            network_policies,
2334            cluster_replicas,
2335            introspection_sources,
2336            system_gid_mapping,
2337            system_configurations,
2338            default_privileges,
2339            source_references,
2340            system_privileges,
2341            audit_log_updates,
2342            storage_collection_metadata,
2343            unfinalized_shards,
2344            // Not representable as a `StateUpdate`.
2345            id_allocator: _,
2346            configs: _,
2347            settings: _,
2348            txn_wal_shard: _,
2349            upper,
2350            op_id: _,
2351        } = &self;
2352
2353        let updates = std::iter::empty()
2354            .chain(get_collection_op_updates(
2355                roles,
2356                StateUpdateKind::Role,
2357                self.op_id,
2358            ))
2359            .chain(get_collection_op_updates(
2360                role_auth,
2361                StateUpdateKind::RoleAuth,
2362                self.op_id,
2363            ))
2364            .chain(get_collection_op_updates(
2365                databases,
2366                StateUpdateKind::Database,
2367                self.op_id,
2368            ))
2369            .chain(get_collection_op_updates(
2370                schemas,
2371                StateUpdateKind::Schema,
2372                self.op_id,
2373            ))
2374            .chain(get_collection_op_updates(
2375                default_privileges,
2376                StateUpdateKind::DefaultPrivilege,
2377                self.op_id,
2378            ))
2379            .chain(get_collection_op_updates(
2380                system_privileges,
2381                StateUpdateKind::SystemPrivilege,
2382                self.op_id,
2383            ))
2384            .chain(get_collection_op_updates(
2385                system_configurations,
2386                StateUpdateKind::SystemConfiguration,
2387                self.op_id,
2388            ))
2389            .chain(get_collection_op_updates(
2390                clusters,
2391                StateUpdateKind::Cluster,
2392                self.op_id,
2393            ))
2394            .chain(get_collection_op_updates(
2395                network_policies,
2396                StateUpdateKind::NetworkPolicy,
2397                self.op_id,
2398            ))
2399            .chain(get_collection_op_updates(
2400                introspection_sources,
2401                StateUpdateKind::IntrospectionSourceIndex,
2402                self.op_id,
2403            ))
2404            .chain(get_collection_op_updates(
2405                cluster_replicas,
2406                StateUpdateKind::ClusterReplica,
2407                self.op_id,
2408            ))
2409            .chain(get_collection_op_updates(
2410                system_gid_mapping,
2411                StateUpdateKind::SystemObjectMapping,
2412                self.op_id,
2413            ))
2414            .chain(get_collection_op_updates(
2415                items,
2416                StateUpdateKind::Item,
2417                self.op_id,
2418            ))
2419            .chain(get_collection_op_updates(
2420                comments,
2421                StateUpdateKind::Comment,
2422                self.op_id,
2423            ))
2424            .chain(get_collection_op_updates(
2425                source_references,
2426                StateUpdateKind::SourceReferences,
2427                self.op_id,
2428            ))
2429            .chain(get_collection_op_updates(
2430                storage_collection_metadata,
2431                StateUpdateKind::StorageCollectionMetadata,
2432                self.op_id,
2433            ))
2434            .chain(get_collection_op_updates(
2435                unfinalized_shards,
2436                StateUpdateKind::UnfinalizedShard,
2437                self.op_id,
2438            ))
2439            .chain(get_large_collection_op_updates(
2440                audit_log_updates,
2441                StateUpdateKind::AuditLog,
2442                self.op_id,
2443            ))
2444            .map(|(kind, diff)| StateUpdate {
2445                kind,
2446                ts: upper.clone(),
2447                diff,
2448            })
2449            .collect();
2450
2451        updates
2452    }
2453
    /// Returns the result of asking the underlying durable catalog whether it is a savepoint.
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2457
    /// Advances the operation id by one, so that changes made from here on are tagged with a
    /// new operation id.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2461
    /// Returns the current operation id, i.e. the id that changes made right now are tagged with.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2465
    /// Returns the `upper` timestamp stored in this transaction.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2469
2470    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2471        let audit_log_updates = self
2472            .audit_log_updates
2473            .into_iter()
2474            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2475            .collect();
2476
2477        let txn_batch = TransactionBatch {
2478            databases: self.databases.pending(),
2479            schemas: self.schemas.pending(),
2480            items: self.items.pending(),
2481            comments: self.comments.pending(),
2482            roles: self.roles.pending(),
2483            role_auth: self.role_auth.pending(),
2484            clusters: self.clusters.pending(),
2485            cluster_replicas: self.cluster_replicas.pending(),
2486            network_policies: self.network_policies.pending(),
2487            introspection_sources: self.introspection_sources.pending(),
2488            id_allocator: self.id_allocator.pending(),
2489            configs: self.configs.pending(),
2490            source_references: self.source_references.pending(),
2491            settings: self.settings.pending(),
2492            system_gid_mapping: self.system_gid_mapping.pending(),
2493            system_configurations: self.system_configurations.pending(),
2494            default_privileges: self.default_privileges.pending(),
2495            system_privileges: self.system_privileges.pending(),
2496            storage_collection_metadata: self.storage_collection_metadata.pending(),
2497            unfinalized_shards: self.unfinalized_shards.pending(),
2498            txn_wal_shard: self.txn_wal_shard.pending(),
2499            audit_log_updates,
2500            upper: self.upper,
2501        };
2502        (txn_batch, self.durable_catalog)
2503    }
2504
2505    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2506    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2507    /// before proceeding. In general, this must be fatal to the calling process. We do not
2508    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2509    ///
2510    /// The transaction is committed at `commit_ts`.
2511    ///
2512    /// Returns what the upper was directly after the transaction committed.
2513    ///
2514    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2515    /// catalog is not writeable.
2516    #[mz_ore::instrument(level = "debug")]
2517    pub(crate) async fn commit_internal(
2518        self,
2519        commit_ts: mz_repr::Timestamp,
2520    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2521        let (mut txn_batch, durable_catalog) = self.into_parts();
2522        let TransactionBatch {
2523            databases,
2524            schemas,
2525            items,
2526            comments,
2527            roles,
2528            role_auth,
2529            clusters,
2530            cluster_replicas,
2531            network_policies,
2532            introspection_sources,
2533            id_allocator,
2534            configs,
2535            source_references,
2536            settings,
2537            system_gid_mapping,
2538            system_configurations,
2539            default_privileges,
2540            system_privileges,
2541            storage_collection_metadata,
2542            unfinalized_shards,
2543            txn_wal_shard,
2544            audit_log_updates,
2545            upper: _,
2546        } = &mut txn_batch;
2547        // Consolidate in memory because it will likely be faster than consolidating after the
2548        // transaction has been made durable.
2549        differential_dataflow::consolidation::consolidate_updates(databases);
2550        differential_dataflow::consolidation::consolidate_updates(schemas);
2551        differential_dataflow::consolidation::consolidate_updates(items);
2552        differential_dataflow::consolidation::consolidate_updates(comments);
2553        differential_dataflow::consolidation::consolidate_updates(roles);
2554        differential_dataflow::consolidation::consolidate_updates(role_auth);
2555        differential_dataflow::consolidation::consolidate_updates(clusters);
2556        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2557        differential_dataflow::consolidation::consolidate_updates(network_policies);
2558        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2559        differential_dataflow::consolidation::consolidate_updates(id_allocator);
2560        differential_dataflow::consolidation::consolidate_updates(configs);
2561        differential_dataflow::consolidation::consolidate_updates(settings);
2562        differential_dataflow::consolidation::consolidate_updates(source_references);
2563        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2564        differential_dataflow::consolidation::consolidate_updates(system_configurations);
2565        differential_dataflow::consolidation::consolidate_updates(default_privileges);
2566        differential_dataflow::consolidation::consolidate_updates(system_privileges);
2567        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2568        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2569        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2570        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2571
2572        let upper = durable_catalog
2573            .commit_transaction(txn_batch, commit_ts)
2574            .await?;
2575        Ok((durable_catalog, upper))
2576    }
2577
2578    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2579    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2580    /// before proceeding. In general, this must be fatal to the calling process. We do not
2581    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2582    ///
2583    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2584    /// catalog is not writeable.
2585    ///
2586    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
2587    /// updates from this transaction. Therefore, updates from this transaction will not be returned
2588    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
2589    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
2590    ///
2591    /// An alternative implementation would be for the caller to explicitly consume their updates
2592    /// after committing and only then apply the updates in-memory. While this removes assumptions
2593    /// about the caller in this method, in practice it results in duplicate work on every commit.
2594    #[mz_ore::instrument(level = "debug")]
2595    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2596        let op_updates = self.get_op_updates();
2597        assert!(
2598            op_updates.is_empty(),
2599            "unconsumed transaction updates: {op_updates:?}"
2600        );
2601
2602        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2603        // Drain all the updates from the commit since it is assumed that they were already applied.
2604        let updates = durable_storage.sync_updates(upper).await?;
2605        // Writable and savepoint catalogs should have consumed all updates before committing a
2606        // transaction, otherwise the commit was performed with an out of date state.
2607        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
2608        // updates before committing.
2609        soft_assert_no_log!(
2610            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2611            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2612        );
2613        Ok(())
2614    }
2615}
2616
2617use crate::durable::async_trait;
2618
2619use super::objects::{RoleAuthKey, RoleAuthValue};
2620
2621#[async_trait]
2622impl StorageTxn for Transaction<'_> {
2623    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2624        self.storage_collection_metadata
2625            .items()
2626            .into_iter()
2627            .map(
2628                |(
2629                    StorageCollectionMetadataKey { id },
2630                    StorageCollectionMetadataValue { shard },
2631                )| { (*id, shard.clone()) },
2632            )
2633            .collect()
2634    }
2635
2636    fn insert_collection_metadata(
2637        &mut self,
2638        metadata: BTreeMap<GlobalId, ShardId>,
2639    ) -> Result<(), StorageError> {
2640        for (id, shard) in metadata {
2641            self.storage_collection_metadata
2642                .insert(
2643                    StorageCollectionMetadataKey { id },
2644                    StorageCollectionMetadataValue {
2645                        shard: shard.clone(),
2646                    },
2647                    self.op_id,
2648                )
2649                .map_err(|err| match err {
2650                    DurableCatalogError::DuplicateKey => {
2651                        StorageError::CollectionMetadataAlreadyExists(id)
2652                    }
2653                    DurableCatalogError::UniquenessViolation => {
2654                        StorageError::PersistShardAlreadyInUse(shard)
2655                    }
2656                    err => StorageError::Generic(anyhow::anyhow!(err)),
2657                })?;
2658        }
2659        Ok(())
2660    }
2661
2662    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2663        let ks: Vec<_> = ids
2664            .into_iter()
2665            .map(|id| StorageCollectionMetadataKey { id })
2666            .collect();
2667        self.storage_collection_metadata
2668            .delete_by_keys(ks, self.op_id)
2669            .into_iter()
2670            .map(
2671                |(
2672                    StorageCollectionMetadataKey { id },
2673                    StorageCollectionMetadataValue { shard },
2674                )| (id, shard),
2675            )
2676            .collect()
2677    }
2678
2679    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2680        self.unfinalized_shards
2681            .items()
2682            .into_iter()
2683            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2684            .collect()
2685    }
2686
2687    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError> {
2688        for shard in s {
2689            match self
2690                .unfinalized_shards
2691                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2692            {
2693                // Inserting duplicate keys has no effect.
2694                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2695                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2696            };
2697        }
2698        Ok(())
2699    }
2700
2701    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2702        let ks: Vec<_> = shards
2703            .into_iter()
2704            .map(|shard| UnfinalizedShardKey { shard })
2705            .collect();
2706        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2707    }
2708
2709    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2710        self.txn_wal_shard
2711            .values()
2712            .iter()
2713            .next()
2714            .map(|TxnWalShardValue { shard }| *shard)
2715    }
2716
2717    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError> {
2718        self.txn_wal_shard
2719            .insert((), TxnWalShardValue { shard }, self.op_id)
2720            .map_err(|err| match err {
2721                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2722                err => StorageError::Generic(anyhow::anyhow!(err)),
2723            })
2724    }
2725}
2726
/// Describes a set of changes to apply as the result of a catalog transaction.
///
/// Apart from `upper`, every field is a list of `(key, value, diff)` triples in proto form for
/// one collection. Collections that have no value (or no key) use `()` in that position.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    // Key-only collection: the value slot is `()`.
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    // Keyed by `()`.
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    // Key-only collection: the value slot is `()`.
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper of the catalog when the transaction started.
    pub(crate) upper: mz_repr::Timestamp,
}
2779
2780impl TransactionBatch {
2781    pub fn is_empty(&self) -> bool {
2782        let TransactionBatch {
2783            databases,
2784            schemas,
2785            items,
2786            comments,
2787            roles,
2788            role_auth,
2789            clusters,
2790            cluster_replicas,
2791            network_policies,
2792            introspection_sources,
2793            id_allocator,
2794            configs,
2795            settings,
2796            source_references,
2797            system_gid_mapping,
2798            system_configurations,
2799            default_privileges,
2800            system_privileges,
2801            storage_collection_metadata,
2802            unfinalized_shards,
2803            txn_wal_shard,
2804            audit_log_updates,
2805            upper: _,
2806        } = self;
2807        databases.is_empty()
2808            && schemas.is_empty()
2809            && items.is_empty()
2810            && comments.is_empty()
2811            && roles.is_empty()
2812            && role_auth.is_empty()
2813            && clusters.is_empty()
2814            && cluster_replicas.is_empty()
2815            && network_policies.is_empty()
2816            && introspection_sources.is_empty()
2817            && id_allocator.is_empty()
2818            && configs.is_empty()
2819            && settings.is_empty()
2820            && source_references.is_empty()
2821            && system_gid_mapping.is_empty()
2822            && system_configurations.is_empty()
2823            && default_privileges.is_empty()
2824            && system_privileges.is_empty()
2825            && storage_collection_metadata.is_empty()
2826            && unfinalized_shards.is_empty()
2827            && txn_wal_shard.is_empty()
2828            && audit_log_updates.is_empty()
2829    }
2830}
2831
/// A single pending change to the value stored under one key of a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value being added or retracted.
    value: V,
    /// The timestamp supplied by the caller that made the change; the pending updates of a key
    /// are expected to be sorted by this field.
    ts: Timestamp,
    /// The multiplicity change for `value`.
    diff: Diff,
}
2838
/// Utility trait to check for plan validity.
///
/// Implemented for every value type stored in a [`TableTransaction`], which requires it as a
/// bound on `V`.
trait UniqueName {
    /// Does the item have a unique name? If yes, we can check for name equality in validity
    /// checking.
    const HAS_UNIQUE_NAME: bool;
    /// The unique name, only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`.
    fn unique_name(&self) -> &str;
}
2847
/// Implementations of `UniqueName` for the durable catalog value types.
mod unique_name {
    use crate::durable::objects::*;

    // Implements `UniqueName` for types that have a `name` field, returning that field as the
    // unique name.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    // Implements `UniqueName` for types without a unique name; `unique_name` returns the empty
    // string, which is not meaningful because `HAS_UNIQUE_NAME` is `false`.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                       ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    // `String` only needs an implementation in test builds.
    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
2909
/// TableTransaction emulates some features of a typical SQL transaction over
/// a table for a collection.
///
/// It supports:
/// - uniqueness constraints
/// - transactional reads and writes (including read-your-writes before commit)
///
/// `K` is the primary key type. Multiple entries with the same key are disallowed.
/// `V` is an arbitrary value type.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The contents of the table when the transaction was created.
    initial: BTreeMap<K, V>,
    // The desired updates to keys after commit.
    // Invariant: Value is sorted by `ts`.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional predicate that returns `true` when two values conflict with each other.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2927
2928impl<K, V> TableTransaction<K, V>
2929where
2930    K: Ord + Eq + Clone + Debug,
2931    V: Ord + Clone + Debug + UniqueName,
2932{
2933    /// Create a new TableTransaction with initial data.
2934    ///
2935    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
2936    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
2937    /// protobuf types which deserialize to `K` and `V` that a [`TableTransaction`] is generic
2938    /// over.
2939    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2940    where
2941        K: RustType<KP>,
2942        V: RustType<VP>,
2943    {
2944        let initial = initial
2945            .into_iter()
2946            .map(RustType::from_proto)
2947            .collect::<Result<_, _>>()?;
2948
2949        Ok(Self {
2950            initial,
2951            pending: BTreeMap::new(),
2952            uniqueness_violation: None,
2953        })
2954    }
2955
2956    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
2957    /// that determines whether there is a uniqueness violation among two values.
2958    fn new_with_uniqueness_fn<KP, VP>(
2959        initial: BTreeMap<KP, VP>,
2960        uniqueness_violation: fn(a: &V, b: &V) -> bool,
2961    ) -> Result<Self, TryFromProtoError>
2962    where
2963        K: RustType<KP>,
2964        V: RustType<VP>,
2965    {
2966        let initial = initial
2967            .into_iter()
2968            .map(RustType::from_proto)
2969            .collect::<Result<_, _>>()?;
2970
2971        Ok(Self {
2972            initial,
2973            pending: BTreeMap::new(),
2974            uniqueness_violation: Some(uniqueness_violation),
2975        })
2976    }
2977
2978    /// Consumes and returns the pending changes and their diffs. `Diff` is
2979    /// guaranteed to be 1 or -1.
2980    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2981    where
2982        K: RustType<KP>,
2983        V: RustType<VP>,
2984    {
2985        soft_assert_no_log!(self.verify().is_ok());
2986        // Pending describes the desired final state for some keys. K,V pairs should be
2987        // retracted if they already exist and were deleted or are being updated.
2988        self.pending
2989            .into_iter()
2990            .flat_map(|(k, v)| {
2991                let mut v: Vec<_> = v
2992                    .into_iter()
2993                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
2994                    .collect();
2995                differential_dataflow::consolidation::consolidate(&mut v);
2996                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
2997            })
2998            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
2999            .collect()
3000    }
3001
3002    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
3003    ///
3004    /// Runtime is O(n^2), where n is the number of items in `self`, if
3005    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
3006    fn verify(&self) -> Result<(), DurableCatalogError> {
3007        if let Some(uniqueness_violation) = self.uniqueness_violation {
3008            // Compare each value to each other value and ensure they are unique.
3009            let items = self.values();
3010            if V::HAS_UNIQUE_NAME {
3011                let by_name: BTreeMap<_, _> = items
3012                    .iter()
3013                    .enumerate()
3014                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
3015                    .collect();
3016                for (i, vi) in items.iter().enumerate() {
3017                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
3018                        if i != *j && uniqueness_violation(vi, *vj) {
3019                            return Err(DurableCatalogError::UniquenessViolation);
3020                        }
3021                    }
3022                }
3023            } else {
3024                for (i, vi) in items.iter().enumerate() {
3025                    for (j, vj) in items.iter().enumerate() {
3026                        if i != j && uniqueness_violation(vi, vj) {
3027                            return Err(DurableCatalogError::UniquenessViolation);
3028                        }
3029                    }
3030                }
3031            }
3032        }
3033        soft_assert_no_log!(
3034            self.pending
3035                .values()
3036                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3037            "pending should be sorted by timestamp: {:?}",
3038            self.pending
3039        );
3040        Ok(())
3041    }
3042
3043    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
3044    ///
3045    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3046    /// items in `keys`.
3047    fn verify_keys<'a>(
3048        &self,
3049        keys: impl IntoIterator<Item = &'a K>,
3050    ) -> Result<(), DurableCatalogError>
3051    where
3052        K: 'a,
3053    {
3054        if let Some(uniqueness_violation) = self.uniqueness_violation {
3055            let entries: Vec<_> = keys
3056                .into_iter()
3057                .filter_map(|key| self.get(key).map(|value| (key, value)))
3058                .collect();
3059            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3060            for (ki, vi) in self.items() {
3061                for (kj, vj) in &entries {
3062                    if ki != *kj && uniqueness_violation(vi, vj) {
3063                        return Err(DurableCatalogError::UniquenessViolation);
3064                    }
3065                }
3066            }
3067        }
3068        soft_assert_no_log!(self.verify().is_ok());
3069        Ok(())
3070    }
3071
3072    /// Iterates over the items viewable in the current transaction in arbitrary
3073    /// order and applies `f` on all key, value pairs.
3074    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3075        let mut seen = BTreeSet::new();
3076        for k in self.pending.keys() {
3077            seen.insert(k);
3078            let v = self.get(k);
3079            // Deleted items don't exist so shouldn't be visited, but still suppress
3080            // visiting the key later.
3081            if let Some(v) = v {
3082                f(k, v);
3083            }
3084        }
3085        for (k, v) in self.initial.iter() {
3086            // Add on initial items that don't have updates.
3087            if !seen.contains(k) {
3088                f(k, v);
3089            }
3090        }
3091    }
3092
3093    /// Returns the current value of `k`.
3094    fn get(&self, k: &K) -> Option<&V> {
3095        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3096        let mut updates = Vec::with_capacity(pending.len() + 1);
3097        if let Some(initial) = self.initial.get(k) {
3098            updates.push((initial, Diff::ONE));
3099        }
3100        updates.extend(
3101            pending
3102                .into_iter()
3103                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3104        );
3105
3106        differential_dataflow::consolidation::consolidate(&mut updates);
3107        assert!(updates.len() <= 1);
3108        updates.into_iter().next().map(|(v, _)| v)
3109    }
3110
3111    /// Returns the items viewable in the current transaction. The items are
3112    /// cloned, so this is an expensive operation. Prefer using [`Self::items`], or
3113    /// [`Self::for_values`].
3114    // Used by tests.
3115    #[cfg(test)]
3116    fn items_cloned(&self) -> BTreeMap<K, V> {
3117        let mut items = BTreeMap::new();
3118        self.for_values(|k, v| {
3119            items.insert(k.clone(), v.clone());
3120        });
3121        items
3122    }
3123
3124    /// Returns the current items as proto-typed key-value pairs, suitable for
3125    /// constructing a [`Snapshot`]. This merges `initial` and `pending` to
3126    /// produce the current view and converts back to proto types.
3127    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3128    where
3129        K: RustType<KP>,
3130        V: RustType<VP>,
3131        KP: Ord,
3132    {
3133        let mut items = BTreeMap::new();
3134        self.for_values(|k, v| {
3135            items.insert(k.into_proto(), v.into_proto());
3136        });
3137        items
3138    }
3139
3140    /// Returns the items viewable in the current transaction as references. Returns a map
3141    /// of references.
3142    fn items(&self) -> BTreeMap<&K, &V> {
3143        let mut items = BTreeMap::new();
3144        self.for_values(|k, v| {
3145            items.insert(k, v);
3146        });
3147        items
3148    }
3149
3150    /// Returns the values viewable in the current transaction as references.
3151    fn values(&self) -> BTreeSet<&V> {
3152        let mut items = BTreeSet::new();
3153        self.for_values(|_, v| {
3154            items.insert(v);
3155        });
3156        items
3157    }
3158
3159    /// Returns the number of items viewable in the current transaction.
3160    fn len(&self) -> usize {
3161        let mut count = 0;
3162        self.for_values(|_, _| {
3163            count += 1;
3164        });
3165        count
3166    }
3167
3168    /// Iterates over the items viewable in the current transaction, and provides a
3169    /// map where additional pending items can be inserted, which will be appended
3170    /// to current pending items. Does not verify uniqueness.
3171    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3172        &mut self,
3173        mut f: F,
3174    ) {
3175        let mut pending = BTreeMap::new();
3176        self.for_values(|k, v| f(&mut pending, k, v));
3177        for (k, updates) in pending {
3178            self.pending.entry(k).or_default().extend(updates);
3179        }
3180    }
3181
3182    /// Inserts a new k,v pair.
3183    ///
3184    /// Returns an error if the uniqueness check failed or the key already exists.
3185    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3186        let mut violation = None;
3187        self.for_values(|for_k, for_v| {
3188            if &k == for_k {
3189                violation = Some(DurableCatalogError::DuplicateKey);
3190            }
3191            if let Some(uniqueness_violation) = self.uniqueness_violation {
3192                if uniqueness_violation(for_v, &v) {
3193                    violation = Some(DurableCatalogError::UniquenessViolation);
3194                }
3195            }
3196        });
3197        if let Some(violation) = violation {
3198            return Err(violation);
3199        }
3200        self.pending.entry(k).or_default().push(TransactionUpdate {
3201            value: v,
3202            ts,
3203            diff: Diff::ONE,
3204        });
3205        soft_assert_no_log!(self.verify().is_ok());
3206        Ok(())
3207    }
3208
3209    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
3210    /// value should be updated, otherwise `None`. Returns the number of changed
3211    /// entries.
3212    ///
3213    /// Returns an error if the uniqueness check failed.
3214    ///
3215    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
3216    /// better performance.
3217    fn update<F: Fn(&K, &V) -> Option<V>>(
3218        &mut self,
3219        f: F,
3220        ts: Timestamp,
3221    ) -> Result<Diff, DurableCatalogError> {
3222        let mut changed = Diff::ZERO;
3223        let mut keys = BTreeSet::new();
3224        // Keep a copy of pending in case of uniqueness violation.
3225        let pending = self.pending.clone();
3226        self.for_values_mut(|p, k, v| {
3227            if let Some(next) = f(k, v) {
3228                changed += Diff::ONE;
3229                keys.insert(k.clone());
3230                let updates = p.entry(k.clone()).or_default();
3231                updates.push(TransactionUpdate {
3232                    value: v.clone(),
3233                    ts,
3234                    diff: Diff::MINUS_ONE,
3235                });
3236                updates.push(TransactionUpdate {
3237                    value: next,
3238                    ts,
3239                    diff: Diff::ONE,
3240                });
3241            }
3242        });
3243        // Check for uniqueness violation.
3244        if let Err(err) = self.verify_keys(&keys) {
3245            self.pending = pending;
3246            Err(err)
3247        } else {
3248            Ok(changed)
3249        }
3250    }
3251
3252    /// Updates `k`, `v` pair if `k` already exists in `self`.
3253    ///
3254    /// Returns `true` if `k` was updated, `false` otherwise.
3255    /// Returns an error if the uniqueness check failed.
3256    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3257        if let Some(cur_v) = self.get(&k) {
3258            if v != *cur_v {
3259                self.set(k, Some(v), ts)?;
3260            }
3261            Ok(true)
3262        } else {
3263            Ok(false)
3264        }
3265    }
3266
3267    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
3268    ///
3269    /// Returns the number of changed entries.
3270    /// Returns an error if the uniqueness check failed.
3271    fn update_by_keys(
3272        &mut self,
3273        kvs: impl IntoIterator<Item = (K, V)>,
3274        ts: Timestamp,
3275    ) -> Result<Diff, DurableCatalogError> {
3276        let kvs: Vec<_> = kvs
3277            .into_iter()
3278            .filter_map(|(k, v)| match self.get(&k) {
3279                // Record if updating this entry would be a no-op.
3280                Some(cur_v) => Some((*cur_v == v, k, v)),
3281                None => None,
3282            })
3283            .collect();
3284        let changed = kvs.len();
3285        let changed =
3286            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3287        let kvs = kvs
3288            .into_iter()
3289            // Filter out no-ops to save some work.
3290            .filter(|(no_op, _, _)| !no_op)
3291            .map(|(_, k, v)| (k, Some(v)))
3292            .collect();
3293        self.set_many(kvs, ts)?;
3294        Ok(changed)
3295    }
3296
3297    /// Set the value for a key. Returns the previous entry if the key existed,
3298    /// otherwise None.
3299    ///
3300    /// Returns an error if the uniqueness check failed.
3301    ///
3302    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
3303    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3304        let prev = self.get(&k).cloned();
3305        let entry = self.pending.entry(k.clone()).or_default();
3306        let restore_len = entry.len();
3307
3308        match (v, prev.clone()) {
3309            (Some(v), Some(prev)) => {
3310                entry.push(TransactionUpdate {
3311                    value: prev,
3312                    ts,
3313                    diff: Diff::MINUS_ONE,
3314                });
3315                entry.push(TransactionUpdate {
3316                    value: v,
3317                    ts,
3318                    diff: Diff::ONE,
3319                });
3320            }
3321            (Some(v), None) => {
3322                entry.push(TransactionUpdate {
3323                    value: v,
3324                    ts,
3325                    diff: Diff::ONE,
3326                });
3327            }
3328            (None, Some(prev)) => {
3329                entry.push(TransactionUpdate {
3330                    value: prev,
3331                    ts,
3332                    diff: Diff::MINUS_ONE,
3333                });
3334            }
3335            (None, None) => {}
3336        }
3337
3338        // Check for uniqueness violation.
3339        if let Err(err) = self.verify_keys([&k]) {
3340            // Revert self.pending to the state it was in before calling this
3341            // function.
3342            let pending = self.pending.get_mut(&k).expect("inserted above");
3343            pending.truncate(restore_len);
3344            Err(err)
3345        } else {
3346            Ok(prev)
3347        }
3348    }
3349
3350    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
3351    /// otherwise None.
3352    ///
3353    /// Returns an error if any uniqueness check failed.
3354    fn set_many(
3355        &mut self,
3356        kvs: BTreeMap<K, Option<V>>,
3357        ts: Timestamp,
3358    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3359        if kvs.is_empty() {
3360            return Ok(BTreeMap::new());
3361        }
3362
3363        let mut prevs = BTreeMap::new();
3364        let mut restores = BTreeMap::new();
3365
3366        for (k, v) in kvs {
3367            let prev = self.get(&k).cloned();
3368            let entry = self.pending.entry(k.clone()).or_default();
3369            restores.insert(k.clone(), entry.len());
3370
3371            match (v, prev.clone()) {
3372                (Some(v), Some(prev)) => {
3373                    entry.push(TransactionUpdate {
3374                        value: prev,
3375                        ts,
3376                        diff: Diff::MINUS_ONE,
3377                    });
3378                    entry.push(TransactionUpdate {
3379                        value: v,
3380                        ts,
3381                        diff: Diff::ONE,
3382                    });
3383                }
3384                (Some(v), None) => {
3385                    entry.push(TransactionUpdate {
3386                        value: v,
3387                        ts,
3388                        diff: Diff::ONE,
3389                    });
3390                }
3391                (None, Some(prev)) => {
3392                    entry.push(TransactionUpdate {
3393                        value: prev,
3394                        ts,
3395                        diff: Diff::MINUS_ONE,
3396                    });
3397                }
3398                (None, None) => {}
3399            }
3400
3401            prevs.insert(k, prev);
3402        }
3403
3404        // Check for uniqueness violation.
3405        if let Err(err) = self.verify_keys(prevs.keys()) {
3406            for (k, restore_len) in restores {
3407                // Revert self.pending to the state it was in before calling this
3408                // function.
3409                let pending = self.pending.get_mut(&k).expect("inserted above");
3410                pending.truncate(restore_len);
3411            }
3412            Err(err)
3413        } else {
3414            Ok(prevs)
3415        }
3416    }
3417
3418    /// Deletes items for which `f` returns true. Returns the keys and values of
3419    /// the deleted entries.
3420    ///
3421    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3422    /// better performance.
3423    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3424        let mut deleted = Vec::new();
3425        self.for_values_mut(|p, k, v| {
3426            if f(k, v) {
3427                deleted.push((k.clone(), v.clone()));
3428                p.entry(k.clone()).or_default().push(TransactionUpdate {
3429                    value: v.clone(),
3430                    ts,
3431                    diff: Diff::MINUS_ONE,
3432                });
3433            }
3434        });
3435        soft_assert_no_log!(self.verify().is_ok());
3436        deleted
3437    }
3438
3439    /// Deletes item with key `k`.
3440    ///
3441    /// Returns the value of the deleted entry, if it existed.
3442    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3443        self.set(k, None, ts)
3444            .expect("deleting an entry cannot violate uniqueness")
3445    }
3446
3447    /// Deletes items with key in `ks`.
3448    ///
3449    /// Returns the keys and values of the deleted entries.
3450    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3451        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3452        let prevs = self
3453            .set_many(kvs, ts)
3454            .expect("deleting entries cannot violate uniqueness");
3455        prevs
3456            .into_iter()
3457            .filter_map(|(k, v)| v.map(|v| (k, v)))
3458            .collect()
3459    }
3460}
3461
3462#[cfg(test)]
3463#[allow(clippy::unwrap_used)]
3464mod tests {
3465    use super::*;
3466
3467    use mz_ore::now::SYSTEM_TIME;
3468    use mz_ore::{assert_none, assert_ok};
3469    use mz_persist_client::cache::PersistClientCache;
3470    use mz_persist_types::PersistLocation;
3471    use semver::Version;
3472
3473    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3474    use crate::memory;
3475
3476    #[mz_ore::test]
3477    fn test_table_transaction_simple() {
3478        fn uniqueness_violation(a: &String, b: &String) -> bool {
3479            a == b
3480        }
3481        let mut table = TableTransaction::new_with_uniqueness_fn(
3482            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3483            uniqueness_violation,
3484        )
3485        .unwrap();
3486
3487        // Ideally, we compare for errors here, but it's hard/impossible to implement PartialEq
3488        // for DurableCatalogError.
3489        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3490        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3491        assert!(
3492            table
3493                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3494                .is_err()
3495        );
3496        assert!(
3497            table
3498                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3499                .is_err()
3500        );
3501    }
3502
3503    #[mz_ore::test]
3504    fn test_table_transaction() {
3505        fn uniqueness_violation(a: &String, b: &String) -> bool {
3506            a == b
3507        }
3508        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3509
3510        fn commit(
3511            table: &mut BTreeMap<Vec<u8>, String>,
3512            mut pending: Vec<(Vec<u8>, String, Diff)>,
3513        ) {
3514            // Sort by diff so that we process retractions first.
3515            pending.sort_by(|a, b| a.2.cmp(&b.2));
3516            for (k, v, diff) in pending {
3517                if diff == Diff::MINUS_ONE {
3518                    let prev = table.remove(&k);
3519                    assert_eq!(prev, Some(v));
3520                } else if diff == Diff::ONE {
3521                    let prev = table.insert(k, v);
3522                    assert_eq!(prev, None);
3523                } else {
3524                    panic!("unexpected diff: {diff}");
3525                }
3526            }
3527        }
3528
3529        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3530        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3531        let mut table_txn =
3532            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3533        assert_eq!(table_txn.items_cloned(), table);
3534        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3535        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3536        assert_eq!(
3537            table_txn.items_cloned(),
3538            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3539        );
3540        assert_eq!(
3541            table_txn
3542                .update(|_k, _v| Some("v3".to_string()), 2)
3543                .unwrap(),
3544            Diff::ONE
3545        );
3546
3547        // Uniqueness violation.
3548        table_txn
3549            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3550            .unwrap_err();
3551
3552        table_txn
3553            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3554            .unwrap();
3555        assert_eq!(
3556            table_txn.items_cloned(),
3557            BTreeMap::from([
3558                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3559                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3560            ])
3561        );
3562        let err = table_txn
3563            .update(|_k, _v| Some("v1".to_string()), 5)
3564            .unwrap_err();
3565        assert!(
3566            matches!(err, DurableCatalogError::UniquenessViolation),
3567            "unexpected err: {err:?}"
3568        );
3569        let pending = table_txn.pending();
3570        assert_eq!(
3571            pending,
3572            vec![
3573                (
3574                    1i64.to_le_bytes().to_vec(),
3575                    "v1".to_string(),
3576                    Diff::MINUS_ONE
3577                ),
3578                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3579                (
3580                    2i64.to_le_bytes().to_vec(),
3581                    "v2".to_string(),
3582                    Diff::MINUS_ONE
3583                ),
3584                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3585            ]
3586        );
3587        commit(&mut table, pending);
3588        assert_eq!(
3589            table,
3590            BTreeMap::from([
3591                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3592                (3i64.to_le_bytes().to_vec(), "v4".to_string())
3593            ])
3594        );
3595
3596        let mut table_txn =
3597            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3598        // Deleting then creating an item that has a uniqueness violation should work.
3599        assert_eq!(
3600            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3601            1
3602        );
3603        table_txn
3604            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3605            .unwrap();
3606        // Uniqueness violation in value.
3607        table_txn
3608            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3609            .unwrap_err();
3610        // Key already exists, expect error.
3611        table_txn
3612            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3613            .unwrap_err();
3614        assert_eq!(
3615            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3616            1
3617        );
3618        // Both the inserts work now because the key and uniqueness violation are gone.
3619        table_txn
3620            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3621            .unwrap();
3622        table_txn
3623            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3624            .unwrap();
3625        let pending = table_txn.pending();
3626        assert_eq!(
3627            pending,
3628            vec![
3629                (
3630                    1i64.to_le_bytes().to_vec(),
3631                    "v3".to_string(),
3632                    Diff::MINUS_ONE
3633                ),
3634                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3635                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3636            ]
3637        );
3638        commit(&mut table, pending);
3639        assert_eq!(
3640            table,
3641            BTreeMap::from([
3642                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3643                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3644                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3645            ])
3646        );
3647
3648        let mut table_txn =
3649            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3650        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3651        table_txn
3652            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3653            .unwrap();
3654
3655        commit(&mut table, table_txn.pending());
3656        assert_eq!(
3657            table,
3658            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3659        );
3660
3661        let mut table_txn =
3662            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3663        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3664        table_txn
3665            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3666            .unwrap();
3667        commit(&mut table, table_txn.pending());
3668        assert_eq!(
3669            table,
3670            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3671        );
3672
3673        // Verify we don't try to delete v3 or v4 during commit.
3674        let mut table_txn =
3675            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3676        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3677        table_txn
3678            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3679            .unwrap();
3680        table_txn
3681            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3682            .unwrap_err();
3683        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3684        table_txn
3685            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3686            .unwrap();
3687        commit(&mut table, table_txn.pending());
3688        assert_eq!(
3689            table.clone().into_iter().collect::<Vec<_>>(),
3690            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3691        );
3692
3693        // Test `set`.
3694        let mut table_txn =
3695            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3696        // Uniqueness violation.
3697        table_txn
3698            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3699            .unwrap_err();
3700        table_txn
3701            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3702            .unwrap();
3703        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3704        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3705        let pending = table_txn.pending();
3706        assert_eq!(
3707            pending,
3708            vec![
3709                (
3710                    1i64.to_le_bytes().to_vec(),
3711                    "v5".to_string(),
3712                    Diff::MINUS_ONE
3713                ),
3714                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3715            ]
3716        );
3717        commit(&mut table, pending);
3718        assert_eq!(
3719            table,
3720            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3721        );
3722
3723        // Duplicate `set`.
3724        let mut table_txn =
3725            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3726        table_txn
3727            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3728            .unwrap();
3729        let pending = table_txn.pending::<Vec<u8>, String>();
3730        assert!(pending.is_empty());
3731
3732        // Test `set_many`.
3733        let mut table_txn =
3734            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3735        // Uniqueness violation.
3736        table_txn
3737            .set_many(
3738                BTreeMap::from([
3739                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3740                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3741                ]),
3742                0,
3743            )
3744            .unwrap_err();
3745        table_txn
3746            .set_many(
3747                BTreeMap::from([
3748                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3749                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3750                ]),
3751                1,
3752            )
3753            .unwrap();
3754        table_txn
3755            .set_many(
3756                BTreeMap::from([
3757                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3758                    (3i64.to_le_bytes().to_vec(), None),
3759                ]),
3760                2,
3761            )
3762            .unwrap();
3763        let pending = table_txn.pending();
3764        assert_eq!(
3765            pending,
3766            vec![
3767                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3768                (
3769                    3i64.to_le_bytes().to_vec(),
3770                    "v6".to_string(),
3771                    Diff::MINUS_ONE
3772                ),
3773                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3774            ]
3775        );
3776        commit(&mut table, pending);
3777        assert_eq!(
3778            table,
3779            BTreeMap::from([
3780                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3781                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3782            ])
3783        );
3784
3785        // Duplicate `set_many`.
3786        let mut table_txn =
3787            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3788        table_txn
3789            .set_many(
3790                BTreeMap::from([
3791                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3792                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3793                ]),
3794                0,
3795            )
3796            .unwrap();
3797        let pending = table_txn.pending::<Vec<u8>, String>();
3798        assert!(pending.is_empty());
3799        commit(&mut table, pending);
3800        assert_eq!(
3801            table,
3802            BTreeMap::from([
3803                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3804                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3805            ])
3806        );
3807
3808        // Test `update_by_key`
3809        let mut table_txn =
3810            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3811        // Uniqueness violation.
3812        table_txn
3813            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3814            .unwrap_err();
3815        assert!(
3816            table_txn
3817                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3818                .unwrap()
3819        );
3820        assert!(
3821            !table_txn
3822                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3823                .unwrap()
3824        );
3825        let pending = table_txn.pending();
3826        assert_eq!(
3827            pending,
3828            vec![
3829                (
3830                    1i64.to_le_bytes().to_vec(),
3831                    "v6".to_string(),
3832                    Diff::MINUS_ONE
3833                ),
3834                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3835            ]
3836        );
3837        commit(&mut table, pending);
3838        assert_eq!(
3839            table,
3840            BTreeMap::from([
3841                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3842                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3843            ])
3844        );
3845
3846        // Duplicate `update_by_key`.
3847        let mut table_txn =
3848            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3849        assert!(
3850            table_txn
3851                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3852                .unwrap()
3853        );
3854        let pending = table_txn.pending::<Vec<u8>, String>();
3855        assert!(pending.is_empty());
3856        commit(&mut table, pending);
3857        assert_eq!(
3858            table,
3859            BTreeMap::from([
3860                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3861                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3862            ])
3863        );
3864
3865        // Test `update_by_keys`
3866        let mut table_txn =
3867            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3868        // Uniqueness violation.
3869        table_txn
3870            .update_by_keys(
3871                [
3872                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3873                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3874                ],
3875                0,
3876            )
3877            .unwrap_err();
3878        let n = table_txn
3879            .update_by_keys(
3880                [
3881                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3882                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3883                ],
3884                1,
3885            )
3886            .unwrap();
3887        assert_eq!(n, Diff::ONE);
3888        let n = table_txn
3889            .update_by_keys(
3890                [
3891                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3892                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3893                ],
3894                2,
3895            )
3896            .unwrap();
3897        assert_eq!(n, Diff::ZERO);
3898        let pending = table_txn.pending();
3899        assert_eq!(
3900            pending,
3901            vec![
3902                (
3903                    1i64.to_le_bytes().to_vec(),
3904                    "v8".to_string(),
3905                    Diff::MINUS_ONE
3906                ),
3907                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3908            ]
3909        );
3910        commit(&mut table, pending);
3911        assert_eq!(
3912            table,
3913            BTreeMap::from([
3914                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3915                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3916            ])
3917        );
3918
3919        // Duplicate `update_by_keys`.
3920        let mut table_txn =
3921            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3922        let n = table_txn
3923            .update_by_keys(
3924                [
3925                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3926                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3927                ],
3928                0,
3929            )
3930            .unwrap();
3931        assert_eq!(n, Diff::from(2));
3932        let pending = table_txn.pending::<Vec<u8>, String>();
3933        assert!(pending.is_empty());
3934        commit(&mut table, pending);
3935        assert_eq!(
3936            table,
3937            BTreeMap::from([
3938                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3939                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3940            ])
3941        );
3942
3943        // Test `delete_by_key`
3944        let mut table_txn =
3945            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3946        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3947        assert_eq!(prev, Some("v9".to_string()));
3948        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3949        assert_none!(prev);
3950        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3951        assert_none!(prev);
3952        let pending = table_txn.pending();
3953        assert_eq!(
3954            pending,
3955            vec![(
3956                1i64.to_le_bytes().to_vec(),
3957                "v9".to_string(),
3958                Diff::MINUS_ONE
3959            ),]
3960        );
3961        commit(&mut table, pending);
3962        assert_eq!(
3963            table,
3964            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3965        );
3966
3967        // Test `delete_by_keys`
3968        let mut table_txn =
3969            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3970        let prevs = table_txn.delete_by_keys(
3971            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3972            0,
3973        );
3974        assert_eq!(
3975            prevs,
3976            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3977        );
3978        let prevs = table_txn.delete_by_keys(
3979            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3980            1,
3981        );
3982        assert_eq!(prevs, vec![]);
3983        let prevs = table_txn.delete_by_keys(
3984            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3985            2,
3986        );
3987        assert_eq!(prevs, vec![]);
3988        let pending = table_txn.pending();
3989        assert_eq!(
3990            pending,
3991            vec![(
3992                42i64.to_le_bytes().to_vec(),
3993                "v7".to_string(),
3994                Diff::MINUS_ONE
3995            ),]
3996        );
3997        commit(&mut table, pending);
3998        assert_eq!(table, BTreeMap::new());
3999    }
4000
4001    #[mz_ore::test(tokio::test)]
4002    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
4003    async fn test_savepoint() {
4004        const VERSION: Version = Version::new(26, 0, 0);
4005        let mut persist_cache = PersistClientCache::new_no_metrics();
4006        persist_cache.cfg.build_version = VERSION;
4007        let persist_client = persist_cache
4008            .open(PersistLocation::new_in_mem())
4009            .await
4010            .unwrap();
4011        let state_builder = TestCatalogStateBuilder::new(persist_client)
4012            .with_default_deploy_generation()
4013            .with_version(VERSION);
4014
4015        // Initialize catalog.
4016        let _ = state_builder
4017            .clone()
4018            .unwrap_build()
4019            .await
4020            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4021            .await
4022            .unwrap()
4023            .0;
4024        let mut savepoint_state = state_builder
4025            .unwrap_build()
4026            .await
4027            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4028            .await
4029            .unwrap()
4030            .0;
4031
4032        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4033        assert!(!initial_snapshot.is_empty());
4034
4035        let db_name = "db";
4036        let db_owner = RoleId::User(42);
4037        let db_privileges = Vec::new();
4038        let mut txn = savepoint_state.transaction().await.unwrap();
4039        let (db_id, db_oid) = txn
4040            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4041            .unwrap();
4042        let commit_ts = txn.upper();
4043        txn.commit_internal(commit_ts).await.unwrap();
4044        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4045        let update = updates.into_element();
4046
4047        assert_eq!(update.diff, StateDiff::Addition);
4048
4049        let db = match update.kind {
4050            memory::objects::StateUpdateKind::Database(db) => db,
4051            update => panic!("unexpected update: {update:?}"),
4052        };
4053
4054        assert_eq!(db_id, db.id);
4055        assert_eq!(db_oid, db.oid);
4056        assert_eq!(db_name, db.name);
4057        assert_eq!(db_owner, db.owner_id);
4058        assert_eq!(db_privileges, db.privileges);
4059    }
4060
4061    #[mz_ore::test]
4062    fn test_allocate_introspection_source_index_id() {
4063        let cluster_variant: u8 = 0b0000_0001;
4064        let cluster_id_inner: u64 =
4065            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4066        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4067
4068        let cluster_id = ClusterId::System(cluster_id_inner);
4069        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4070
4071        let introspection_source_index_id: u64 =
4072            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4073
4074        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4075        {
4076            let mut cluster_variant_mask = 0xFF << 56;
4077            cluster_variant_mask &= introspection_source_index_id;
4078            cluster_variant_mask >>= 56;
4079            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4080        }
4081
4082        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4083        {
4084            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4085            cluster_id_inner_mask &= introspection_source_index_id;
4086            cluster_id_inner_mask >>= 8;
4087            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4088        }
4089
4090        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4091        {
4092            let mut log_variant_mask = 0xFF;
4093            log_variant_mask &= introspection_source_index_id;
4094            assert_eq!(
4095                log_variant_mask,
4096                u64::from(timely_messages_received_log_variant)
4097            );
4098        }
4099
4100        let (catalog_item_id, global_id) =
4101            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4102
4103        assert_eq!(
4104            catalog_item_id,
4105            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4106        );
4107        assert_eq!(
4108            global_id,
4109            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4110        );
4111    }
4112}