// mz_catalog/durable/transaction.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35    RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57    ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58    ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59    SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
68    SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
69    USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
70    USER_ROLE_ID_ALLOC_KEY,
71};
72use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
73
/// Logical timestamp used to order updates within a single transaction
/// (see `op_id` on [`Transaction`]); distinct from `mz_repr::Timestamp`.
type Timestamp = u64;
75
/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
/// An operation also logically groups multiple catalog updates together.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    // The durable backing store this transaction will commit into.
    #[derivative(Debug = "ignore")]
    // NOTE(review): `PartialEq = "ignore"` without `PartialEq` in the derive
    // list looks unused — confirm whether it can be dropped.
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    // One `TableTransaction` per durable catalog collection. Each buffers
    // pending inserts/updates/deletes against the snapshot it was created from.
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Don't make this a table transaction so that it's not read into the
    // in-memory cache.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of `durable_catalog` at the start of the transaction.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation of this transaction.
    op_id: Timestamp,
}
115
116impl<'a> Transaction<'a> {
    /// Creates a new [`Transaction`] over `durable_catalog` from the given
    /// `snapshot` of its contents, remembering `upper` (the catalog's upper at
    /// snapshot time) for the eventual commit.
    ///
    /// Collections with name-uniqueness invariants are wrapped with a
    /// uniqueness function so that conflicting inserts fail inside the
    /// transaction rather than at commit time.
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            // Database names are globally unique.
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            // Schema names are unique within a database.
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            // Item names are unique within a schema, with a carve-out for
            // types, which only conflict with other type-like items.
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    // `item_type` is slow, only compute if needed.
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            // Role names are globally unique.
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            // Cluster names are globally unique.
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            // Network policy names are globally unique.
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            // Replica names are unique within a cluster.
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            // Uniqueness violations for this value occur at the key rather than
            // the value (the key is the unit struct `()` so this is a singleton
            // value).
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }
199
200    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
201        let key = ItemKey { id: *id };
202        self.items
203            .get(&key)
204            .map(|v| DurableType::from_key_value(key, v.clone()))
205    }
206
207    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
208        self.items
209            .items()
210            .into_iter()
211            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
212            .sorted_by_key(|Item { id, .. }| *id)
213    }
214
215    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
216        self.insert_audit_log_events([event]);
217    }
218
219    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
220        let events = events
221            .into_iter()
222            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
223        self.audit_log_updates.extend(events);
224    }
225
226    pub fn insert_user_database(
227        &mut self,
228        database_name: &str,
229        owner_id: RoleId,
230        privileges: Vec<MzAclItem>,
231        temporary_oids: &HashSet<u32>,
232    ) -> Result<(DatabaseId, u32), CatalogError> {
233        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
234        let id = DatabaseId::User(id);
235        let oid = self.allocate_oid(temporary_oids)?;
236        self.insert_database(id, database_name, owner_id, privileges, oid)?;
237        Ok((id, oid))
238    }
239
240    pub(crate) fn insert_database(
241        &mut self,
242        id: DatabaseId,
243        database_name: &str,
244        owner_id: RoleId,
245        privileges: Vec<MzAclItem>,
246        oid: u32,
247    ) -> Result<u32, CatalogError> {
248        match self.databases.insert(
249            DatabaseKey { id },
250            DatabaseValue {
251                name: database_name.to_string(),
252                owner_id,
253                privileges,
254                oid,
255            },
256            self.op_id,
257        ) {
258            Ok(_) => Ok(oid),
259            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
260        }
261    }
262
263    pub fn insert_user_schema(
264        &mut self,
265        database_id: DatabaseId,
266        schema_name: &str,
267        owner_id: RoleId,
268        privileges: Vec<MzAclItem>,
269        temporary_oids: &HashSet<u32>,
270    ) -> Result<(SchemaId, u32), CatalogError> {
271        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
272        let id = SchemaId::User(id);
273        let oid = self.allocate_oid(temporary_oids)?;
274        self.insert_schema(
275            id,
276            Some(database_id),
277            schema_name.to_string(),
278            owner_id,
279            privileges,
280            oid,
281        )?;
282        Ok((id, oid))
283    }
284
285    pub fn insert_system_schema(
286        &mut self,
287        schema_id: u64,
288        schema_name: &str,
289        owner_id: RoleId,
290        privileges: Vec<MzAclItem>,
291        oid: u32,
292    ) -> Result<(), CatalogError> {
293        let id = SchemaId::System(schema_id);
294        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
295    }
296
297    pub(crate) fn insert_schema(
298        &mut self,
299        schema_id: SchemaId,
300        database_id: Option<DatabaseId>,
301        schema_name: String,
302        owner_id: RoleId,
303        privileges: Vec<MzAclItem>,
304        oid: u32,
305    ) -> Result<(), CatalogError> {
306        match self.schemas.insert(
307            SchemaKey { id: schema_id },
308            SchemaValue {
309                database_id,
310                name: schema_name.clone(),
311                owner_id,
312                privileges,
313                oid,
314            },
315            self.op_id,
316        ) {
317            Ok(_) => Ok(()),
318            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
319        }
320    }
321
322    pub fn insert_builtin_role(
323        &mut self,
324        id: RoleId,
325        name: String,
326        attributes: RoleAttributesRaw,
327        membership: RoleMembership,
328        vars: RoleVars,
329        oid: u32,
330    ) -> Result<RoleId, CatalogError> {
331        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
332        self.insert_role(id, name, attributes, membership, vars, oid)?;
333        Ok(id)
334    }
335
336    pub fn insert_user_role(
337        &mut self,
338        name: String,
339        attributes: RoleAttributesRaw,
340        membership: RoleMembership,
341        vars: RoleVars,
342        temporary_oids: &HashSet<u32>,
343    ) -> Result<(RoleId, u32), CatalogError> {
344        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
345        let id = RoleId::User(id);
346        let oid = self.allocate_oid(temporary_oids)?;
347        self.insert_role(id, name, attributes, membership, vars, oid)?;
348        Ok((id, oid))
349    }
350
    /// Records a role under `id`, and — when the role carries a password —
    /// also records a SCRAM-256 hash of it in `role_auth`.
    ///
    /// Either insert failing is reported as `RoleAlreadyExists`.
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    // This should never happen, but rather than panicking we'll
                    // set a known secure value as a fallback.
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            // Store only the hash, never the plaintext password.
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        // Record the role itself; name uniqueness is enforced by the table's
        // uniqueness function.
        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }
406
    /// Records a user cluster with a pre-assigned `cluster_id`, along with its
    /// introspection source indexes. Delegates to [`Self::insert_cluster`].
    ///
    /// Panics if any introspection source id is not a system id
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }
428
429    /// Panics if any introspection source id is not a system id
430    pub fn insert_system_cluster(
431        &mut self,
432        cluster_name: &str,
433        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
434        privileges: Vec<MzAclItem>,
435        owner_id: RoleId,
436        config: ClusterConfig,
437        temporary_oids: &HashSet<u32>,
438    ) -> Result<(), CatalogError> {
439        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
440        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441        self.insert_cluster(
442            cluster_id,
443            cluster_name,
444            introspection_source_indexes,
445            owner_id,
446            privileges,
447            config,
448            temporary_oids,
449        )
450    }
451
452    fn insert_cluster(
453        &mut self,
454        cluster_id: ClusterId,
455        cluster_name: &str,
456        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
457        owner_id: RoleId,
458        privileges: Vec<MzAclItem>,
459        config: ClusterConfig,
460        temporary_oids: &HashSet<u32>,
461    ) -> Result<(), CatalogError> {
462        if let Err(_) = self.clusters.insert(
463            ClusterKey { id: cluster_id },
464            ClusterValue {
465                name: cluster_name.to_string(),
466                owner_id,
467                privileges,
468                config,
469            },
470            self.op_id,
471        ) {
472            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
473        };
474
475        let amount = usize_to_u64(introspection_source_indexes.len());
476        let oids = self.allocate_oids(amount, temporary_oids)?;
477        let introspection_source_indexes: Vec<_> = introspection_source_indexes
478            .into_iter()
479            .zip_eq(oids)
480            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
481            .collect();
482        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
483            let introspection_source_index = IntrospectionSourceIndex {
484                cluster_id,
485                name: builtin.name.to_string(),
486                item_id,
487                index_id,
488                oid,
489            };
490            let (key, value) = introspection_source_index.into_key_value();
491            self.introspection_sources
492                .insert(key, value, self.op_id)
493                .expect("no uniqueness violation");
494        }
495
496        Ok(())
497    }
498
499    pub fn rename_cluster(
500        &mut self,
501        cluster_id: ClusterId,
502        cluster_name: &str,
503        cluster_to_name: &str,
504    ) -> Result<(), CatalogError> {
505        let key = ClusterKey { id: cluster_id };
506
507        match self.clusters.update(
508            |k, v| {
509                if *k == key {
510                    let mut value = v.clone();
511                    value.name = cluster_to_name.to_string();
512                    Some(value)
513                } else {
514                    None
515                }
516            },
517            self.op_id,
518        )? {
519            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
520            Diff::ONE => Ok(()),
521            n => panic!(
522                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
523            ),
524        }
525    }
526
527    pub fn rename_cluster_replica(
528        &mut self,
529        replica_id: ReplicaId,
530        replica_name: &QualifiedReplica,
531        replica_to_name: &str,
532    ) -> Result<(), CatalogError> {
533        let key = ClusterReplicaKey { id: replica_id };
534
535        match self.cluster_replicas.update(
536            |k, v| {
537                if *k == key {
538                    let mut value = v.clone();
539                    value.name = replica_to_name.to_string();
540                    Some(value)
541                } else {
542                    None
543                }
544            },
545            self.op_id,
546        )? {
547            Diff::ZERO => {
548                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
549            }
550            Diff::ONE => Ok(()),
551            n => panic!(
552                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
553            ),
554        }
555    }
556
557    pub fn insert_cluster_replica(
558        &mut self,
559        cluster_id: ClusterId,
560        replica_name: &str,
561        config: ReplicaConfig,
562        owner_id: RoleId,
563    ) -> Result<ReplicaId, CatalogError> {
564        let replica_id = match cluster_id {
565            ClusterId::System(_) => self.allocate_system_replica_id()?,
566            ClusterId::User(_) => self.allocate_user_replica_id()?,
567        };
568        self.insert_cluster_replica_with_id(
569            cluster_id,
570            replica_id,
571            replica_name,
572            config,
573            owner_id,
574        )?;
575        Ok(replica_id)
576    }
577
578    pub(crate) fn insert_cluster_replica_with_id(
579        &mut self,
580        cluster_id: ClusterId,
581        replica_id: ReplicaId,
582        replica_name: &str,
583        config: ReplicaConfig,
584        owner_id: RoleId,
585    ) -> Result<(), CatalogError> {
586        if let Err(_) = self.cluster_replicas.insert(
587            ClusterReplicaKey { id: replica_id },
588            ClusterReplicaValue {
589                cluster_id,
590                name: replica_name.into(),
591                config,
592                owner_id,
593            },
594            self.op_id,
595        ) {
596            let cluster = self
597                .clusters
598                .get(&ClusterKey { id: cluster_id })
599                .expect("cluster exists");
600            return Err(SqlCatalogError::DuplicateReplica(
601                replica_name.to_string(),
602                cluster.name.to_string(),
603            )
604            .into());
605        };
606        Ok(())
607    }
608
609    pub fn insert_user_network_policy(
610        &mut self,
611        name: String,
612        rules: Vec<NetworkPolicyRule>,
613        privileges: Vec<MzAclItem>,
614        owner_id: RoleId,
615        temporary_oids: &HashSet<u32>,
616    ) -> Result<NetworkPolicyId, CatalogError> {
617        let oid = self.allocate_oid(temporary_oids)?;
618        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
619        let id = NetworkPolicyId::User(id);
620        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621    }
622
623    pub fn insert_network_policy(
624        &mut self,
625        id: NetworkPolicyId,
626        name: String,
627        rules: Vec<NetworkPolicyRule>,
628        privileges: Vec<MzAclItem>,
629        owner_id: RoleId,
630        oid: u32,
631    ) -> Result<NetworkPolicyId, CatalogError> {
632        match self.network_policies.insert(
633            NetworkPolicyKey { id },
634            NetworkPolicyValue {
635                name: name.clone(),
636                rules,
637                privileges,
638                owner_id,
639                oid,
640            },
641            self.op_id,
642        ) {
643            Ok(_) => Ok(id),
644            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
645        }
646    }
647
648    /// Updates persisted information about persisted introspection source
649    /// indexes.
650    ///
651    /// Panics if provided id is not a system id.
652    pub fn update_introspection_source_index_gids(
653        &mut self,
654        mappings: impl Iterator<
655            Item = (
656                ClusterId,
657                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
658            ),
659        >,
660    ) -> Result<(), CatalogError> {
661        for (cluster_id, updates) in mappings {
662            for (name, item_id, index_id, oid) in updates {
663                let introspection_source_index = IntrospectionSourceIndex {
664                    cluster_id,
665                    name,
666                    item_id,
667                    index_id,
668                    oid,
669                };
670                let (key, value) = introspection_source_index.into_key_value();
671
672                let prev = self
673                    .introspection_sources
674                    .set(key, Some(value), self.op_id)?;
675                if prev.is_none() {
676                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
677                        "{index_id}"
678                    ))
679                    .into());
680                }
681            }
682        }
683        Ok(())
684    }
685
686    pub fn insert_user_item(
687        &mut self,
688        id: CatalogItemId,
689        global_id: GlobalId,
690        schema_id: SchemaId,
691        item_name: &str,
692        create_sql: String,
693        owner_id: RoleId,
694        privileges: Vec<MzAclItem>,
695        temporary_oids: &HashSet<u32>,
696        versions: BTreeMap<RelationVersion, GlobalId>,
697    ) -> Result<u32, CatalogError> {
698        let oid = self.allocate_oid(temporary_oids)?;
699        self.insert_item(
700            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
701        )?;
702        Ok(oid)
703    }
704
705    pub fn insert_item(
706        &mut self,
707        id: CatalogItemId,
708        oid: u32,
709        global_id: GlobalId,
710        schema_id: SchemaId,
711        item_name: &str,
712        create_sql: String,
713        owner_id: RoleId,
714        privileges: Vec<MzAclItem>,
715        extra_versions: BTreeMap<RelationVersion, GlobalId>,
716    ) -> Result<(), CatalogError> {
717        match self.items.insert(
718            ItemKey { id },
719            ItemValue {
720                schema_id,
721                name: item_name.to_string(),
722                create_sql,
723                owner_id,
724                privileges,
725                oid,
726                global_id,
727                extra_versions,
728            },
729            self.op_id,
730        ) {
731            Ok(_) => Ok(()),
732            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
733        }
734    }
735
736    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
738    }
739
    /// Allocates `amount` consecutive IDs from the allocator named `key` and
    /// returns them, advancing the allocator's `next_id` past them.
    ///
    /// Panics if the allocator `key` does not exist, or if system item IDs
    /// are requested after bootstrap has completed. Fails with `IdExhaustion`
    /// when the ID space would overflow.
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        // Sanity-check that nothing moved the allocator between our read and
        // our write within this transaction.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        // The allocated block is [current_id, next_id).
        Ok((current_id..next_id).collect())
    }
772
773    pub fn allocate_system_item_ids(
774        &mut self,
775        amount: u64,
776    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
777        assert!(
778            !self.durable_catalog.is_bootstrap_complete(),
779            "we can only allocate system item IDs during bootstrap"
780        );
781        Ok(self
782            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
783            .into_iter()
784            // TODO(alter_table): Use separate ID allocators.
785            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
786            .collect())
787    }
788
    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system item that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
    ///
    /// NOTE: Because these IDs are durably derived from the inputs, every number in the two
    /// `match`es below is part of the on-disk format — never renumber an existing arm; only
    /// append new values.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        // Variant tag for the top 8 bits of the packed ID.
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The inner cluster ID must fit in the 48-bit middle field; reject anything with the
        // top 16 bits set.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // Stable tag for the low 8 bits of the packed ID, one per log variant.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            // 27 is intentionally skipped — presumably a removed log variant; do not reuse it.
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        // Pack: [8-bit cluster variant | 48-bit cluster inner id | 8-bit log variant].
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
878
879    pub fn allocate_user_item_ids(
880        &mut self,
881        amount: u64,
882    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
883        Ok(self
884            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
885            .into_iter()
886            // TODO(alter_table): Use separate ID allocators.
887            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
888            .collect())
889    }
890
891    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
892        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
893        Ok(ReplicaId::User(id))
894    }
895
896    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
897        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
898        Ok(ReplicaId::System(id))
899    }
900
901    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
902        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
903    }
904
905    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
906        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
907    }
908
    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    ///
    /// `temporary_oids` is an extra set of OIDs that are in use but not durably recorded in this
    /// transaction's collections (e.g. held by in-memory-only objects); they are treated as
    /// allocated and will not be handed out.
    ///
    /// Returns `SqlCatalogError::OidExhaustion` if `amount` exceeds `u32::MAX` or if there are
    /// not enough free OIDs in the user range.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            // Wrapping increment: on `u32` overflow, re-enter the user range at
            // `FIRST_USER_OID + res` rather than at 0, so system OIDs are never produced.
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        // More than u32::MAX OIDs can never be satisfied.
        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do everytime we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated everytime an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference and the other
        // approach requires making sure that allocator always stays in-sync which can be
        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
        //
        // Gather every OID currently in use across all OID-bearing collections.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        // An OID is unavailable if it's durably recorded or in the caller-supplied temporary set.
        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        // Scan forward (with wrap-around) from the persisted cursor, collecting free OIDs.
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            // Wrapping all the way back to the starting cursor means we scanned the entire
            // user OID space without finding `amount` free OIDs.
            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        // Persist the advanced cursor so the next allocation resumes after the last OID examined.
        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        // Sanity check: nothing else moved the cursor while we were scanning.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1017
1018    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
1019    /// object.
1020    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1021        self.allocate_oids(1, temporary_oids)
1022            .map(|oids| oids.into_element())
1023    }
1024
1025    pub(crate) fn insert_id_allocator(
1026        &mut self,
1027        name: String,
1028        next_id: u64,
1029    ) -> Result<(), CatalogError> {
1030        match self.id_allocator.insert(
1031            IdAllocKey { name: name.clone() },
1032            IdAllocValue { next_id },
1033            self.op_id,
1034        ) {
1035            Ok(_) => Ok(()),
1036            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1037        }
1038    }
1039
1040    /// Removes the database `id` from the transaction.
1041    ///
1042    /// Returns an error if `id` is not found.
1043    ///
1044    /// Runtime is linear with respect to the total number of databases in the catalog.
1045    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
1046    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1047        let prev = self
1048            .databases
1049            .set(DatabaseKey { id: *id }, None, self.op_id)?;
1050        if prev.is_some() {
1051            Ok(())
1052        } else {
1053            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1054        }
1055    }
1056
1057    /// Removes all databases in `databases` from the transaction.
1058    ///
1059    /// Returns an error if any id in `databases` is not found.
1060    ///
1061    /// NOTE: On error, there still may be some databases removed from the transaction. It
1062    /// is up to the caller to either abort the transaction or commit.
1063    pub fn remove_databases(
1064        &mut self,
1065        databases: &BTreeSet<DatabaseId>,
1066    ) -> Result<(), CatalogError> {
1067        if databases.is_empty() {
1068            return Ok(());
1069        }
1070
1071        let to_remove = databases
1072            .iter()
1073            .map(|id| (DatabaseKey { id: *id }, None))
1074            .collect();
1075        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1076        prev.retain(|_k, val| val.is_none());
1077
1078        if !prev.is_empty() {
1079            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1080            return Err(SqlCatalogError::UnknownDatabase(err).into());
1081        }
1082
1083        Ok(())
1084    }
1085
1086    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
1087    ///
1088    /// Returns an error if `(database_id, schema_id)` is not found.
1089    ///
1090    /// Runtime is linear with respect to the total number of schemas in the catalog.
1091    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
1092    pub fn remove_schema(
1093        &mut self,
1094        database_id: &Option<DatabaseId>,
1095        schema_id: &SchemaId,
1096    ) -> Result<(), CatalogError> {
1097        let prev = self
1098            .schemas
1099            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1100        if prev.is_some() {
1101            Ok(())
1102        } else {
1103            let database_name = match database_id {
1104                Some(id) => format!("{id}."),
1105                None => "".to_string(),
1106            };
1107            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1108        }
1109    }
1110
1111    /// Removes all schemas in `schemas` from the transaction.
1112    ///
1113    /// Returns an error if any id in `schemas` is not found.
1114    ///
1115    /// NOTE: On error, there still may be some schemas removed from the transaction. It
1116    /// is up to the caller to either abort the transaction or commit.
1117    pub fn remove_schemas(
1118        &mut self,
1119        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1120    ) -> Result<(), CatalogError> {
1121        if schemas.is_empty() {
1122            return Ok(());
1123        }
1124
1125        let to_remove = schemas
1126            .iter()
1127            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1128            .collect();
1129        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1130        prev.retain(|_k, v| v.is_none());
1131
1132        if !prev.is_empty() {
1133            let err = prev
1134                .keys()
1135                .map(|k| {
1136                    let db_spec = schemas.get(&k.id).expect("should_exist");
1137                    let db_name = match db_spec {
1138                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1139                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1140                    };
1141                    format!("{}.{}", db_name, k.id)
1142                })
1143                .join(", ");
1144
1145            return Err(SqlCatalogError::UnknownSchema(err).into());
1146        }
1147
1148        Ok(())
1149    }
1150
1151    pub fn remove_source_references(
1152        &mut self,
1153        source_id: CatalogItemId,
1154    ) -> Result<(), CatalogError> {
1155        let deleted = self
1156            .source_references
1157            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1158            .is_some();
1159        if deleted {
1160            Ok(())
1161        } else {
1162            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1163        }
1164    }
1165
1166    /// Removes all user roles in `roles` from the transaction.
1167    ///
1168    /// Returns an error if any id in `roles` is not found.
1169    ///
1170    /// NOTE: On error, there still may be some roles removed from the transaction. It
1171    /// is up to the caller to either abort the transaction or commit.
1172    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1173        assert!(
1174            roles.iter().all(|id| id.is_user()),
1175            "cannot delete non-user roles"
1176        );
1177        self.remove_roles(roles)
1178    }
1179
1180    /// Removes all roles in `roles` from the transaction.
1181    ///
1182    /// Returns an error if any id in `roles` is not found.
1183    ///
1184    /// NOTE: On error, there still may be some roles removed from the transaction. It
1185    /// is up to the caller to either abort the transaction or commit.
1186    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1187        if roles.is_empty() {
1188            return Ok(());
1189        }
1190
1191        let to_remove_keys = roles
1192            .iter()
1193            .map(|role_id| RoleKey { id: *role_id })
1194            .collect::<Vec<_>>();
1195
1196        let to_remove_roles = to_remove_keys
1197            .iter()
1198            .map(|role_key| (role_key.clone(), None))
1199            .collect();
1200
1201        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1202
1203        let to_remove_role_auth = to_remove_keys
1204            .iter()
1205            .map(|role_key| {
1206                (
1207                    RoleAuthKey {
1208                        role_id: role_key.id,
1209                    },
1210                    None,
1211                )
1212            })
1213            .collect();
1214
1215        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1216
1217        prev.retain(|_k, v| v.is_none());
1218        if !prev.is_empty() {
1219            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1220            return Err(SqlCatalogError::UnknownRole(err).into());
1221        }
1222
1223        role_auth_prev.retain(|_k, v| v.is_none());
1224        // The reason we don't to the same check as above is that the role auth table
1225        // is not required to have all roles in the role table.
1226
1227        Ok(())
1228    }
1229
1230    /// Removes all cluster in `clusters` from the transaction.
1231    ///
1232    /// Returns an error if any id in `clusters` is not found.
1233    ///
1234    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1235    /// the caller to either abort the transaction or commit.
1236    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1237        if clusters.is_empty() {
1238            return Ok(());
1239        }
1240
1241        let to_remove = clusters
1242            .iter()
1243            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1244            .collect();
1245        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1246
1247        prev.retain(|_k, v| v.is_none());
1248        if !prev.is_empty() {
1249            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1250            return Err(SqlCatalogError::UnknownCluster(err).into());
1251        }
1252
1253        // Cascade delete introspection sources and cluster replicas.
1254        //
1255        // TODO(benesch): this doesn't seem right. Cascade deletions should
1256        // be entirely the domain of the higher catalog layer, not the
1257        // storage layer.
1258        self.cluster_replicas
1259            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1260        self.introspection_sources
1261            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1262
1263        Ok(())
1264    }
1265
1266    /// Removes the cluster replica `id` from the transaction.
1267    ///
1268    /// Returns an error if `id` is not found.
1269    ///
1270    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1271    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1272    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1273        let deleted = self
1274            .cluster_replicas
1275            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1276            .is_some();
1277        if deleted {
1278            Ok(())
1279        } else {
1280            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1281        }
1282    }
1283
1284    /// Removes all cluster replicas in `replicas` from the transaction.
1285    ///
1286    /// Returns an error if any id in `replicas` is not found.
1287    ///
1288    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1289    /// is up to the caller to either abort the transaction or commit.
1290    pub fn remove_cluster_replicas(
1291        &mut self,
1292        replicas: &BTreeSet<ReplicaId>,
1293    ) -> Result<(), CatalogError> {
1294        if replicas.is_empty() {
1295            return Ok(());
1296        }
1297
1298        let to_remove = replicas
1299            .iter()
1300            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1301            .collect();
1302        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1303
1304        prev.retain(|_k, v| v.is_none());
1305        if !prev.is_empty() {
1306            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1307            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1308        }
1309
1310        Ok(())
1311    }
1312
1313    /// Removes item `id` from the transaction.
1314    ///
1315    /// Returns an error if `id` is not found.
1316    ///
1317    /// Runtime is linear with respect to the total number of items in the catalog.
1318    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1319    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1320        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1321        if prev.is_some() {
1322            Ok(())
1323        } else {
1324            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1325        }
1326    }
1327
1328    /// Removes all items in `ids` from the transaction.
1329    ///
1330    /// Returns an error if any id in `ids` is not found.
1331    ///
1332    /// NOTE: On error, there still may be some items removed from the transaction. It is
1333    /// up to the caller to either abort the transaction or commit.
1334    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1335        if ids.is_empty() {
1336            return Ok(());
1337        }
1338
1339        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1340        let n = self.items.delete_by_keys(ks, self.op_id).len();
1341        if n == ids.len() {
1342            Ok(())
1343        } else {
1344            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1345            let mut unknown = ids.difference(&item_ids);
1346            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1347        }
1348    }
1349
1350    /// Removes all system object mappings in `descriptions` from the transaction.
1351    ///
1352    /// Returns an error if any description in `descriptions` is not found.
1353    ///
1354    /// NOTE: On error, there still may be some items removed from the transaction. It is
1355    /// up to the caller to either abort the transaction or commit.
1356    pub fn remove_system_object_mappings(
1357        &mut self,
1358        descriptions: BTreeSet<SystemObjectDescription>,
1359    ) -> Result<(), CatalogError> {
1360        if descriptions.is_empty() {
1361            return Ok(());
1362        }
1363
1364        let ks: Vec<_> = descriptions
1365            .clone()
1366            .into_iter()
1367            .map(|desc| GidMappingKey {
1368                schema_name: desc.schema_name,
1369                object_type: desc.object_type,
1370                object_name: desc.object_name,
1371            })
1372            .collect();
1373        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1374
1375        if n == descriptions.len() {
1376            Ok(())
1377        } else {
1378            let item_descriptions = self
1379                .system_gid_mapping
1380                .items()
1381                .keys()
1382                .map(|k| SystemObjectDescription {
1383                    schema_name: k.schema_name.clone(),
1384                    object_type: k.object_type.clone(),
1385                    object_name: k.object_name.clone(),
1386                })
1387                .collect();
1388            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1389                format!(
1390                    "{} {}.{}",
1391                    desc.object_type, desc.schema_name, desc.object_name
1392                )
1393            });
1394            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1395        }
1396    }
1397
1398    /// Removes all introspection source indexes in `indexes` from the transaction.
1399    ///
1400    /// Returns an error if any index in `indexes` is not found.
1401    ///
1402    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1403    /// up to the caller to either abort the transaction or commit.
1404    pub fn remove_introspection_source_indexes(
1405        &mut self,
1406        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1407    ) -> Result<(), CatalogError> {
1408        if introspection_source_indexes.is_empty() {
1409            return Ok(());
1410        }
1411
1412        let ks: Vec<_> = introspection_source_indexes
1413            .clone()
1414            .into_iter()
1415            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1416            .collect();
1417        let n = self
1418            .introspection_sources
1419            .delete_by_keys(ks, self.op_id)
1420            .len();
1421        if n == introspection_source_indexes.len() {
1422            Ok(())
1423        } else {
1424            let txn_indexes = self
1425                .introspection_sources
1426                .items()
1427                .keys()
1428                .map(|k| (k.cluster_id, k.name.clone()))
1429                .collect();
1430            let mut unknown = introspection_source_indexes
1431                .difference(&txn_indexes)
1432                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1433            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1434        }
1435    }
1436
1437    /// Updates item `id` in the transaction to `item_name` and `item`.
1438    ///
1439    /// Returns an error if `id` is not found.
1440    ///
1441    /// Runtime is linear with respect to the total number of items in the catalog.
1442    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1443    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1444        let updated =
1445            self.items
1446                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1447        if updated {
1448            Ok(())
1449        } else {
1450            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1451        }
1452    }
1453
1454    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1455    /// corresponding value in `items`.
1456    ///
1457    /// Returns an error if any id in `items` is not found.
1458    ///
1459    /// NOTE: On error, there still may be some items updated in the transaction. It is
1460    /// up to the caller to either abort the transaction or commit.
1461    pub fn update_items(
1462        &mut self,
1463        items: BTreeMap<CatalogItemId, Item>,
1464    ) -> Result<(), CatalogError> {
1465        if items.is_empty() {
1466            return Ok(());
1467        }
1468
1469        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1470        let kvs: Vec<_> = items
1471            .clone()
1472            .into_iter()
1473            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1474            .collect();
1475        let n = self.items.update_by_keys(kvs, self.op_id)?;
1476        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1477        if n == update_ids.len() {
1478            Ok(())
1479        } else {
1480            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1481            let mut unknown = update_ids.difference(&item_ids);
1482            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1483        }
1484    }
1485
    /// Updates role `id` in the transaction to `role`, and applies `password` to the role's
    /// authentication entry.
    ///
    /// Returns an error if `id` is not found.
    ///
    /// Runtime is linear with respect to the total number of items in the catalog.
    /// DO NOT call this function in a loop, implement and use some `Self::update_roles` instead.
    /// You should model it after [`Self::update_items`].
    pub fn update_role(
        &mut self,
        id: RoleId,
        role: Role,
        password: PasswordAction,
    ) -> Result<(), CatalogError> {
        let key = RoleKey { id };
        if self.roles.get(&key).is_some() {
            let auth_key = RoleAuthKey { role_id: id };

            match password {
                PasswordAction::Set(new_password) => {
                    // Store only a SCRAM-SHA-256 hash of the password, never the plaintext.
                    let hash = mz_auth::hash::scram256_hash(
                        &new_password.password,
                        &new_password.scram_iterations,
                    )
                    .expect("password hash should be valid");
                    let value = RoleAuthValue {
                        password_hash: Some(hash),
                        updated_at: SYSTEM_TIME(),
                    };

                    // Upsert: a role may not yet have a role-auth entry, so fall back to
                    // inserting one when no entry exists.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    } else {
                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::Clear => {
                    // Clearing keeps the role-auth entry but nulls out the hash; if no entry
                    // exists there is nothing to clear, so no insert is performed.
                    let value = RoleAuthValue {
                        password_hash: None,
                        updated_at: SYSTEM_TIME(),
                    };
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    }
                }
                // Leave any existing authentication entry untouched.
                PasswordAction::NoChange => {}
            }

            // Update the role record itself after the auth handling above.
            self.roles
                .update_by_key(key, role.into_key_value().1, self.op_id)?;

            Ok(())
        } else {
            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
        }
    }
1543
1544    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1545    /// corresponding value in `roles`.
1546    ///
1547    /// This function does *not* write role_authentication information to the catalog.
1548    /// It is purely for updating the role itself.
1549    ///
1550    /// Returns an error if any id in `roles` is not found.
1551    ///
1552    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1553    /// up to the caller to either abort the transaction or commit.
1554    pub fn update_roles_without_auth(
1555        &mut self,
1556        roles: BTreeMap<RoleId, Role>,
1557    ) -> Result<(), CatalogError> {
1558        if roles.is_empty() {
1559            return Ok(());
1560        }
1561
1562        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1563        let kvs: Vec<_> = roles
1564            .into_iter()
1565            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1566            .collect();
1567        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1568        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1569
1570        if n == update_role_ids.len() {
1571            Ok(())
1572        } else {
1573            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1574            let mut unknown = update_role_ids.difference(&role_ids);
1575            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1576        }
1577    }
1578
1579    /// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
1580    /// of `mappings` should be (old-global-id, new-system-object-mapping).
1581    ///
1582    /// Panics if provided id is not a system id.
1583    pub fn update_system_object_mappings(
1584        &mut self,
1585        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1586    ) -> Result<(), CatalogError> {
1587        if mappings.is_empty() {
1588            return Ok(());
1589        }
1590
1591        let n = self.system_gid_mapping.update(
1592            |_k, v| {
1593                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1594                    let (_, new_value) = mapping.clone().into_key_value();
1595                    Some(new_value)
1596                } else {
1597                    None
1598                }
1599            },
1600            self.op_id,
1601        )?;
1602
1603        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1604            != mappings.len()
1605        {
1606            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1607            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1608        }
1609
1610        Ok(())
1611    }
1612
1613    /// Updates cluster `id` in the transaction to `cluster`.
1614    ///
1615    /// Returns an error if `id` is not found.
1616    ///
1617    /// Runtime is linear with respect to the total number of clusters in the catalog.
1618    /// DO NOT call this function in a loop.
1619    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1620        let updated = self.clusters.update_by_key(
1621            ClusterKey { id },
1622            cluster.into_key_value().1,
1623            self.op_id,
1624        )?;
1625        if updated {
1626            Ok(())
1627        } else {
1628            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1629        }
1630    }
1631
1632    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1633    ///
1634    /// Returns an error if `replica_id` is not found.
1635    ///
1636    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1637    /// DO NOT call this function in a loop.
1638    pub fn update_cluster_replica(
1639        &mut self,
1640        replica_id: ReplicaId,
1641        replica: ClusterReplica,
1642    ) -> Result<(), CatalogError> {
1643        let updated = self.cluster_replicas.update_by_key(
1644            ClusterReplicaKey { id: replica_id },
1645            replica.into_key_value().1,
1646            self.op_id,
1647        )?;
1648        if updated {
1649            Ok(())
1650        } else {
1651            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1652        }
1653    }
1654
1655    /// Updates database `id` in the transaction to `database`.
1656    ///
1657    /// Returns an error if `id` is not found.
1658    ///
1659    /// Runtime is linear with respect to the total number of databases in the catalog.
1660    /// DO NOT call this function in a loop.
1661    pub fn update_database(
1662        &mut self,
1663        id: DatabaseId,
1664        database: Database,
1665    ) -> Result<(), CatalogError> {
1666        let updated = self.databases.update_by_key(
1667            DatabaseKey { id },
1668            database.into_key_value().1,
1669            self.op_id,
1670        )?;
1671        if updated {
1672            Ok(())
1673        } else {
1674            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1675        }
1676    }
1677
1678    /// Updates schema `schema_id` in the transaction to `schema`.
1679    ///
1680    /// Returns an error if `schema_id` is not found.
1681    ///
1682    /// Runtime is linear with respect to the total number of schemas in the catalog.
1683    /// DO NOT call this function in a loop.
1684    pub fn update_schema(
1685        &mut self,
1686        schema_id: SchemaId,
1687        schema: Schema,
1688    ) -> Result<(), CatalogError> {
1689        let updated = self.schemas.update_by_key(
1690            SchemaKey { id: schema_id },
1691            schema.into_key_value().1,
1692            self.op_id,
1693        )?;
1694        if updated {
1695            Ok(())
1696        } else {
1697            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1698        }
1699    }
1700
1701    /// Updates `network_policy_id` in the transaction to `network policy`.
1702    ///
1703    /// Returns an error if `id` is not found.
1704    ///
1705    /// Runtime is linear with respect to the total number of databases in the catalog.
1706    /// DO NOT call this function in a loop.
1707    pub fn update_network_policy(
1708        &mut self,
1709        id: NetworkPolicyId,
1710        network_policy: NetworkPolicy,
1711    ) -> Result<(), CatalogError> {
1712        let updated = self.network_policies.update_by_key(
1713            NetworkPolicyKey { id },
1714            network_policy.into_key_value().1,
1715            self.op_id,
1716        )?;
1717        if updated {
1718            Ok(())
1719        } else {
1720            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1721        }
1722    }
1723    /// Removes all network policies in `network policies` from the transaction.
1724    ///
1725    /// Returns an error if any id in `network policy` is not found.
1726    ///
1727    /// NOTE: On error, there still may be some roles removed from the transaction. It
1728    /// is up to the caller to either abort the transaction or commit.
1729    pub fn remove_network_policies(
1730        &mut self,
1731        network_policies: &BTreeSet<NetworkPolicyId>,
1732    ) -> Result<(), CatalogError> {
1733        if network_policies.is_empty() {
1734            return Ok(());
1735        }
1736
1737        let to_remove = network_policies
1738            .iter()
1739            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1740            .collect();
1741        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1742        assert!(
1743            prev.iter().all(|(k, _)| k.id.is_user()),
1744            "cannot delete non-user network policy"
1745        );
1746
1747        prev.retain(|_k, v| v.is_none());
1748        if !prev.is_empty() {
1749            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1750            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1751        }
1752
1753        Ok(())
1754    }
1755    /// Set persisted default privilege.
1756    ///
1757    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
1758    pub fn set_default_privilege(
1759        &mut self,
1760        role_id: RoleId,
1761        database_id: Option<DatabaseId>,
1762        schema_id: Option<SchemaId>,
1763        object_type: ObjectType,
1764        grantee: RoleId,
1765        privileges: Option<AclMode>,
1766    ) -> Result<(), CatalogError> {
1767        self.default_privileges.set(
1768            DefaultPrivilegesKey {
1769                role_id,
1770                database_id,
1771                schema_id,
1772                object_type,
1773                grantee,
1774            },
1775            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1776            self.op_id,
1777        )?;
1778        Ok(())
1779    }
1780
1781    /// Set persisted default privileges.
1782    pub fn set_default_privileges(
1783        &mut self,
1784        default_privileges: Vec<DefaultPrivilege>,
1785    ) -> Result<(), CatalogError> {
1786        if default_privileges.is_empty() {
1787            return Ok(());
1788        }
1789
1790        let default_privileges = default_privileges
1791            .into_iter()
1792            .map(DurableType::into_key_value)
1793            .map(|(k, v)| (k, Some(v)))
1794            .collect();
1795        self.default_privileges
1796            .set_many(default_privileges, self.op_id)?;
1797        Ok(())
1798    }
1799
1800    /// Set persisted system privilege.
1801    ///
1802    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1803    pub fn set_system_privilege(
1804        &mut self,
1805        grantee: RoleId,
1806        grantor: RoleId,
1807        acl_mode: Option<AclMode>,
1808    ) -> Result<(), CatalogError> {
1809        self.system_privileges.set(
1810            SystemPrivilegesKey { grantee, grantor },
1811            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1812            self.op_id,
1813        )?;
1814        Ok(())
1815    }
1816
1817    /// Set persisted system privileges.
1818    pub fn set_system_privileges(
1819        &mut self,
1820        system_privileges: Vec<MzAclItem>,
1821    ) -> Result<(), CatalogError> {
1822        if system_privileges.is_empty() {
1823            return Ok(());
1824        }
1825
1826        let system_privileges = system_privileges
1827            .into_iter()
1828            .map(DurableType::into_key_value)
1829            .map(|(k, v)| (k, Some(v)))
1830            .collect();
1831        self.system_privileges
1832            .set_many(system_privileges, self.op_id)?;
1833        Ok(())
1834    }
1835
1836    /// Set persisted setting.
1837    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1838        self.settings.set(
1839            SettingKey { name },
1840            value.map(|value| SettingValue { value }),
1841            self.op_id,
1842        )?;
1843        Ok(())
1844    }
1845
1846    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1847        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1848    }
1849
1850    /// Insert persisted introspection source index.
1851    pub fn insert_introspection_source_indexes(
1852        &mut self,
1853        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1854        temporary_oids: &HashSet<u32>,
1855    ) -> Result<(), CatalogError> {
1856        if introspection_source_indexes.is_empty() {
1857            return Ok(());
1858        }
1859
1860        let amount = usize_to_u64(introspection_source_indexes.len());
1861        let oids = self.allocate_oids(amount, temporary_oids)?;
1862        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1863            .into_iter()
1864            .zip_eq(oids)
1865            .map(
1866                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1867                    cluster_id,
1868                    name,
1869                    item_id,
1870                    index_id,
1871                    oid,
1872                },
1873            )
1874            .collect();
1875
1876        for introspection_source_index in introspection_source_indexes {
1877            let (key, value) = introspection_source_index.into_key_value();
1878            self.introspection_sources.insert(key, value, self.op_id)?;
1879        }
1880
1881        Ok(())
1882    }
1883
1884    /// Set persisted system object mappings.
1885    pub fn set_system_object_mappings(
1886        &mut self,
1887        mappings: Vec<SystemObjectMapping>,
1888    ) -> Result<(), CatalogError> {
1889        if mappings.is_empty() {
1890            return Ok(());
1891        }
1892
1893        let mappings = mappings
1894            .into_iter()
1895            .map(DurableType::into_key_value)
1896            .map(|(k, v)| (k, Some(v)))
1897            .collect();
1898        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1899        Ok(())
1900    }
1901
1902    /// Set persisted replica.
1903    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1904        if replicas.is_empty() {
1905            return Ok(());
1906        }
1907
1908        let replicas = replicas
1909            .into_iter()
1910            .map(DurableType::into_key_value)
1911            .map(|(k, v)| (k, Some(v)))
1912            .collect();
1913        self.cluster_replicas.set_many(replicas, self.op_id)?;
1914        Ok(())
1915    }
1916
1917    /// Set persisted configuration.
1918    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1919        match value {
1920            Some(value) => {
1921                let config = Config { key, value };
1922                let (key, value) = config.into_key_value();
1923                self.configs.set(key, Some(value), self.op_id)?;
1924            }
1925            None => {
1926                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1927            }
1928        }
1929        Ok(())
1930    }
1931
1932    /// Get the value of a persisted config.
1933    pub fn get_config(&self, key: String) -> Option<u64> {
1934        self.configs
1935            .get(&ConfigKey { key })
1936            .map(|entry| entry.value)
1937    }
1938
1939    /// Get the value of a persisted setting.
1940    pub fn get_setting(&self, name: String) -> Option<&str> {
1941        self.settings
1942            .get(&SettingKey { name })
1943            .map(|entry| &*entry.value)
1944    }
1945
1946    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1947        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1948            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1949    }
1950
1951    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1952        self.set_setting(
1953            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1954            Some(shard_id.to_string()),
1955        )
1956    }
1957
1958    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1959        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1960            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1961    }
1962
1963    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1964        self.set_setting(
1965            EXPRESSION_CACHE_SHARD_KEY.to_string(),
1966            Some(shard_id.to_string()),
1967        )
1968    }
1969
1970    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
1971    /// match the `with_0dt_deployment_max_wait` "system var" value.
1972    ///
1973    /// These are mirrored so that we can toggle the flag with Launch Darkly,
1974    /// but use it in boot before Launch Darkly is available.
1975    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1976        self.set_config(
1977            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1978            Some(
1979                value
1980                    .as_millis()
1981                    .try_into()
1982                    .expect("max wait fits into u64"),
1983            ),
1984        )
1985    }
1986
1987    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
1988    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
1989    /// value.
1990    ///
1991    /// These are mirrored so that we can toggle the flag with Launch Darkly,
1992    /// but use it in boot before Launch Darkly is available.
1993    pub fn set_0dt_deployment_ddl_check_interval(
1994        &mut self,
1995        value: Duration,
1996    ) -> Result<(), CatalogError> {
1997        self.set_config(
1998            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
1999            Some(
2000                value
2001                    .as_millis()
2002                    .try_into()
2003                    .expect("ddl check interval fits into u64"),
2004            ),
2005        )
2006    }
2007
2008    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
2009    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
2010    ///
2011    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2012    /// but use it in boot before Launch Darkly is available.
2013    pub fn set_enable_0dt_deployment_panic_after_timeout(
2014        &mut self,
2015        value: bool,
2016    ) -> Result<(), CatalogError> {
2017        self.set_config(
2018            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2019            Some(u64::from(value)),
2020        )
2021    }
2022
2023    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2024    /// match the `with_0dt_deployment_max_wait` "system var" value.
2025    ///
2026    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2027    /// but use it in boot before LaunchDarkly is available.
2028    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2029        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2030    }
2031
2032    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2033    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2034    /// value.
2035    ///
2036    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2037    /// use it in boot before LaunchDarkly is available.
2038    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2039        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2040    }
2041
2042    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2043    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2044    /// var" value.
2045    ///
2046    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2047    /// use it in boot before LaunchDarkly is available.
2048    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2049        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2050    }
2051
2052    /// Updates the catalog `system_config_synced` "config" value to true.
2053    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2054        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2055    }
2056
2057    pub fn update_comment(
2058        &mut self,
2059        object_id: CommentObjectId,
2060        sub_component: Option<usize>,
2061        comment: Option<String>,
2062    ) -> Result<(), CatalogError> {
2063        let key = CommentKey {
2064            object_id,
2065            sub_component,
2066        };
2067        let value = comment.map(|c| CommentValue { comment: c });
2068        self.comments.set(key, value, self.op_id)?;
2069
2070        Ok(())
2071    }
2072
2073    pub fn drop_comments(
2074        &mut self,
2075        object_ids: &BTreeSet<CommentObjectId>,
2076    ) -> Result<(), CatalogError> {
2077        if object_ids.is_empty() {
2078            return Ok(());
2079        }
2080
2081        self.comments
2082            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2083        Ok(())
2084    }
2085
2086    pub fn update_source_references(
2087        &mut self,
2088        source_id: CatalogItemId,
2089        references: Vec<SourceReference>,
2090        updated_at: u64,
2091    ) -> Result<(), CatalogError> {
2092        let key = SourceReferencesKey { source_id };
2093        let value = SourceReferencesValue {
2094            references,
2095            updated_at,
2096        };
2097        self.source_references.set(key, Some(value), self.op_id)?;
2098        Ok(())
2099    }
2100
2101    /// Upserts persisted system configuration `name` to `value`.
2102    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2103        let key = ServerConfigurationKey {
2104            name: name.to_string(),
2105        };
2106        let value = ServerConfigurationValue { value };
2107        self.system_configurations
2108            .set(key, Some(value), self.op_id)?;
2109        Ok(())
2110    }
2111
2112    /// Removes persisted system configuration `name`.
2113    pub fn remove_system_config(&mut self, name: &str) {
2114        let key = ServerConfigurationKey {
2115            name: name.to_string(),
2116        };
2117        self.system_configurations
2118            .set(key, None, self.op_id)
2119            .expect("cannot have uniqueness violation");
2120    }
2121
2122    /// Removes all persisted system configurations.
2123    pub fn clear_system_configs(&mut self) {
2124        self.system_configurations.delete(|_k, _v| true, self.op_id);
2125    }
2126
2127    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2128        match self.configs.insert(
2129            ConfigKey { key: key.clone() },
2130            ConfigValue { value },
2131            self.op_id,
2132        ) {
2133            Ok(_) => Ok(()),
2134            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2135        }
2136    }
2137
2138    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2139        self.clusters
2140            .items()
2141            .into_iter()
2142            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2143    }
2144
2145    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2146        self.cluster_replicas
2147            .items()
2148            .into_iter()
2149            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2150    }
2151
2152    pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2153        self.databases
2154            .items()
2155            .into_iter()
2156            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2157    }
2158
2159    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2160        self.roles
2161            .items()
2162            .into_iter()
2163            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2164    }
2165
2166    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2167        self.network_policies
2168            .items()
2169            .into_iter()
2170            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2171    }
2172
2173    pub fn get_system_object_mappings(
2174        &self,
2175    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2176        self.system_gid_mapping
2177            .items()
2178            .into_iter()
2179            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2180    }
2181
2182    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2183        self.schemas
2184            .items()
2185            .into_iter()
2186            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2187    }
2188
2189    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2190        self.system_configurations
2191            .items()
2192            .into_iter()
2193            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2194    }
2195
2196    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2197        let key = SchemaKey { id: *id };
2198        self.schemas
2199            .get(&key)
2200            .map(|v| DurableType::from_key_value(key, v.clone()))
2201    }
2202
2203    pub fn get_introspection_source_indexes(
2204        &self,
2205        cluster_id: ClusterId,
2206    ) -> BTreeMap<&str, (GlobalId, u32)> {
2207        self.introspection_sources
2208            .items()
2209            .into_iter()
2210            .filter(|(k, _v)| k.cluster_id == cluster_id)
2211            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2212            .collect()
2213    }
2214
2215    pub fn get_catalog_content_version(&self) -> Option<&str> {
2216        self.settings
2217            .get(&SettingKey {
2218                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2219            })
2220            .map(|value| &*value.value)
2221    }
2222
2223    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2224        self.settings
2225            .get(&SettingKey {
2226                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2227            })
2228            .map(|value| value.value.clone())
2229    }
2230
2231    /// Commit the current operation within the transaction. This does not cause anything to be
2232    /// written durably, but signals to the current transaction that we are moving on to the next
2233    /// operation.
2234    ///
2235    /// Returns the updates of the committed operation.
2236    #[must_use]
2237    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2238        let updates = self.get_op_updates();
2239        self.commit_op();
2240        updates
2241    }
2242
    /// Returns the [`StateUpdate`]s produced by the current (not-yet-committed)
    /// operation, i.e. all pending entries stamped with `self.op_id`, without
    /// advancing the operation counter.
    fn get_op_updates(&self) -> Vec<StateUpdate> {
        // Extracts `(kind, diff)` pairs from a table transaction's pending
        // entries whose timestamp matches the current operation `op`.
        fn get_collection_op_updates<'a, T>(
            table_txn: &'a TableTransaction<T::Key, T::Value>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T::Value: Ord + Clone + Debug,
            T: DurableType,
        {
            table_txn
                .pending
                .iter()
                // Flatten `(key, Vec<pending value>)` into `(key, pending value)` pairs.
                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
                .filter_map(move |(k, v)| {
                    if v.ts == op {
                        let key = k.clone();
                        let value = v.value.clone();
                        let diff = v.diff.clone().try_into().expect("invalid diff");
                        let update = DurableType::from_key_value(key, value);
                        let kind = kind_fn(update);
                        Some((kind, diff))
                    } else {
                        None
                    }
                })
        }

        // Like `get_collection_op_updates`, but for collections stored as a flat
        // `Vec` of `(key, diff, timestamp)` triples whose value type is `()`
        // (e.g. the audit log) rather than as a `TableTransaction`.
        fn get_large_collection_op_updates<'a, T>(
            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T: DurableType<Value = ()>,
        {
            collection.iter().filter_map(move |(k, diff, ts)| {
                if *ts == op {
                    let key = k.clone();
                    let diff = diff.clone().try_into().expect("invalid diff");
                    let update = DurableType::from_key_value(key, ());
                    let kind = kind_fn(update);
                    Some((kind, diff))
                } else {
                    None
                }
            })
        }

        // Exhaustively destructure `self` so that adding a new field to
        // `Transaction` forces a compile error here, prompting this list to be
        // kept in sync.
        let Transaction {
            durable_catalog: _,
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            source_references,
            system_privileges,
            audit_log_updates,
            storage_collection_metadata,
            unfinalized_shards,
            // Not representable as a `StateUpdate`.
            id_allocator: _,
            configs: _,
            settings: _,
            txn_wal_shard: _,
            upper,
            op_id: _,
        } = &self;

        // Gather this op's pending updates from every representable collection
        // into a single stream of `StateUpdate`s.
        let updates = std::iter::empty()
            .chain(get_collection_op_updates(
                roles,
                StateUpdateKind::Role,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                role_auth,
                StateUpdateKind::RoleAuth,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                databases,
                StateUpdateKind::Database,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                schemas,
                StateUpdateKind::Schema,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                default_privileges,
                StateUpdateKind::DefaultPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_privileges,
                StateUpdateKind::SystemPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_configurations,
                StateUpdateKind::SystemConfiguration,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                clusters,
                StateUpdateKind::Cluster,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                network_policies,
                StateUpdateKind::NetworkPolicy,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                introspection_sources,
                StateUpdateKind::IntrospectionSourceIndex,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                cluster_replicas,
                StateUpdateKind::ClusterReplica,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_gid_mapping,
                StateUpdateKind::SystemObjectMapping,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                items,
                StateUpdateKind::Item,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                comments,
                StateUpdateKind::Comment,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                source_references,
                StateUpdateKind::SourceReferences,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                storage_collection_metadata,
                StateUpdateKind::StorageCollectionMetadata,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                unfinalized_shards,
                StateUpdateKind::UnfinalizedShard,
                self.op_id,
            ))
            .chain(get_large_collection_op_updates(
                audit_log_updates,
                StateUpdateKind::AuditLog,
                self.op_id,
            ))
            // All updates produced by this op are emitted at the transaction's
            // current upper.
            .map(|(kind, diff)| StateUpdate {
                kind,
                ts: upper.clone(),
                diff,
            })
            .collect();

        updates
    }
2423
    /// Reports whether the underlying durable catalog state is a savepoint
    /// catalog. Delegates directly to the durable catalog.
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2427
    /// Advances the transaction-local operation counter. Updates staged by a
    /// transaction are stamped with `op_id` (as their timestamp), so bumping it
    /// orders subsequent updates after everything staged so far.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2431
    /// Returns the current transaction-local operation id, expressed as a
    /// [`Timestamp`].
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2435
    /// Returns the upper of the catalog at the time this transaction started.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2439
2440    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2441        let audit_log_updates = self
2442            .audit_log_updates
2443            .into_iter()
2444            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2445            .collect();
2446
2447        let txn_batch = TransactionBatch {
2448            databases: self.databases.pending(),
2449            schemas: self.schemas.pending(),
2450            items: self.items.pending(),
2451            comments: self.comments.pending(),
2452            roles: self.roles.pending(),
2453            role_auth: self.role_auth.pending(),
2454            clusters: self.clusters.pending(),
2455            cluster_replicas: self.cluster_replicas.pending(),
2456            network_policies: self.network_policies.pending(),
2457            introspection_sources: self.introspection_sources.pending(),
2458            id_allocator: self.id_allocator.pending(),
2459            configs: self.configs.pending(),
2460            source_references: self.source_references.pending(),
2461            settings: self.settings.pending(),
2462            system_gid_mapping: self.system_gid_mapping.pending(),
2463            system_configurations: self.system_configurations.pending(),
2464            default_privileges: self.default_privileges.pending(),
2465            system_privileges: self.system_privileges.pending(),
2466            storage_collection_metadata: self.storage_collection_metadata.pending(),
2467            unfinalized_shards: self.unfinalized_shards.pending(),
2468            txn_wal_shard: self.txn_wal_shard.pending(),
2469            audit_log_updates,
2470            upper: self.upper,
2471        };
2472        (txn_batch, self.durable_catalog)
2473    }
2474
    /// Commits the storage transaction to durable storage. Any error returned outside read-only
    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
    /// before proceeding. In general, this must be fatal to the calling process. We do not
    /// panic/halt inside this function itself so that errors can bubble up during initialization.
    ///
    /// The transaction is committed at `commit_ts`.
    ///
    /// Returns what the upper was directly after the transaction committed.
    ///
    /// In read-only mode, this will return an error for non-empty transactions indicating that the
    /// catalog is not writeable.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn commit_internal(
        self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
        let (mut txn_batch, durable_catalog) = self.into_parts();
        // Exhaustively destructure the batch so that adding a new collection
        // to `TransactionBatch` fails to compile here, forcing the new
        // collection to be consolidated below.
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            source_references,
            settings,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper,
        } = &mut txn_batch;
        // Consolidate in memory because it will likely be faster than consolidating after the
        // transaction has been made durable.
        differential_dataflow::consolidation::consolidate_updates(databases);
        differential_dataflow::consolidation::consolidate_updates(schemas);
        differential_dataflow::consolidation::consolidate_updates(items);
        differential_dataflow::consolidation::consolidate_updates(comments);
        differential_dataflow::consolidation::consolidate_updates(roles);
        differential_dataflow::consolidation::consolidate_updates(role_auth);
        differential_dataflow::consolidation::consolidate_updates(clusters);
        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
        differential_dataflow::consolidation::consolidate_updates(network_policies);
        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
        differential_dataflow::consolidation::consolidate_updates(id_allocator);
        differential_dataflow::consolidation::consolidate_updates(configs);
        differential_dataflow::consolidation::consolidate_updates(settings);
        differential_dataflow::consolidation::consolidate_updates(source_references);
        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
        differential_dataflow::consolidation::consolidate_updates(system_configurations);
        differential_dataflow::consolidation::consolidate_updates(default_privileges);
        differential_dataflow::consolidation::consolidate_updates(system_privileges);
        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);

        // Committing in the past relative to the upper this transaction read
        // would be a programming error in the caller.
        assert!(
            commit_ts >= *upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            upper
        );
        let upper = durable_catalog
            .commit_transaction(txn_batch, commit_ts)
            .await?;
        Ok((durable_catalog, upper))
    }
2553
    /// Commits the storage transaction to durable storage. Any error returned outside read-only
    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
    /// before proceeding. In general, this must be fatal to the calling process. We do not
    /// panic/halt inside this function itself so that errors can bubble up during initialization.
    ///
    /// In read-only mode, this will return an error for non-empty transactions indicating that the
    /// catalog is not writeable.
    ///
    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
    /// updates from this transaction. Therefore, updates from this transaction will not be returned
    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
    ///
    /// An alternative implementation would be for the caller to explicitly consume their updates
    /// after committing and only then apply the updates in-memory. While this removes assumptions
    /// about the caller in this method, in practice it results in duplicate work on every commit.
    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        // Enforce the contract documented above: the committer must already
        // have consumed (applied) every update staged in this transaction.
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        // Drain all the updates from the commit since it is assumed that they were already applied.
        let updates = durable_storage.sync_updates(upper).await?;
        // Writable and savepoint catalogs should have consumed all updates before committing a
        // transaction, otherwise the commit was performed with an out of date state.
        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
        // updates before committing.
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2591}
2592
2593use crate::durable::async_trait;
2594
2595use super::objects::{RoleAuthKey, RoleAuthValue};
2596
2597#[async_trait]
2598impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2599    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2600        self.storage_collection_metadata
2601            .items()
2602            .into_iter()
2603            .map(
2604                |(
2605                    StorageCollectionMetadataKey { id },
2606                    StorageCollectionMetadataValue { shard },
2607                )| { (*id, shard.clone()) },
2608            )
2609            .collect()
2610    }
2611
2612    fn insert_collection_metadata(
2613        &mut self,
2614        metadata: BTreeMap<GlobalId, ShardId>,
2615    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2616        for (id, shard) in metadata {
2617            self.storage_collection_metadata
2618                .insert(
2619                    StorageCollectionMetadataKey { id },
2620                    StorageCollectionMetadataValue {
2621                        shard: shard.clone(),
2622                    },
2623                    self.op_id,
2624                )
2625                .map_err(|err| match err {
2626                    DurableCatalogError::DuplicateKey => {
2627                        StorageError::CollectionMetadataAlreadyExists(id)
2628                    }
2629                    DurableCatalogError::UniquenessViolation => {
2630                        StorageError::PersistShardAlreadyInUse(shard)
2631                    }
2632                    err => StorageError::Generic(anyhow::anyhow!(err)),
2633                })?;
2634        }
2635        Ok(())
2636    }
2637
2638    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2639        let ks: Vec<_> = ids
2640            .into_iter()
2641            .map(|id| StorageCollectionMetadataKey { id })
2642            .collect();
2643        self.storage_collection_metadata
2644            .delete_by_keys(ks, self.op_id)
2645            .into_iter()
2646            .map(
2647                |(
2648                    StorageCollectionMetadataKey { id },
2649                    StorageCollectionMetadataValue { shard },
2650                )| (id, shard),
2651            )
2652            .collect()
2653    }
2654
2655    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2656        self.unfinalized_shards
2657            .items()
2658            .into_iter()
2659            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2660            .collect()
2661    }
2662
2663    fn insert_unfinalized_shards(
2664        &mut self,
2665        s: BTreeSet<ShardId>,
2666    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2667        for shard in s {
2668            match self
2669                .unfinalized_shards
2670                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2671            {
2672                // Inserting duplicate keys has no effect.
2673                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2674                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2675            };
2676        }
2677        Ok(())
2678    }
2679
2680    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2681        let ks: Vec<_> = shards
2682            .into_iter()
2683            .map(|shard| UnfinalizedShardKey { shard })
2684            .collect();
2685        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2686    }
2687
2688    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2689        self.txn_wal_shard
2690            .values()
2691            .iter()
2692            .next()
2693            .map(|TxnWalShardValue { shard }| *shard)
2694    }
2695
2696    fn write_txn_wal_shard(
2697        &mut self,
2698        shard: ShardId,
2699    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2700        self.txn_wal_shard
2701            .insert((), TxnWalShardValue { shard }, self.op_id)
2702            .map_err(|err| match err {
2703                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2704                err => StorageError::Generic(anyhow::anyhow!(err)),
2705            })
2706    }
2707}
2708
/// Describes a set of changes to apply as the result of a catalog transaction.
///
/// Each collection field holds `(key, value, diff)` triples in protobuf form.
/// Per [`TableTransaction::pending`], every diff is +1 (insertion) or
/// -1 (retraction).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    // Audit log entries are append-only; their op id has already been
    // stripped (see `Transaction::into_parts`).
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper of the catalog when the transaction started.
    pub(crate) upper: mz_repr::Timestamp,
}
2761
2762impl TransactionBatch {
2763    pub fn is_empty(&self) -> bool {
2764        let TransactionBatch {
2765            databases,
2766            schemas,
2767            items,
2768            comments,
2769            roles,
2770            role_auth,
2771            clusters,
2772            cluster_replicas,
2773            network_policies,
2774            introspection_sources,
2775            id_allocator,
2776            configs,
2777            settings,
2778            source_references,
2779            system_gid_mapping,
2780            system_configurations,
2781            default_privileges,
2782            system_privileges,
2783            storage_collection_metadata,
2784            unfinalized_shards,
2785            txn_wal_shard,
2786            audit_log_updates,
2787            upper: _,
2788        } = self;
2789        databases.is_empty()
2790            && schemas.is_empty()
2791            && items.is_empty()
2792            && comments.is_empty()
2793            && roles.is_empty()
2794            && role_auth.is_empty()
2795            && clusters.is_empty()
2796            && cluster_replicas.is_empty()
2797            && network_policies.is_empty()
2798            && introspection_sources.is_empty()
2799            && id_allocator.is_empty()
2800            && configs.is_empty()
2801            && settings.is_empty()
2802            && source_references.is_empty()
2803            && system_gid_mapping.is_empty()
2804            && system_configurations.is_empty()
2805            && default_privileges.is_empty()
2806            && system_privileges.is_empty()
2807            && storage_collection_metadata.is_empty()
2808            && unfinalized_shards.is_empty()
2809            && txn_wal_shard.is_empty()
2810            && audit_log_updates.is_empty()
2811    }
2812}
2813
/// A single pending change to one value in a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    // The value being inserted or retracted.
    value: V,
    // Transaction-local timestamp (the op id at staging time); pending lists
    // are kept sorted by this field.
    ts: Timestamp,
    // +1 for an insertion, -1 for a retraction.
    diff: Diff,
}
2820
/// Utility trait to check for plan validity.
///
/// Implemented by every value type stored in a [`TableTransaction`]; the
/// uniqueness check uses it to index values by name instead of comparing all
/// pairs.
trait UniqueName {
    /// Does the item have a unique name? If yes, we can check for name equality in validity
    /// checking.
    const HAS_UNIQUE_NAME: bool;
    /// The unique name, only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`.
    fn unique_name(&self) -> &str;
}
2829
// Blanket `UniqueName` implementations for all durable object value types.
mod unique_name {
    use crate::durable::objects::*;

    /// Implements `UniqueName` with `HAS_UNIQUE_NAME = true` for value types
    /// whose `name` field identifies them.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements `UniqueName` with `HAS_UNIQUE_NAME = false` for value types
    /// without a name; `unique_name` returns the empty string, which per the
    /// trait docs is not meaningful.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                       ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    // `String` is used as a value type only in the test suite.
    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
2891
/// TableTransaction emulates some features of a typical SQL transaction over
/// table for a Collection.
///
/// It supports:
/// - uniqueness constraints
/// - transactional reads and writes (including read-your-writes before commit)
///
/// `K` is the primary key type. Multiple entries with the same key are disallowed.
/// `V` is an arbitrary value type.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The state of the table at the start of the transaction.
    initial: BTreeMap<K, V>,
    // The desired updates to keys after commit.
    // Invariant: Value is sorted by `ts`.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional predicate reporting whether two values conflict with each
    // other; `None` means no uniqueness constraint is enforced.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2909
2910impl<K, V> TableTransaction<K, V>
2911where
2912    K: Ord + Eq + Clone + Debug,
2913    V: Ord + Clone + Debug + UniqueName,
2914{
2915    /// Create a new TableTransaction with initial data.
2916    ///
2917    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
2918    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
2919    /// protobuf types which deserialize to `K` and `V` that a [`TableTransaction`] is generic
2920    /// over.
2921    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2922    where
2923        K: RustType<KP>,
2924        V: RustType<VP>,
2925    {
2926        let initial = initial
2927            .into_iter()
2928            .map(RustType::from_proto)
2929            .collect::<Result<_, _>>()?;
2930
2931        Ok(Self {
2932            initial,
2933            pending: BTreeMap::new(),
2934            uniqueness_violation: None,
2935        })
2936    }
2937
2938    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
2939    /// that determines whether there is a uniqueness violation among two values.
2940    fn new_with_uniqueness_fn<KP, VP>(
2941        initial: BTreeMap<KP, VP>,
2942        uniqueness_violation: fn(a: &V, b: &V) -> bool,
2943    ) -> Result<Self, TryFromProtoError>
2944    where
2945        K: RustType<KP>,
2946        V: RustType<VP>,
2947    {
2948        let initial = initial
2949            .into_iter()
2950            .map(RustType::from_proto)
2951            .collect::<Result<_, _>>()?;
2952
2953        Ok(Self {
2954            initial,
2955            pending: BTreeMap::new(),
2956            uniqueness_violation: Some(uniqueness_violation),
2957        })
2958    }
2959
2960    /// Consumes and returns the pending changes and their diffs. `Diff` is
2961    /// guaranteed to be 1 or -1.
2962    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2963    where
2964        K: RustType<KP>,
2965        V: RustType<VP>,
2966    {
2967        soft_assert_no_log!(self.verify().is_ok());
2968        // Pending describes the desired final state for some keys. K,V pairs should be
2969        // retracted if they already exist and were deleted or are being updated.
2970        self.pending
2971            .into_iter()
2972            .flat_map(|(k, v)| {
2973                let mut v: Vec<_> = v
2974                    .into_iter()
2975                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
2976                    .collect();
2977                differential_dataflow::consolidation::consolidate(&mut v);
2978                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
2979            })
2980            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
2981            .collect()
2982    }
2983
    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
    ///
    /// Runtime is O(n^2), where n is the number of items in `self`, if
    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            // Compare each value to each other value and ensure they are unique.
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                // Fast path: index values by name so each value only needs to
                // be compared against the one other value (if any) sharing its
                // name. Duplicate names collapse to a single map entry, which
                // the `i != *j` check below then detects.
                // NOTE(review): this assumes `uniqueness_violation` only
                // depends on the unique name; violations between
                // differently-named values would be missed — confirm.
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                // Slow path: all-pairs comparison, O(n^2).
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        // Also (softly) check the sortedness invariant on `pending`.
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }
3024
3025    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
3026    ///
3027    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3028    /// items in `keys`.
3029    fn verify_keys<'a>(
3030        &self,
3031        keys: impl IntoIterator<Item = &'a K>,
3032    ) -> Result<(), DurableCatalogError>
3033    where
3034        K: 'a,
3035    {
3036        if let Some(uniqueness_violation) = self.uniqueness_violation {
3037            let entries: Vec<_> = keys
3038                .into_iter()
3039                .filter_map(|key| self.get(key).map(|value| (key, value)))
3040                .collect();
3041            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3042            for (ki, vi) in self.items() {
3043                for (kj, vj) in &entries {
3044                    if ki != *kj && uniqueness_violation(vi, vj) {
3045                        return Err(DurableCatalogError::UniquenessViolation);
3046                    }
3047                }
3048            }
3049        }
3050        soft_assert_no_log!(self.verify().is_ok());
3051        Ok(())
3052    }
3053
3054    /// Iterates over the items viewable in the current transaction in arbitrary
3055    /// order and applies `f` on all key, value pairs.
3056    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3057        let mut seen = BTreeSet::new();
3058        for k in self.pending.keys() {
3059            seen.insert(k);
3060            let v = self.get(k);
3061            // Deleted items don't exist so shouldn't be visited, but still suppress
3062            // visiting the key later.
3063            if let Some(v) = v {
3064                f(k, v);
3065            }
3066        }
3067        for (k, v) in self.initial.iter() {
3068            // Add on initial items that don't have updates.
3069            if !seen.contains(k) {
3070                f(k, v);
3071            }
3072        }
3073    }
3074
3075    /// Returns the current value of `k`.
3076    fn get(&self, k: &K) -> Option<&V> {
3077        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3078        let mut updates = Vec::with_capacity(pending.len() + 1);
3079        if let Some(initial) = self.initial.get(k) {
3080            updates.push((initial, Diff::ONE));
3081        }
3082        updates.extend(
3083            pending
3084                .into_iter()
3085                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3086        );
3087
3088        differential_dataflow::consolidation::consolidate(&mut updates);
3089        assert!(updates.len() <= 1);
3090        updates.into_iter().next().map(|(v, _)| v)
3091    }
3092
3093    /// Returns the items viewable in the current transaction. The items are
3094    /// cloned, so this is an expensive operation. Prefer using [`Self::items`], or
3095    /// [`Self::for_values`].
3096    // Used by tests.
3097    #[cfg(test)]
3098    fn items_cloned(&self) -> BTreeMap<K, V> {
3099        let mut items = BTreeMap::new();
3100        self.for_values(|k, v| {
3101            items.insert(k.clone(), v.clone());
3102        });
3103        items
3104    }
3105
3106    /// Returns the items viewable in the current transaction as references. Returns a map
3107    /// of references.
3108    fn items(&self) -> BTreeMap<&K, &V> {
3109        let mut items = BTreeMap::new();
3110        self.for_values(|k, v| {
3111            items.insert(k, v);
3112        });
3113        items
3114    }
3115
3116    /// Returns the values viewable in the current transaction as references.
3117    fn values(&self) -> BTreeSet<&V> {
3118        let mut items = BTreeSet::new();
3119        self.for_values(|_, v| {
3120            items.insert(v);
3121        });
3122        items
3123    }
3124
3125    /// Returns the number of items viewable in the current transaction.
3126    fn len(&self) -> usize {
3127        let mut count = 0;
3128        self.for_values(|_, _| {
3129            count += 1;
3130        });
3131        count
3132    }
3133
3134    /// Iterates over the items viewable in the current transaction, and provides a
3135    /// map where additional pending items can be inserted, which will be appended
3136    /// to current pending items. Does not verify uniqueness.
3137    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3138        &mut self,
3139        mut f: F,
3140    ) {
3141        let mut pending = BTreeMap::new();
3142        self.for_values(|k, v| f(&mut pending, k, v));
3143        for (k, updates) in pending {
3144            self.pending.entry(k).or_default().extend(updates);
3145        }
3146    }
3147
3148    /// Inserts a new k,v pair.
3149    ///
3150    /// Returns an error if the uniqueness check failed or the key already exists.
3151    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3152        let mut violation = None;
3153        self.for_values(|for_k, for_v| {
3154            if &k == for_k {
3155                violation = Some(DurableCatalogError::DuplicateKey);
3156            }
3157            if let Some(uniqueness_violation) = self.uniqueness_violation {
3158                if uniqueness_violation(for_v, &v) {
3159                    violation = Some(DurableCatalogError::UniquenessViolation);
3160                }
3161            }
3162        });
3163        if let Some(violation) = violation {
3164            return Err(violation);
3165        }
3166        self.pending.entry(k).or_default().push(TransactionUpdate {
3167            value: v,
3168            ts,
3169            diff: Diff::ONE,
3170        });
3171        soft_assert_no_log!(self.verify().is_ok());
3172        Ok(())
3173    }
3174
3175    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
3176    /// value should be updated, otherwise `None`. Returns the number of changed
3177    /// entries.
3178    ///
3179    /// Returns an error if the uniqueness check failed.
3180    ///
3181    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
3182    /// better performance.
3183    fn update<F: Fn(&K, &V) -> Option<V>>(
3184        &mut self,
3185        f: F,
3186        ts: Timestamp,
3187    ) -> Result<Diff, DurableCatalogError> {
3188        let mut changed = Diff::ZERO;
3189        let mut keys = BTreeSet::new();
3190        // Keep a copy of pending in case of uniqueness violation.
3191        let pending = self.pending.clone();
3192        self.for_values_mut(|p, k, v| {
3193            if let Some(next) = f(k, v) {
3194                changed += Diff::ONE;
3195                keys.insert(k.clone());
3196                let updates = p.entry(k.clone()).or_default();
3197                updates.push(TransactionUpdate {
3198                    value: v.clone(),
3199                    ts,
3200                    diff: Diff::MINUS_ONE,
3201                });
3202                updates.push(TransactionUpdate {
3203                    value: next,
3204                    ts,
3205                    diff: Diff::ONE,
3206                });
3207            }
3208        });
3209        // Check for uniqueness violation.
3210        if let Err(err) = self.verify_keys(&keys) {
3211            self.pending = pending;
3212            Err(err)
3213        } else {
3214            Ok(changed)
3215        }
3216    }
3217
3218    /// Updates `k`, `v` pair if `k` already exists in `self`.
3219    ///
3220    /// Returns `true` if `k` was updated, `false` otherwise.
3221    /// Returns an error if the uniqueness check failed.
3222    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3223        if let Some(cur_v) = self.get(&k) {
3224            if v != *cur_v {
3225                self.set(k, Some(v), ts)?;
3226            }
3227            Ok(true)
3228        } else {
3229            Ok(false)
3230        }
3231    }
3232
3233    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
3234    ///
3235    /// Returns the number of changed entries.
3236    /// Returns an error if the uniqueness check failed.
3237    fn update_by_keys(
3238        &mut self,
3239        kvs: impl IntoIterator<Item = (K, V)>,
3240        ts: Timestamp,
3241    ) -> Result<Diff, DurableCatalogError> {
3242        let kvs: Vec<_> = kvs
3243            .into_iter()
3244            .filter_map(|(k, v)| match self.get(&k) {
3245                // Record if updating this entry would be a no-op.
3246                Some(cur_v) => Some((*cur_v == v, k, v)),
3247                None => None,
3248            })
3249            .collect();
3250        let changed = kvs.len();
3251        let changed =
3252            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3253        let kvs = kvs
3254            .into_iter()
3255            // Filter out no-ops to save some work.
3256            .filter(|(no_op, _, _)| !no_op)
3257            .map(|(_, k, v)| (k, Some(v)))
3258            .collect();
3259        self.set_many(kvs, ts)?;
3260        Ok(changed)
3261    }
3262
3263    /// Set the value for a key. Returns the previous entry if the key existed,
3264    /// otherwise None.
3265    ///
3266    /// Returns an error if the uniqueness check failed.
3267    ///
3268    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
3269    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3270        let prev = self.get(&k).cloned();
3271        let entry = self.pending.entry(k.clone()).or_default();
3272        let restore_len = entry.len();
3273
3274        match (v, prev.clone()) {
3275            (Some(v), Some(prev)) => {
3276                entry.push(TransactionUpdate {
3277                    value: prev,
3278                    ts,
3279                    diff: Diff::MINUS_ONE,
3280                });
3281                entry.push(TransactionUpdate {
3282                    value: v,
3283                    ts,
3284                    diff: Diff::ONE,
3285                });
3286            }
3287            (Some(v), None) => {
3288                entry.push(TransactionUpdate {
3289                    value: v,
3290                    ts,
3291                    diff: Diff::ONE,
3292                });
3293            }
3294            (None, Some(prev)) => {
3295                entry.push(TransactionUpdate {
3296                    value: prev,
3297                    ts,
3298                    diff: Diff::MINUS_ONE,
3299                });
3300            }
3301            (None, None) => {}
3302        }
3303
3304        // Check for uniqueness violation.
3305        if let Err(err) = self.verify_keys([&k]) {
3306            // Revert self.pending to the state it was in before calling this
3307            // function.
3308            let pending = self.pending.get_mut(&k).expect("inserted above");
3309            pending.truncate(restore_len);
3310            Err(err)
3311        } else {
3312            Ok(prev)
3313        }
3314    }
3315
3316    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
3317    /// otherwise None.
3318    ///
3319    /// Returns an error if any uniqueness check failed.
3320    fn set_many(
3321        &mut self,
3322        kvs: BTreeMap<K, Option<V>>,
3323        ts: Timestamp,
3324    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3325        if kvs.is_empty() {
3326            return Ok(BTreeMap::new());
3327        }
3328
3329        let mut prevs = BTreeMap::new();
3330        let mut restores = BTreeMap::new();
3331
3332        for (k, v) in kvs {
3333            let prev = self.get(&k).cloned();
3334            let entry = self.pending.entry(k.clone()).or_default();
3335            restores.insert(k.clone(), entry.len());
3336
3337            match (v, prev.clone()) {
3338                (Some(v), Some(prev)) => {
3339                    entry.push(TransactionUpdate {
3340                        value: prev,
3341                        ts,
3342                        diff: Diff::MINUS_ONE,
3343                    });
3344                    entry.push(TransactionUpdate {
3345                        value: v,
3346                        ts,
3347                        diff: Diff::ONE,
3348                    });
3349                }
3350                (Some(v), None) => {
3351                    entry.push(TransactionUpdate {
3352                        value: v,
3353                        ts,
3354                        diff: Diff::ONE,
3355                    });
3356                }
3357                (None, Some(prev)) => {
3358                    entry.push(TransactionUpdate {
3359                        value: prev,
3360                        ts,
3361                        diff: Diff::MINUS_ONE,
3362                    });
3363                }
3364                (None, None) => {}
3365            }
3366
3367            prevs.insert(k, prev);
3368        }
3369
3370        // Check for uniqueness violation.
3371        if let Err(err) = self.verify_keys(prevs.keys()) {
3372            for (k, restore_len) in restores {
3373                // Revert self.pending to the state it was in before calling this
3374                // function.
3375                let pending = self.pending.get_mut(&k).expect("inserted above");
3376                pending.truncate(restore_len);
3377            }
3378            Err(err)
3379        } else {
3380            Ok(prevs)
3381        }
3382    }
3383
3384    /// Deletes items for which `f` returns true. Returns the keys and values of
3385    /// the deleted entries.
3386    ///
3387    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3388    /// better performance.
3389    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3390        let mut deleted = Vec::new();
3391        self.for_values_mut(|p, k, v| {
3392            if f(k, v) {
3393                deleted.push((k.clone(), v.clone()));
3394                p.entry(k.clone()).or_default().push(TransactionUpdate {
3395                    value: v.clone(),
3396                    ts,
3397                    diff: Diff::MINUS_ONE,
3398                });
3399            }
3400        });
3401        soft_assert_no_log!(self.verify().is_ok());
3402        deleted
3403    }
3404
3405    /// Deletes item with key `k`.
3406    ///
3407    /// Returns the value of the deleted entry, if it existed.
3408    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3409        self.set(k, None, ts)
3410            .expect("deleting an entry cannot violate uniqueness")
3411    }
3412
3413    /// Deletes items with key in `ks`.
3414    ///
3415    /// Returns the keys and values of the deleted entries.
3416    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3417        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3418        let prevs = self
3419            .set_many(kvs, ts)
3420            .expect("deleting entries cannot violate uniqueness");
3421        prevs
3422            .into_iter()
3423            .filter_map(|(k, v)| v.map(|v| (k, v)))
3424            .collect()
3425    }
3426}
3427
3428#[cfg(test)]
3429#[allow(clippy::unwrap_used)]
3430mod tests {
3431    use super::*;
3432
3433    use mz_ore::now::SYSTEM_TIME;
3434    use mz_ore::{assert_none, assert_ok};
3435    use mz_persist_client::cache::PersistClientCache;
3436    use mz_persist_types::PersistLocation;
3437    use semver::Version;
3438
3439    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3440    use crate::memory;
3441
3442    #[mz_ore::test]
3443    fn test_table_transaction_simple() {
3444        fn uniqueness_violation(a: &String, b: &String) -> bool {
3445            a == b
3446        }
3447        let mut table = TableTransaction::new_with_uniqueness_fn(
3448            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3449            uniqueness_violation,
3450        )
3451        .unwrap();
3452
3453        // Ideally, we compare for errors here, but it's hard/impossible to implement PartialEq
3454        // for DurableCatalogError.
3455        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3456        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3457        assert!(
3458            table
3459                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3460                .is_err()
3461        );
3462        assert!(
3463            table
3464                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3465                .is_err()
3466        );
3467    }
3468
    /// End-to-end exercise of the `TableTransaction` API (`insert`, `update`,
    /// `delete`, `set`, `set_many`, `update_by_key(s)`, `delete_by_key(s)`)
    /// against a plain `BTreeMap` standing in for the durable table: each batch
    /// of pending `(key, value, diff)` updates is applied via `commit` and the
    /// resulting map is compared against the expected state.
    #[mz_ore::test]
    fn test_table_transaction() {
        fn uniqueness_violation(a: &String, b: &String) -> bool {
            a == b
        }
        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();

        // Applies a batch of pending updates to `table`, asserting internal
        // consistency: every retraction must remove exactly the value it
        // claims, and every addition must land on a vacant key.
        fn commit(
            table: &mut BTreeMap<Vec<u8>, String>,
            mut pending: Vec<(Vec<u8>, String, Diff)>,
        ) {
            // Sort by diff so that we process retractions first.
            pending.sort_by(|a, b| a.2.cmp(&b.2));
            for (k, v, diff) in pending {
                if diff == Diff::MINUS_ONE {
                    let prev = table.remove(&k);
                    assert_eq!(prev, Some(v));
                } else if diff == Diff::ONE {
                    let prev = table.insert(k, v);
                    assert_eq!(prev, None);
                } else {
                    panic!("unexpected diff: {diff}");
                }
            }
        }

        // Seed the initial durable state with two entries.
        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.items_cloned(), table);
        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
        );
        assert_eq!(
            table_txn
                .update(|_k, _v| Some("v3".to_string()), 2)
                .unwrap(),
            Diff::ONE
        );

        // Uniqueness violation.
        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
            .unwrap_err();

        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
            .unwrap();
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
            ])
        );
        // A failed `update` must leave pending untouched (rollback path).
        let err = table_txn
            .update(|_k, _v| Some("v1".to_string()), 5)
            .unwrap_err();
        assert!(
            matches!(err, DurableCatalogError::UniquenessViolation),
            "unexpected err: {err:?}"
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v1".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
                (
                    2i64.to_le_bytes().to_vec(),
                    "v2".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Deleting then creating an item that has a uniqueness violation should work.
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        // Uniqueness violation in value.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap_err();
        // Key already exists, expect error.
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap_err();
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        // Both the inserts work now because the key and uniqueness violation are gone.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v3".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
            .unwrap();

        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
        );

        // Verify we don't try to delete v3 or v4 during commit.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
            .unwrap_err();
        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table.clone().into_iter().collect::<Vec<_>>(),
            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
        );

        // Test `set`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
            .unwrap_err();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
            .unwrap();
        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v5".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
        );

        // Duplicate `set`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());

        // Test `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                0,
            )
            .unwrap_err();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                1,
            )
            .unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                    (3i64.to_le_bytes().to_vec(), None),
                ]),
                2,
            )
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
                (
                    3i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                ]),
                0,
            )
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_key`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
            .unwrap_err();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
                .unwrap()
        );
        assert!(
            !table_txn
                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
                .unwrap()
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_key`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
                .unwrap()
        );
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_keys`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap_err();
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                1,
            )
            .unwrap();
        assert_eq!(n, Diff::ONE);
        let n = table_txn
            .update_by_keys(
                [
                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                2,
            )
            .unwrap();
        assert_eq!(n, Diff::ZERO);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v8".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_keys`.
        // NOTE: matched-but-unchanged keys still count toward the returned
        // total, which is why `n` is 2 while `pending` stays empty.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap();
        assert_eq!(n, Diff::from(2));
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `delete_by_key`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
        assert_eq!(prev, Some("v9".to_string()));
        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
        assert_none!(prev);
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
        assert_none!(prev);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                1i64.to_le_bytes().to_vec(),
                "v9".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
        );

        // Test `delete_by_keys`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            0,
        );
        assert_eq!(
            prevs,
            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
        );
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            1,
        );
        assert_eq!(prevs, vec![]);
        let prevs = table_txn.delete_by_keys(
            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            2,
        );
        assert_eq!(prevs, vec![]);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                42i64.to_le_bytes().to_vec(),
                "v7".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(table, BTreeMap::new());
    }
3966
    /// Opens a savepoint (read-only fork) of an initialized catalog backed by
    /// an in-memory persist location, commits a transaction that creates a user
    /// database, and verifies the resulting state update is observable through
    /// `sync_to_current_updates`.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_savepoint() {
        const VERSION: Version = Version::new(26, 0, 0);
        let mut persist_cache = PersistClientCache::new_no_metrics();
        persist_cache.cfg.build_version = VERSION;
        let persist_client = persist_cache
            .open(PersistLocation::new_in_mem())
            .await
            .unwrap();
        let state_builder = TestCatalogStateBuilder::new(persist_client)
            .with_default_deploy_generation()
            .with_version(VERSION);

        // Initialize catalog.
        let _ = state_builder
            .clone()
            .unwrap_build()
            .await
            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;
        // Re-open the same catalog in savepoint mode.
        let mut savepoint_state = state_builder
            .unwrap_build()
            .await
            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;

        // The bootstrap contents must be visible to the savepoint.
        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
        assert!(!initial_snapshot.is_empty());

        let db_name = "db";
        let db_owner = RoleId::User(42);
        let db_privileges = Vec::new();
        let mut txn = savepoint_state.transaction().await.unwrap();
        let (db_id, db_oid) = txn
            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
            .unwrap();
        let commit_ts = txn.upper();
        txn.commit_internal(commit_ts).await.unwrap();
        // Exactly one new update should be produced: the database addition.
        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
        let update = updates.into_element();

        assert_eq!(update.diff, StateDiff::Addition);

        let db = match update.kind {
            memory::objects::StateUpdateKind::Database(db) => db,
            update => panic!("unexpected update: {update:?}"),
        };

        // The in-memory database object must round-trip every field we wrote.
        assert_eq!(db_id, db.id);
        assert_eq!(db_oid, db.oid);
        assert_eq!(db_name, db.name);
        assert_eq!(db_owner, db.owner_id);
        assert_eq!(db_privileges, db.privileges);
    }
4026
4027    #[mz_ore::test]
4028    fn test_allocate_introspection_source_index_id() {
4029        let cluster_variant: u8 = 0b0000_0001;
4030        let cluster_id_inner: u64 =
4031            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4032        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4033
4034        let cluster_id = ClusterId::System(cluster_id_inner);
4035        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4036
4037        let introspection_source_index_id: u64 =
4038            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4039
4040        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4041        {
4042            let mut cluster_variant_mask = 0xFF << 56;
4043            cluster_variant_mask &= introspection_source_index_id;
4044            cluster_variant_mask >>= 56;
4045            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4046        }
4047
4048        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4049        {
4050            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4051            cluster_id_inner_mask &= introspection_source_index_id;
4052            cluster_id_inner_mask >>= 8;
4053            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4054        }
4055
4056        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4057        {
4058            let mut log_variant_mask = 0xFF;
4059            log_variant_mask &= introspection_source_index_id;
4060            assert_eq!(
4061                log_variant_mask,
4062                u64::from(timely_messages_received_log_variant)
4063            );
4064        }
4065
4066        let (catalog_item_id, global_id) =
4067            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4068
4069        assert_eq!(
4070            catalog_item_id,
4071            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4072        );
4073        assert_eq!(
4074            global_id,
4075            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4076        );
4077    }
4078}