// mz_catalog/durable/transaction.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35    RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57    ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58    ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59    SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
68    SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
69    USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
70    USER_ROLE_ID_ALLOC_KEY,
71};
72use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
73
/// Logical timestamp used for the per-transaction operation counter (`op_id`)
/// and to tag buffered audit-log updates; not wall-clock time.
type Timestamp = u64;
75
/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
/// An operation also logically groups multiple catalog updates together.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    // Handle to the durable catalog state that the transaction commits into.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    // Each `TableTransaction` below buffers pending changes to one durable
    // catalog collection; nothing is persisted until the transaction commits.
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Don't make this a table transaction so that it's not read into the
    // in-memory cache.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of `durable_catalog` at the start of the transaction.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation of this transaction.
    op_id: Timestamp,
}
115
116impl<'a> Transaction<'a> {
    /// Creates a new [`Transaction`] on top of `durable_catalog`, seeded with
    /// the catalog `Snapshot` read at `upper`.
    ///
    /// Collections that must enforce name-uniqueness invariants (databases,
    /// schemas, items, roles, clusters, network policies, cluster replicas)
    /// are constructed with explicit uniqueness functions; violations surface
    /// as `Err` on later inserts.
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            // Database names are globally unique.
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            // Schema names are unique within a database.
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            // Item names are unique within a schema, with extra handling for
            // types, which conflict with additional item kinds.
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    // `item_type` is slow, only compute if needed.
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            // Role names are globally unique.
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            // Cluster names are globally unique.
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            // Network policy names are globally unique.
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            // Replica names are unique within a cluster.
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            // Uniqueness violations for this value occur at the key rather than
            // the value (the key is the unit struct `()` so this is a singleton
            // value).
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            // Operation counter starts at zero for a fresh transaction.
            op_id: 0,
        })
    }
199
200    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
201        let key = ItemKey { id: *id };
202        self.items
203            .get(&key)
204            .map(|v| DurableType::from_key_value(key, v.clone()))
205    }
206
207    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
208        self.items
209            .items()
210            .into_iter()
211            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
212            .sorted_by_key(|Item { id, .. }| *id)
213    }
214
215    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
216        self.insert_audit_log_events([event]);
217    }
218
219    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
220        let events = events
221            .into_iter()
222            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
223        self.audit_log_updates.extend(events);
224    }
225
226    pub fn insert_user_database(
227        &mut self,
228        database_name: &str,
229        owner_id: RoleId,
230        privileges: Vec<MzAclItem>,
231        temporary_oids: &HashSet<u32>,
232    ) -> Result<(DatabaseId, u32), CatalogError> {
233        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
234        let id = DatabaseId::User(id);
235        let oid = self.allocate_oid(temporary_oids)?;
236        self.insert_database(id, database_name, owner_id, privileges, oid)?;
237        Ok((id, oid))
238    }
239
240    pub(crate) fn insert_database(
241        &mut self,
242        id: DatabaseId,
243        database_name: &str,
244        owner_id: RoleId,
245        privileges: Vec<MzAclItem>,
246        oid: u32,
247    ) -> Result<u32, CatalogError> {
248        match self.databases.insert(
249            DatabaseKey { id },
250            DatabaseValue {
251                name: database_name.to_string(),
252                owner_id,
253                privileges,
254                oid,
255            },
256            self.op_id,
257        ) {
258            Ok(_) => Ok(oid),
259            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
260        }
261    }
262
263    pub fn insert_user_schema(
264        &mut self,
265        database_id: DatabaseId,
266        schema_name: &str,
267        owner_id: RoleId,
268        privileges: Vec<MzAclItem>,
269        temporary_oids: &HashSet<u32>,
270    ) -> Result<(SchemaId, u32), CatalogError> {
271        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
272        let id = SchemaId::User(id);
273        let oid = self.allocate_oid(temporary_oids)?;
274        self.insert_schema(
275            id,
276            Some(database_id),
277            schema_name.to_string(),
278            owner_id,
279            privileges,
280            oid,
281        )?;
282        Ok((id, oid))
283    }
284
285    pub fn insert_system_schema(
286        &mut self,
287        schema_id: u64,
288        schema_name: &str,
289        owner_id: RoleId,
290        privileges: Vec<MzAclItem>,
291        oid: u32,
292    ) -> Result<(), CatalogError> {
293        let id = SchemaId::System(schema_id);
294        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
295    }
296
297    pub(crate) fn insert_schema(
298        &mut self,
299        schema_id: SchemaId,
300        database_id: Option<DatabaseId>,
301        schema_name: String,
302        owner_id: RoleId,
303        privileges: Vec<MzAclItem>,
304        oid: u32,
305    ) -> Result<(), CatalogError> {
306        match self.schemas.insert(
307            SchemaKey { id: schema_id },
308            SchemaValue {
309                database_id,
310                name: schema_name.clone(),
311                owner_id,
312                privileges,
313                oid,
314            },
315            self.op_id,
316        ) {
317            Ok(_) => Ok(()),
318            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
319        }
320    }
321
322    pub fn insert_builtin_role(
323        &mut self,
324        id: RoleId,
325        name: String,
326        attributes: RoleAttributesRaw,
327        membership: RoleMembership,
328        vars: RoleVars,
329        oid: u32,
330    ) -> Result<RoleId, CatalogError> {
331        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
332        self.insert_role(id, name, attributes, membership, vars, oid)?;
333        Ok(id)
334    }
335
336    pub fn insert_user_role(
337        &mut self,
338        name: String,
339        attributes: RoleAttributesRaw,
340        membership: RoleMembership,
341        vars: RoleVars,
342        temporary_oids: &HashSet<u32>,
343    ) -> Result<(RoleId, u32), CatalogError> {
344        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
345        let id = RoleId::User(id);
346        let oid = self.allocate_oid(temporary_oids)?;
347        self.insert_role(id, name, attributes, membership, vars, oid)?;
348        Ok((id, oid))
349    }
350
    /// Inserts a role and, when the role's attributes carry a password, a
    /// matching `role_auth` entry holding the SCRAM-256 hash of that password.
    ///
    /// Returns `RoleAlreadyExists` if either the `role_auth` key (role ID) or
    /// the role itself (name) collides with an existing entry.
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        // Callers are expected to set iteration counts whenever
                        // a password is present; log loudly if they didn't.
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    // This should never happen, but rather than panicking we'll
                    // set a known secure value as a fallback.
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            // Insert the auth entry before the role itself; a duplicate role
            // ID therefore fails here first, before the role row is written.
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }
406
407    /// Panics if any introspection source id is not a system id
408    pub fn insert_user_cluster(
409        &mut self,
410        cluster_id: ClusterId,
411        cluster_name: &str,
412        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
413        owner_id: RoleId,
414        privileges: Vec<MzAclItem>,
415        config: ClusterConfig,
416        temporary_oids: &HashSet<u32>,
417    ) -> Result<(), CatalogError> {
418        self.insert_cluster(
419            cluster_id,
420            cluster_name,
421            introspection_source_indexes,
422            owner_id,
423            privileges,
424            config,
425            temporary_oids,
426        )
427    }
428
429    /// Panics if any introspection source id is not a system id
430    pub fn insert_system_cluster(
431        &mut self,
432        cluster_name: &str,
433        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
434        privileges: Vec<MzAclItem>,
435        owner_id: RoleId,
436        config: ClusterConfig,
437        temporary_oids: &HashSet<u32>,
438    ) -> Result<ClusterId, CatalogError> {
439        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
440        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441        self.insert_cluster(
442            cluster_id,
443            cluster_name,
444            introspection_source_indexes,
445            owner_id,
446            privileges,
447            config,
448            temporary_oids,
449        )?;
450        Ok(cluster_id)
451    }
452
453    fn insert_cluster(
454        &mut self,
455        cluster_id: ClusterId,
456        cluster_name: &str,
457        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
458        owner_id: RoleId,
459        privileges: Vec<MzAclItem>,
460        config: ClusterConfig,
461        temporary_oids: &HashSet<u32>,
462    ) -> Result<(), CatalogError> {
463        if let Err(_) = self.clusters.insert(
464            ClusterKey { id: cluster_id },
465            ClusterValue {
466                name: cluster_name.to_string(),
467                owner_id,
468                privileges,
469                config,
470            },
471            self.op_id,
472        ) {
473            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
474        };
475
476        let amount = usize_to_u64(introspection_source_indexes.len());
477        let oids = self.allocate_oids(amount, temporary_oids)?;
478        let introspection_source_indexes: Vec<_> = introspection_source_indexes
479            .into_iter()
480            .zip_eq(oids)
481            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
482            .collect();
483        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
484            let introspection_source_index = IntrospectionSourceIndex {
485                cluster_id,
486                name: builtin.name.to_string(),
487                item_id,
488                index_id,
489                oid,
490            };
491            let (key, value) = introspection_source_index.into_key_value();
492            self.introspection_sources
493                .insert(key, value, self.op_id)
494                .expect("no uniqueness violation");
495        }
496
497        Ok(())
498    }
499
500    pub fn rename_cluster(
501        &mut self,
502        cluster_id: ClusterId,
503        cluster_name: &str,
504        cluster_to_name: &str,
505    ) -> Result<(), CatalogError> {
506        let key = ClusterKey { id: cluster_id };
507
508        match self.clusters.update(
509            |k, v| {
510                if *k == key {
511                    let mut value = v.clone();
512                    value.name = cluster_to_name.to_string();
513                    Some(value)
514                } else {
515                    None
516                }
517            },
518            self.op_id,
519        )? {
520            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
521            Diff::ONE => Ok(()),
522            n => panic!(
523                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
524            ),
525        }
526    }
527
528    pub fn rename_cluster_replica(
529        &mut self,
530        replica_id: ReplicaId,
531        replica_name: &QualifiedReplica,
532        replica_to_name: &str,
533    ) -> Result<(), CatalogError> {
534        let key = ClusterReplicaKey { id: replica_id };
535
536        match self.cluster_replicas.update(
537            |k, v| {
538                if *k == key {
539                    let mut value = v.clone();
540                    value.name = replica_to_name.to_string();
541                    Some(value)
542                } else {
543                    None
544                }
545            },
546            self.op_id,
547        )? {
548            Diff::ZERO => {
549                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
550            }
551            Diff::ONE => Ok(()),
552            n => panic!(
553                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
554            ),
555        }
556    }
557
558    pub fn insert_cluster_replica(
559        &mut self,
560        cluster_id: ClusterId,
561        replica_name: &str,
562        config: ReplicaConfig,
563        owner_id: RoleId,
564    ) -> Result<ReplicaId, CatalogError> {
565        let replica_id = match cluster_id {
566            ClusterId::System(_) => self.allocate_system_replica_id()?,
567            ClusterId::User(_) => self.allocate_user_replica_id()?,
568        };
569        self.insert_cluster_replica_with_id(
570            cluster_id,
571            replica_id,
572            replica_name,
573            config,
574            owner_id,
575        )?;
576        Ok(replica_id)
577    }
578
579    pub(crate) fn insert_cluster_replica_with_id(
580        &mut self,
581        cluster_id: ClusterId,
582        replica_id: ReplicaId,
583        replica_name: &str,
584        config: ReplicaConfig,
585        owner_id: RoleId,
586    ) -> Result<(), CatalogError> {
587        if let Err(_) = self.cluster_replicas.insert(
588            ClusterReplicaKey { id: replica_id },
589            ClusterReplicaValue {
590                cluster_id,
591                name: replica_name.into(),
592                config,
593                owner_id,
594            },
595            self.op_id,
596        ) {
597            let cluster = self
598                .clusters
599                .get(&ClusterKey { id: cluster_id })
600                .expect("cluster exists");
601            return Err(SqlCatalogError::DuplicateReplica(
602                replica_name.to_string(),
603                cluster.name.to_string(),
604            )
605            .into());
606        };
607        Ok(())
608    }
609
610    pub fn insert_user_network_policy(
611        &mut self,
612        name: String,
613        rules: Vec<NetworkPolicyRule>,
614        privileges: Vec<MzAclItem>,
615        owner_id: RoleId,
616        temporary_oids: &HashSet<u32>,
617    ) -> Result<NetworkPolicyId, CatalogError> {
618        let oid = self.allocate_oid(temporary_oids)?;
619        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
620        let id = NetworkPolicyId::User(id);
621        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
622    }
623
624    pub fn insert_network_policy(
625        &mut self,
626        id: NetworkPolicyId,
627        name: String,
628        rules: Vec<NetworkPolicyRule>,
629        privileges: Vec<MzAclItem>,
630        owner_id: RoleId,
631        oid: u32,
632    ) -> Result<NetworkPolicyId, CatalogError> {
633        match self.network_policies.insert(
634            NetworkPolicyKey { id },
635            NetworkPolicyValue {
636                name: name.clone(),
637                rules,
638                privileges,
639                owner_id,
640                oid,
641            },
642            self.op_id,
643        ) {
644            Ok(_) => Ok(id),
645            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
646        }
647    }
648
649    /// Updates persisted information about persisted introspection source
650    /// indexes.
651    ///
652    /// Panics if provided id is not a system id.
653    pub fn update_introspection_source_index_gids(
654        &mut self,
655        mappings: impl Iterator<
656            Item = (
657                ClusterId,
658                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
659            ),
660        >,
661    ) -> Result<(), CatalogError> {
662        for (cluster_id, updates) in mappings {
663            for (name, item_id, index_id, oid) in updates {
664                let introspection_source_index = IntrospectionSourceIndex {
665                    cluster_id,
666                    name,
667                    item_id,
668                    index_id,
669                    oid,
670                };
671                let (key, value) = introspection_source_index.into_key_value();
672
673                let prev = self
674                    .introspection_sources
675                    .set(key, Some(value), self.op_id)?;
676                if prev.is_none() {
677                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
678                        "{index_id}"
679                    ))
680                    .into());
681                }
682            }
683        }
684        Ok(())
685    }
686
687    pub fn insert_user_item(
688        &mut self,
689        id: CatalogItemId,
690        global_id: GlobalId,
691        schema_id: SchemaId,
692        item_name: &str,
693        create_sql: String,
694        owner_id: RoleId,
695        privileges: Vec<MzAclItem>,
696        temporary_oids: &HashSet<u32>,
697        versions: BTreeMap<RelationVersion, GlobalId>,
698    ) -> Result<u32, CatalogError> {
699        let oid = self.allocate_oid(temporary_oids)?;
700        self.insert_item(
701            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
702        )?;
703        Ok(oid)
704    }
705
706    pub fn insert_item(
707        &mut self,
708        id: CatalogItemId,
709        oid: u32,
710        global_id: GlobalId,
711        schema_id: SchemaId,
712        item_name: &str,
713        create_sql: String,
714        owner_id: RoleId,
715        privileges: Vec<MzAclItem>,
716        extra_versions: BTreeMap<RelationVersion, GlobalId>,
717    ) -> Result<(), CatalogError> {
718        match self.items.insert(
719            ItemKey { id },
720            ItemValue {
721                schema_id,
722                name: item_name.to_string(),
723                create_sql,
724                owner_id,
725                privileges,
726                oid,
727                global_id,
728                extra_versions,
729            },
730            self.op_id,
731        ) {
732            Ok(_) => Ok(()),
733            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
734        }
735    }
736
737    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
738        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
739    }
740
    /// Reserves `amount` consecutive IDs from the allocator named `key`,
    /// returning them in ascending order, and advances the allocator.
    ///
    /// # Panics
    ///
    /// Panics if system item IDs are requested after bootstrap has completed,
    /// if no allocator named `key` exists, or if the allocator row changed
    /// between the read and the write (an invariant violation).
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        // Overflow of the 64-bit allocator surfaces as ID exhaustion.
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        // The replaced row must match the value read above; anything else
        // would mean the allocator was mutated concurrently within this
        // transaction.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }
773
774    pub fn allocate_system_item_ids(
775        &mut self,
776        amount: u64,
777    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
778        assert!(
779            !self.durable_catalog.is_bootstrap_complete(),
780            "we can only allocate system item IDs during bootstrap"
781        );
782        Ok(self
783            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
784            .into_iter()
785            // TODO(alter_table): Use separate ID allocators.
786            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
787            .collect())
788    }
789
    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system item that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        // Map the cluster variant to its 8-bit tag. Since these tags end up embedded in
        // IDs written to the catalog, existing values must never be renumbered.
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The inner cluster ID must fit in the 48-bit field described above, i.e. its top
        // 16 bits must be zero.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // Map the log variant to its 8-bit tag. Like the cluster variant tags, these
        // numbers are embedded in persisted IDs; only append new variants with fresh
        // numbers, never renumber existing ones.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            // NOTE(review): 27 is skipped — presumably it belonged to a since-removed
            // log variant; do not reuse it for a new variant.
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
            LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
        };

        // Pack the three fields: [8-bit cluster variant | 48-bit cluster ID | 8-bit log variant].
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
880
881    pub fn allocate_user_item_ids(
882        &mut self,
883        amount: u64,
884    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
885        Ok(self
886            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
887            .into_iter()
888            // TODO(alter_table): Use separate ID allocators.
889            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
890            .collect())
891    }
892
893    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
894        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
895        Ok(ReplicaId::User(id))
896    }
897
898    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
899        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
900        Ok(ReplicaId::System(id))
901    }
902
903    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
904        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
905    }
906
907    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
908        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
909    }
910
    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    ///
    /// The returned OIDs are guaranteed not to collide with the OID of any durably recorded
    /// database, schema, role, item, or introspection source, nor with any OID in
    /// `temporary_oids` (OIDs in use that are not recorded in the durable catalog).
    ///
    /// Returns [`SqlCatalogError::OidExhaustion`] if `amount` exceeds `u32::MAX` or if there
    /// are not enough unassigned user OIDs left to satisfy the request.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                // On overflow, skip past the reserved system OID range [0, FIRST_USER_OID)
                // so the counter always stays within user OID space.
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do every time we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated every time an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference and the other
        // approach requires making sure that allocator always stays in-sync which can be
        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        // Resume scanning from the durably persisted cursor rather than from
        // FIRST_USER_OID, so repeated allocations don't rescan the same prefix.
        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        // Scan forward (wrapping around at u32::MAX) collecting free OIDs until we have
        // `amount` of them.
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        // Persist the advanced cursor for the next allocation.
        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        // `set` returns the previous value, which must still be the cursor we read
        // above; anything else would indicate a concurrent modification bug.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1019
1020    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
1021    /// object.
1022    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1023        self.allocate_oids(1, temporary_oids)
1024            .map(|oids| oids.into_element())
1025    }
1026
1027    /// Exports the current state of this transaction as a [`Snapshot`].
1028    ///
1029    /// This merges each `TableTransaction`'s initial data with its pending
1030    /// changes to produce the current view, then converts back to proto types.
1031    /// Used to persist transaction state between incremental DDL dry runs so
1032    /// the next dry run's fresh `Transaction` starts in sync with the
1033    /// accumulated `CatalogState`.
1034    pub fn current_snapshot(&self) -> Snapshot {
1035        Snapshot {
1036            databases: self.databases.current_items_proto(),
1037            schemas: self.schemas.current_items_proto(),
1038            roles: self.roles.current_items_proto(),
1039            role_auth: self.role_auth.current_items_proto(),
1040            items: self.items.current_items_proto(),
1041            comments: self.comments.current_items_proto(),
1042            clusters: self.clusters.current_items_proto(),
1043            network_policies: self.network_policies.current_items_proto(),
1044            cluster_replicas: self.cluster_replicas.current_items_proto(),
1045            introspection_sources: self.introspection_sources.current_items_proto(),
1046            id_allocator: self.id_allocator.current_items_proto(),
1047            configs: self.configs.current_items_proto(),
1048            settings: self.settings.current_items_proto(),
1049            system_object_mappings: self.system_gid_mapping.current_items_proto(),
1050            system_configurations: self.system_configurations.current_items_proto(),
1051            default_privileges: self.default_privileges.current_items_proto(),
1052            source_references: self.source_references.current_items_proto(),
1053            system_privileges: self.system_privileges.current_items_proto(),
1054            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
1055            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
1056            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
1057        }
1058    }
1059
1060    pub(crate) fn insert_id_allocator(
1061        &mut self,
1062        name: String,
1063        next_id: u64,
1064    ) -> Result<(), CatalogError> {
1065        match self.id_allocator.insert(
1066            IdAllocKey { name: name.clone() },
1067            IdAllocValue { next_id },
1068            self.op_id,
1069        ) {
1070            Ok(_) => Ok(()),
1071            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1072        }
1073    }
1074
1075    /// Removes the database `id` from the transaction.
1076    ///
1077    /// Returns an error if `id` is not found.
1078    ///
1079    /// Runtime is linear with respect to the total number of databases in the catalog.
1080    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
1081    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1082        let prev = self
1083            .databases
1084            .set(DatabaseKey { id: *id }, None, self.op_id)?;
1085        if prev.is_some() {
1086            Ok(())
1087        } else {
1088            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1089        }
1090    }
1091
1092    /// Removes all databases in `databases` from the transaction.
1093    ///
1094    /// Returns an error if any id in `databases` is not found.
1095    ///
1096    /// NOTE: On error, there still may be some databases removed from the transaction. It
1097    /// is up to the caller to either abort the transaction or commit.
1098    pub fn remove_databases(
1099        &mut self,
1100        databases: &BTreeSet<DatabaseId>,
1101    ) -> Result<(), CatalogError> {
1102        if databases.is_empty() {
1103            return Ok(());
1104        }
1105
1106        let to_remove = databases
1107            .iter()
1108            .map(|id| (DatabaseKey { id: *id }, None))
1109            .collect();
1110        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1111        prev.retain(|_k, val| val.is_none());
1112
1113        if !prev.is_empty() {
1114            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1115            return Err(SqlCatalogError::UnknownDatabase(err).into());
1116        }
1117
1118        Ok(())
1119    }
1120
1121    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
1122    ///
1123    /// Returns an error if `(database_id, schema_id)` is not found.
1124    ///
1125    /// Runtime is linear with respect to the total number of schemas in the catalog.
1126    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
1127    pub fn remove_schema(
1128        &mut self,
1129        database_id: &Option<DatabaseId>,
1130        schema_id: &SchemaId,
1131    ) -> Result<(), CatalogError> {
1132        let prev = self
1133            .schemas
1134            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1135        if prev.is_some() {
1136            Ok(())
1137        } else {
1138            let database_name = match database_id {
1139                Some(id) => format!("{id}."),
1140                None => "".to_string(),
1141            };
1142            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1143        }
1144    }
1145
1146    /// Removes all schemas in `schemas` from the transaction.
1147    ///
1148    /// Returns an error if any id in `schemas` is not found.
1149    ///
1150    /// NOTE: On error, there still may be some schemas removed from the transaction. It
1151    /// is up to the caller to either abort the transaction or commit.
1152    pub fn remove_schemas(
1153        &mut self,
1154        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1155    ) -> Result<(), CatalogError> {
1156        if schemas.is_empty() {
1157            return Ok(());
1158        }
1159
1160        let to_remove = schemas
1161            .iter()
1162            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1163            .collect();
1164        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1165        prev.retain(|_k, v| v.is_none());
1166
1167        if !prev.is_empty() {
1168            let err = prev
1169                .keys()
1170                .map(|k| {
1171                    let db_spec = schemas.get(&k.id).expect("should_exist");
1172                    let db_name = match db_spec {
1173                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1174                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1175                    };
1176                    format!("{}.{}", db_name, k.id)
1177                })
1178                .join(", ");
1179
1180            return Err(SqlCatalogError::UnknownSchema(err).into());
1181        }
1182
1183        Ok(())
1184    }
1185
1186    pub fn remove_source_references(
1187        &mut self,
1188        source_id: CatalogItemId,
1189    ) -> Result<(), CatalogError> {
1190        let deleted = self
1191            .source_references
1192            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1193            .is_some();
1194        if deleted {
1195            Ok(())
1196        } else {
1197            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1198        }
1199    }
1200
1201    /// Removes all user roles in `roles` from the transaction.
1202    ///
1203    /// Returns an error if any id in `roles` is not found.
1204    ///
1205    /// NOTE: On error, there still may be some roles removed from the transaction. It
1206    /// is up to the caller to either abort the transaction or commit.
1207    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1208        assert!(
1209            roles.iter().all(|id| id.is_user()),
1210            "cannot delete non-user roles"
1211        );
1212        self.remove_roles(roles)
1213    }
1214
1215    /// Removes all roles in `roles` from the transaction.
1216    ///
1217    /// Returns an error if any id in `roles` is not found.
1218    ///
1219    /// NOTE: On error, there still may be some roles removed from the transaction. It
1220    /// is up to the caller to either abort the transaction or commit.
1221    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1222        if roles.is_empty() {
1223            return Ok(());
1224        }
1225
1226        let to_remove_keys = roles
1227            .iter()
1228            .map(|role_id| RoleKey { id: *role_id })
1229            .collect::<Vec<_>>();
1230
1231        let to_remove_roles = to_remove_keys
1232            .iter()
1233            .map(|role_key| (role_key.clone(), None))
1234            .collect();
1235
1236        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1237
1238        let to_remove_role_auth = to_remove_keys
1239            .iter()
1240            .map(|role_key| {
1241                (
1242                    RoleAuthKey {
1243                        role_id: role_key.id,
1244                    },
1245                    None,
1246                )
1247            })
1248            .collect();
1249
1250        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1251
1252        prev.retain(|_k, v| v.is_none());
1253        if !prev.is_empty() {
1254            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1255            return Err(SqlCatalogError::UnknownRole(err).into());
1256        }
1257
1258        role_auth_prev.retain(|_k, v| v.is_none());
1259        // The reason we don't to the same check as above is that the role auth table
1260        // is not required to have all roles in the role table.
1261
1262        Ok(())
1263    }
1264
1265    /// Removes all cluster in `clusters` from the transaction.
1266    ///
1267    /// Returns an error if any id in `clusters` is not found.
1268    ///
1269    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1270    /// the caller to either abort the transaction or commit.
1271    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1272        if clusters.is_empty() {
1273            return Ok(());
1274        }
1275
1276        let to_remove = clusters
1277            .iter()
1278            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1279            .collect();
1280        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1281
1282        prev.retain(|_k, v| v.is_none());
1283        if !prev.is_empty() {
1284            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1285            return Err(SqlCatalogError::UnknownCluster(err).into());
1286        }
1287
1288        // Cascade delete introspection sources and cluster replicas.
1289        //
1290        // TODO(benesch): this doesn't seem right. Cascade deletions should
1291        // be entirely the domain of the higher catalog layer, not the
1292        // storage layer.
1293        self.cluster_replicas
1294            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1295        self.introspection_sources
1296            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1297
1298        Ok(())
1299    }
1300
1301    /// Removes the cluster replica `id` from the transaction.
1302    ///
1303    /// Returns an error if `id` is not found.
1304    ///
1305    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1306    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1307    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1308        let deleted = self
1309            .cluster_replicas
1310            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1311            .is_some();
1312        if deleted {
1313            Ok(())
1314        } else {
1315            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1316        }
1317    }
1318
1319    /// Removes all cluster replicas in `replicas` from the transaction.
1320    ///
1321    /// Returns an error if any id in `replicas` is not found.
1322    ///
1323    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1324    /// is up to the caller to either abort the transaction or commit.
1325    pub fn remove_cluster_replicas(
1326        &mut self,
1327        replicas: &BTreeSet<ReplicaId>,
1328    ) -> Result<(), CatalogError> {
1329        if replicas.is_empty() {
1330            return Ok(());
1331        }
1332
1333        let to_remove = replicas
1334            .iter()
1335            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1336            .collect();
1337        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1338
1339        prev.retain(|_k, v| v.is_none());
1340        if !prev.is_empty() {
1341            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1342            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1343        }
1344
1345        Ok(())
1346    }
1347
1348    /// Removes item `id` from the transaction.
1349    ///
1350    /// Returns an error if `id` is not found.
1351    ///
1352    /// Runtime is linear with respect to the total number of items in the catalog.
1353    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1354    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1355        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1356        if prev.is_some() {
1357            Ok(())
1358        } else {
1359            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1360        }
1361    }
1362
1363    /// Removes all items in `ids` from the transaction.
1364    ///
1365    /// Returns an error if any id in `ids` is not found.
1366    ///
1367    /// NOTE: On error, there still may be some items removed from the transaction. It is
1368    /// up to the caller to either abort the transaction or commit.
1369    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1370        if ids.is_empty() {
1371            return Ok(());
1372        }
1373
1374        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1375        let n = self.items.delete_by_keys(ks, self.op_id).len();
1376        if n == ids.len() {
1377            Ok(())
1378        } else {
1379            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1380            let mut unknown = ids.difference(&item_ids);
1381            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1382        }
1383    }
1384
1385    /// Removes all system object mappings in `descriptions` from the transaction.
1386    ///
1387    /// Returns an error if any description in `descriptions` is not found.
1388    ///
1389    /// NOTE: On error, there still may be some items removed from the transaction. It is
1390    /// up to the caller to either abort the transaction or commit.
1391    pub fn remove_system_object_mappings(
1392        &mut self,
1393        descriptions: BTreeSet<SystemObjectDescription>,
1394    ) -> Result<(), CatalogError> {
1395        if descriptions.is_empty() {
1396            return Ok(());
1397        }
1398
1399        let ks: Vec<_> = descriptions
1400            .clone()
1401            .into_iter()
1402            .map(|desc| GidMappingKey {
1403                schema_name: desc.schema_name,
1404                object_type: desc.object_type,
1405                object_name: desc.object_name,
1406            })
1407            .collect();
1408        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1409
1410        if n == descriptions.len() {
1411            Ok(())
1412        } else {
1413            let item_descriptions = self
1414                .system_gid_mapping
1415                .items()
1416                .keys()
1417                .map(|k| SystemObjectDescription {
1418                    schema_name: k.schema_name.clone(),
1419                    object_type: k.object_type.clone(),
1420                    object_name: k.object_name.clone(),
1421                })
1422                .collect();
1423            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1424                format!(
1425                    "{} {}.{}",
1426                    desc.object_type, desc.schema_name, desc.object_name
1427                )
1428            });
1429            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1430        }
1431    }
1432
1433    /// Removes all introspection source indexes in `indexes` from the transaction.
1434    ///
1435    /// Returns an error if any index in `indexes` is not found.
1436    ///
1437    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1438    /// up to the caller to either abort the transaction or commit.
1439    pub fn remove_introspection_source_indexes(
1440        &mut self,
1441        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1442    ) -> Result<(), CatalogError> {
1443        if introspection_source_indexes.is_empty() {
1444            return Ok(());
1445        }
1446
1447        let ks: Vec<_> = introspection_source_indexes
1448            .clone()
1449            .into_iter()
1450            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1451            .collect();
1452        let n = self
1453            .introspection_sources
1454            .delete_by_keys(ks, self.op_id)
1455            .len();
1456        if n == introspection_source_indexes.len() {
1457            Ok(())
1458        } else {
1459            let txn_indexes = self
1460                .introspection_sources
1461                .items()
1462                .keys()
1463                .map(|k| (k.cluster_id, k.name.clone()))
1464                .collect();
1465            let mut unknown = introspection_source_indexes
1466                .difference(&txn_indexes)
1467                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1468            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1469        }
1470    }
1471
1472    /// Updates item `id` in the transaction to `item_name` and `item`.
1473    ///
1474    /// Returns an error if `id` is not found.
1475    ///
1476    /// Runtime is linear with respect to the total number of items in the catalog.
1477    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1478    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1479        let updated =
1480            self.items
1481                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1482        if updated {
1483            Ok(())
1484        } else {
1485            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1486        }
1487    }
1488
1489    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1490    /// corresponding value in `items`.
1491    ///
1492    /// Returns an error if any id in `items` is not found.
1493    ///
1494    /// NOTE: On error, there still may be some items updated in the transaction. It is
1495    /// up to the caller to either abort the transaction or commit.
1496    pub fn update_items(
1497        &mut self,
1498        items: BTreeMap<CatalogItemId, Item>,
1499    ) -> Result<(), CatalogError> {
1500        if items.is_empty() {
1501            return Ok(());
1502        }
1503
1504        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1505        let kvs: Vec<_> = items
1506            .clone()
1507            .into_iter()
1508            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1509            .collect();
1510        let n = self.items.update_by_keys(kvs, self.op_id)?;
1511        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1512        if n == update_ids.len() {
1513            Ok(())
1514        } else {
1515            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1516            let mut unknown = update_ids.difference(&item_ids);
1517            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1518        }
1519    }
1520
    /// Updates role `id` in the transaction to `role`.
    ///
    /// `password` controls the role's entry in the role auth table:
    /// - [`PasswordAction::Set`] hashes the new password (via `scram256_hash`) and
    ///   inserts or updates the corresponding `role_auth` row.
    /// - [`PasswordAction::Clear`] nulls out the stored password hash, but only if an
    ///   auth row already exists; absence already means "no password".
    /// - [`PasswordAction::NoChange`] leaves the auth table untouched.
    ///
    /// Returns an error if `id` is not found.
    ///
    /// # Panics
    ///
    /// Panics if hashing the new password fails.
    ///
    /// Runtime is linear with respect to the total number of items in the catalog.
    /// DO NOT call this function in a loop, implement and use some `Self::update_roles` instead.
    /// You should model it after [`Self::update_items`].
    pub fn update_role(
        &mut self,
        id: RoleId,
        role: Role,
        password: PasswordAction,
    ) -> Result<(), CatalogError> {
        let key = RoleKey { id };
        if self.roles.get(&key).is_some() {
            let auth_key = RoleAuthKey { role_id: id };

            match password {
                PasswordAction::Set(new_password) => {
                    let hash = mz_auth::hash::scram256_hash(
                        &new_password.password,
                        &new_password.scram_iterations,
                    )
                    .expect("password hash should be valid");
                    let value = RoleAuthValue {
                        password_hash: Some(hash),
                        updated_at: SYSTEM_TIME(),
                    };

                    // A role isn't required to have an auth row yet, so fall back to an
                    // insert when no row exists.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    } else {
                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::Clear => {
                    let value = RoleAuthValue {
                        password_hash: None,
                        updated_at: SYSTEM_TIME(),
                    };
                    // Only update an existing auth row; if none exists there is no
                    // password to clear.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::NoChange => {}
            }

            // Finally, write the updated role itself.
            self.roles
                .update_by_key(key, role.into_key_value().1, self.op_id)?;

            Ok(())
        } else {
            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
        }
    }
1578
1579    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1580    /// corresponding value in `roles`.
1581    ///
1582    /// This function does *not* write role_authentication information to the catalog.
1583    /// It is purely for updating the role itself.
1584    ///
1585    /// Returns an error if any id in `roles` is not found.
1586    ///
1587    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1588    /// up to the caller to either abort the transaction or commit.
1589    pub fn update_roles_without_auth(
1590        &mut self,
1591        roles: BTreeMap<RoleId, Role>,
1592    ) -> Result<(), CatalogError> {
1593        if roles.is_empty() {
1594            return Ok(());
1595        }
1596
1597        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1598        let kvs: Vec<_> = roles
1599            .into_iter()
1600            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1601            .collect();
1602        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1603        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1604
1605        if n == update_role_ids.len() {
1606            Ok(())
1607        } else {
1608            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1609            let mut unknown = update_role_ids.difference(&role_ids);
1610            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1611        }
1612    }
1613
1614    /// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
1615    /// of `mappings` should be (old-global-id, new-system-object-mapping).
1616    ///
1617    /// Panics if provided id is not a system id.
1618    pub fn update_system_object_mappings(
1619        &mut self,
1620        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1621    ) -> Result<(), CatalogError> {
1622        if mappings.is_empty() {
1623            return Ok(());
1624        }
1625
1626        let n = self.system_gid_mapping.update(
1627            |_k, v| {
1628                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1629                    let (_, new_value) = mapping.clone().into_key_value();
1630                    Some(new_value)
1631                } else {
1632                    None
1633                }
1634            },
1635            self.op_id,
1636        )?;
1637
1638        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1639            != mappings.len()
1640        {
1641            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1642            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1643        }
1644
1645        Ok(())
1646    }
1647
1648    /// Updates cluster `id` in the transaction to `cluster`.
1649    ///
1650    /// Returns an error if `id` is not found.
1651    ///
1652    /// Runtime is linear with respect to the total number of clusters in the catalog.
1653    /// DO NOT call this function in a loop.
1654    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1655        let updated = self.clusters.update_by_key(
1656            ClusterKey { id },
1657            cluster.into_key_value().1,
1658            self.op_id,
1659        )?;
1660        if updated {
1661            Ok(())
1662        } else {
1663            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1664        }
1665    }
1666
1667    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1668    ///
1669    /// Returns an error if `replica_id` is not found.
1670    ///
1671    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1672    /// DO NOT call this function in a loop.
1673    pub fn update_cluster_replica(
1674        &mut self,
1675        replica_id: ReplicaId,
1676        replica: ClusterReplica,
1677    ) -> Result<(), CatalogError> {
1678        let updated = self.cluster_replicas.update_by_key(
1679            ClusterReplicaKey { id: replica_id },
1680            replica.into_key_value().1,
1681            self.op_id,
1682        )?;
1683        if updated {
1684            Ok(())
1685        } else {
1686            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1687        }
1688    }
1689
1690    /// Updates database `id` in the transaction to `database`.
1691    ///
1692    /// Returns an error if `id` is not found.
1693    ///
1694    /// Runtime is linear with respect to the total number of databases in the catalog.
1695    /// DO NOT call this function in a loop.
1696    pub fn update_database(
1697        &mut self,
1698        id: DatabaseId,
1699        database: Database,
1700    ) -> Result<(), CatalogError> {
1701        let updated = self.databases.update_by_key(
1702            DatabaseKey { id },
1703            database.into_key_value().1,
1704            self.op_id,
1705        )?;
1706        if updated {
1707            Ok(())
1708        } else {
1709            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1710        }
1711    }
1712
1713    /// Updates schema `schema_id` in the transaction to `schema`.
1714    ///
1715    /// Returns an error if `schema_id` is not found.
1716    ///
1717    /// Runtime is linear with respect to the total number of schemas in the catalog.
1718    /// DO NOT call this function in a loop.
1719    pub fn update_schema(
1720        &mut self,
1721        schema_id: SchemaId,
1722        schema: Schema,
1723    ) -> Result<(), CatalogError> {
1724        let updated = self.schemas.update_by_key(
1725            SchemaKey { id: schema_id },
1726            schema.into_key_value().1,
1727            self.op_id,
1728        )?;
1729        if updated {
1730            Ok(())
1731        } else {
1732            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1733        }
1734    }
1735
1736    /// Updates `network_policy_id` in the transaction to `network policy`.
1737    ///
1738    /// Returns an error if `id` is not found.
1739    ///
1740    /// Runtime is linear with respect to the total number of databases in the catalog.
1741    /// DO NOT call this function in a loop.
1742    pub fn update_network_policy(
1743        &mut self,
1744        id: NetworkPolicyId,
1745        network_policy: NetworkPolicy,
1746    ) -> Result<(), CatalogError> {
1747        let updated = self.network_policies.update_by_key(
1748            NetworkPolicyKey { id },
1749            network_policy.into_key_value().1,
1750            self.op_id,
1751        )?;
1752        if updated {
1753            Ok(())
1754        } else {
1755            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1756        }
1757    }
1758    /// Removes all network policies in `network policies` from the transaction.
1759    ///
1760    /// Returns an error if any id in `network policy` is not found.
1761    ///
1762    /// NOTE: On error, there still may be some roles removed from the transaction. It
1763    /// is up to the caller to either abort the transaction or commit.
1764    pub fn remove_network_policies(
1765        &mut self,
1766        network_policies: &BTreeSet<NetworkPolicyId>,
1767    ) -> Result<(), CatalogError> {
1768        if network_policies.is_empty() {
1769            return Ok(());
1770        }
1771
1772        let to_remove = network_policies
1773            .iter()
1774            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1775            .collect();
1776        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1777        assert!(
1778            prev.iter().all(|(k, _)| k.id.is_user()),
1779            "cannot delete non-user network policy"
1780        );
1781
1782        prev.retain(|_k, v| v.is_none());
1783        if !prev.is_empty() {
1784            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1785            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1786        }
1787
1788        Ok(())
1789    }
1790    /// Set persisted default privilege.
1791    ///
1792    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
1793    pub fn set_default_privilege(
1794        &mut self,
1795        role_id: RoleId,
1796        database_id: Option<DatabaseId>,
1797        schema_id: Option<SchemaId>,
1798        object_type: ObjectType,
1799        grantee: RoleId,
1800        privileges: Option<AclMode>,
1801    ) -> Result<(), CatalogError> {
1802        self.default_privileges.set(
1803            DefaultPrivilegesKey {
1804                role_id,
1805                database_id,
1806                schema_id,
1807                object_type,
1808                grantee,
1809            },
1810            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1811            self.op_id,
1812        )?;
1813        Ok(())
1814    }
1815
1816    /// Set persisted default privileges.
1817    pub fn set_default_privileges(
1818        &mut self,
1819        default_privileges: Vec<DefaultPrivilege>,
1820    ) -> Result<(), CatalogError> {
1821        if default_privileges.is_empty() {
1822            return Ok(());
1823        }
1824
1825        let default_privileges = default_privileges
1826            .into_iter()
1827            .map(DurableType::into_key_value)
1828            .map(|(k, v)| (k, Some(v)))
1829            .collect();
1830        self.default_privileges
1831            .set_many(default_privileges, self.op_id)?;
1832        Ok(())
1833    }
1834
1835    /// Set persisted system privilege.
1836    ///
1837    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1838    pub fn set_system_privilege(
1839        &mut self,
1840        grantee: RoleId,
1841        grantor: RoleId,
1842        acl_mode: Option<AclMode>,
1843    ) -> Result<(), CatalogError> {
1844        self.system_privileges.set(
1845            SystemPrivilegesKey { grantee, grantor },
1846            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1847            self.op_id,
1848        )?;
1849        Ok(())
1850    }
1851
1852    /// Set persisted system privileges.
1853    pub fn set_system_privileges(
1854        &mut self,
1855        system_privileges: Vec<MzAclItem>,
1856    ) -> Result<(), CatalogError> {
1857        if system_privileges.is_empty() {
1858            return Ok(());
1859        }
1860
1861        let system_privileges = system_privileges
1862            .into_iter()
1863            .map(DurableType::into_key_value)
1864            .map(|(k, v)| (k, Some(v)))
1865            .collect();
1866        self.system_privileges
1867            .set_many(system_privileges, self.op_id)?;
1868        Ok(())
1869    }
1870
1871    /// Set persisted setting.
1872    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1873        self.settings.set(
1874            SettingKey { name },
1875            value.map(|value| SettingValue { value }),
1876            self.op_id,
1877        )?;
1878        Ok(())
1879    }
1880
1881    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1882        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1883    }
1884
1885    /// Insert persisted introspection source index.
1886    pub fn insert_introspection_source_indexes(
1887        &mut self,
1888        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1889        temporary_oids: &HashSet<u32>,
1890    ) -> Result<(), CatalogError> {
1891        if introspection_source_indexes.is_empty() {
1892            return Ok(());
1893        }
1894
1895        let amount = usize_to_u64(introspection_source_indexes.len());
1896        let oids = self.allocate_oids(amount, temporary_oids)?;
1897        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1898            .into_iter()
1899            .zip_eq(oids)
1900            .map(
1901                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1902                    cluster_id,
1903                    name,
1904                    item_id,
1905                    index_id,
1906                    oid,
1907                },
1908            )
1909            .collect();
1910
1911        for introspection_source_index in introspection_source_indexes {
1912            let (key, value) = introspection_source_index.into_key_value();
1913            self.introspection_sources.insert(key, value, self.op_id)?;
1914        }
1915
1916        Ok(())
1917    }
1918
1919    /// Set persisted system object mappings.
1920    pub fn set_system_object_mappings(
1921        &mut self,
1922        mappings: Vec<SystemObjectMapping>,
1923    ) -> Result<(), CatalogError> {
1924        if mappings.is_empty() {
1925            return Ok(());
1926        }
1927
1928        let mappings = mappings
1929            .into_iter()
1930            .map(DurableType::into_key_value)
1931            .map(|(k, v)| (k, Some(v)))
1932            .collect();
1933        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1934        Ok(())
1935    }
1936
1937    /// Set persisted replica.
1938    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1939        if replicas.is_empty() {
1940            return Ok(());
1941        }
1942
1943        let replicas = replicas
1944            .into_iter()
1945            .map(DurableType::into_key_value)
1946            .map(|(k, v)| (k, Some(v)))
1947            .collect();
1948        self.cluster_replicas.set_many(replicas, self.op_id)?;
1949        Ok(())
1950    }
1951
1952    /// Set persisted configuration.
1953    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1954        match value {
1955            Some(value) => {
1956                let config = Config { key, value };
1957                let (key, value) = config.into_key_value();
1958                self.configs.set(key, Some(value), self.op_id)?;
1959            }
1960            None => {
1961                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1962            }
1963        }
1964        Ok(())
1965    }
1966
1967    /// Get the value of a persisted config.
1968    pub fn get_config(&self, key: String) -> Option<u64> {
1969        self.configs
1970            .get(&ConfigKey { key })
1971            .map(|entry| entry.value)
1972    }
1973
1974    /// Get the value of a persisted setting.
1975    pub fn get_setting(&self, name: String) -> Option<&str> {
1976        self.settings
1977            .get(&SettingKey { name })
1978            .map(|entry| &*entry.value)
1979    }
1980
1981    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1982        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1983            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1984    }
1985
1986    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1987        self.set_setting(
1988            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1989            Some(shard_id.to_string()),
1990        )
1991    }
1992
1993    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1994        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1995            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1996    }
1997
1998    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1999        self.set_setting(
2000            EXPRESSION_CACHE_SHARD_KEY.to_string(),
2001            Some(shard_id.to_string()),
2002        )
2003    }
2004
2005    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
2006    /// match the `with_0dt_deployment_max_wait` "system var" value.
2007    ///
2008    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2009    /// but use it in boot before Launch Darkly is available.
2010    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
2011        self.set_config(
2012            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
2013            Some(
2014                value
2015                    .as_millis()
2016                    .try_into()
2017                    .expect("max wait fits into u64"),
2018            ),
2019        )
2020    }
2021
2022    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
2023    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2024    /// value.
2025    ///
2026    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2027    /// but use it in boot before Launch Darkly is available.
2028    pub fn set_0dt_deployment_ddl_check_interval(
2029        &mut self,
2030        value: Duration,
2031    ) -> Result<(), CatalogError> {
2032        self.set_config(
2033            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2034            Some(
2035                value
2036                    .as_millis()
2037                    .try_into()
2038                    .expect("ddl check interval fits into u64"),
2039            ),
2040        )
2041    }
2042
2043    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
2044    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
2045    ///
2046    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2047    /// but use it in boot before Launch Darkly is available.
2048    pub fn set_enable_0dt_deployment_panic_after_timeout(
2049        &mut self,
2050        value: bool,
2051    ) -> Result<(), CatalogError> {
2052        self.set_config(
2053            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2054            Some(u64::from(value)),
2055        )
2056    }
2057
2058    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2059    /// match the `with_0dt_deployment_max_wait` "system var" value.
2060    ///
2061    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2062    /// but use it in boot before LaunchDarkly is available.
2063    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2064        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2065    }
2066
2067    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2068    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2069    /// value.
2070    ///
2071    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2072    /// use it in boot before LaunchDarkly is available.
2073    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2074        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2075    }
2076
2077    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2078    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2079    /// var" value.
2080    ///
2081    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2082    /// use it in boot before LaunchDarkly is available.
2083    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2084        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2085    }
2086
2087    /// Updates the catalog `system_config_synced` "config" value to true.
2088    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2089        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2090    }
2091
2092    pub fn update_comment(
2093        &mut self,
2094        object_id: CommentObjectId,
2095        sub_component: Option<usize>,
2096        comment: Option<String>,
2097    ) -> Result<(), CatalogError> {
2098        let key = CommentKey {
2099            object_id,
2100            sub_component,
2101        };
2102        let value = comment.map(|c| CommentValue { comment: c });
2103        self.comments.set(key, value, self.op_id)?;
2104
2105        Ok(())
2106    }
2107
2108    pub fn drop_comments(
2109        &mut self,
2110        object_ids: &BTreeSet<CommentObjectId>,
2111    ) -> Result<(), CatalogError> {
2112        if object_ids.is_empty() {
2113            return Ok(());
2114        }
2115
2116        self.comments
2117            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2118        Ok(())
2119    }
2120
2121    pub fn update_source_references(
2122        &mut self,
2123        source_id: CatalogItemId,
2124        references: Vec<SourceReference>,
2125        updated_at: u64,
2126    ) -> Result<(), CatalogError> {
2127        let key = SourceReferencesKey { source_id };
2128        let value = SourceReferencesValue {
2129            references,
2130            updated_at,
2131        };
2132        self.source_references.set(key, Some(value), self.op_id)?;
2133        Ok(())
2134    }
2135
2136    /// Upserts persisted system configuration `name` to `value`.
2137    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2138        let key = ServerConfigurationKey {
2139            name: name.to_string(),
2140        };
2141        let value = ServerConfigurationValue { value };
2142        self.system_configurations
2143            .set(key, Some(value), self.op_id)?;
2144        Ok(())
2145    }
2146
2147    /// Removes persisted system configuration `name`.
2148    pub fn remove_system_config(&mut self, name: &str) {
2149        let key = ServerConfigurationKey {
2150            name: name.to_string(),
2151        };
2152        self.system_configurations
2153            .set(key, None, self.op_id)
2154            .expect("cannot have uniqueness violation");
2155    }
2156
2157    /// Removes all persisted system configurations.
2158    pub fn clear_system_configs(&mut self) {
2159        self.system_configurations.delete(|_k, _v| true, self.op_id);
2160    }
2161
2162    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2163        match self.configs.insert(
2164            ConfigKey { key: key.clone() },
2165            ConfigValue { value },
2166            self.op_id,
2167        ) {
2168            Ok(_) => Ok(()),
2169            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2170        }
2171    }
2172
2173    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2174        self.clusters
2175            .items()
2176            .into_iter()
2177            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2178    }
2179
2180    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2181        self.cluster_replicas
2182            .items()
2183            .into_iter()
2184            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2185    }
2186
2187    pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2188        self.databases
2189            .items()
2190            .into_iter()
2191            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2192    }
2193
2194    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2195        self.roles
2196            .items()
2197            .into_iter()
2198            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2199    }
2200
2201    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2202        self.network_policies
2203            .items()
2204            .into_iter()
2205            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2206    }
2207
2208    pub fn get_system_object_mappings(
2209        &self,
2210    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2211        self.system_gid_mapping
2212            .items()
2213            .into_iter()
2214            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2215    }
2216
2217    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2218        self.schemas
2219            .items()
2220            .into_iter()
2221            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2222    }
2223
2224    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2225        self.system_configurations
2226            .items()
2227            .into_iter()
2228            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2229    }
2230
2231    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2232        let key = SchemaKey { id: *id };
2233        self.schemas
2234            .get(&key)
2235            .map(|v| DurableType::from_key_value(key, v.clone()))
2236    }
2237
2238    pub fn get_introspection_source_indexes(
2239        &self,
2240        cluster_id: ClusterId,
2241    ) -> BTreeMap<&str, (GlobalId, u32)> {
2242        self.introspection_sources
2243            .items()
2244            .into_iter()
2245            .filter(|(k, _v)| k.cluster_id == cluster_id)
2246            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2247            .collect()
2248    }
2249
2250    pub fn get_catalog_content_version(&self) -> Option<&str> {
2251        self.settings
2252            .get(&SettingKey {
2253                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2254            })
2255            .map(|value| &*value.value)
2256    }
2257
2258    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2259        self.settings
2260            .get(&SettingKey {
2261                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2262            })
2263            .map(|value| value.value.clone())
2264    }
2265
2266    /// Commit the current operation within the transaction. This does not cause anything to be
2267    /// written durably, but signals to the current transaction that we are moving on to the next
2268    /// operation.
2269    ///
2270    /// Returns the updates of the committed operation.
2271    #[must_use]
2272    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2273        let updates = self.get_op_updates();
2274        self.commit_op();
2275        updates
2276    }
2277
2278    fn get_op_updates(&self) -> Vec<StateUpdate> {
2279        fn get_collection_op_updates<'a, T>(
2280            table_txn: &'a TableTransaction<T::Key, T::Value>,
2281            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2282            op: Timestamp,
2283        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2284        where
2285            T::Key: Ord + Eq + Clone + Debug,
2286            T::Value: Ord + Clone + Debug,
2287            T: DurableType,
2288        {
2289            table_txn
2290                .pending
2291                .iter()
2292                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2293                .filter_map(move |(k, v)| {
2294                    if v.ts == op {
2295                        let key = k.clone();
2296                        let value = v.value.clone();
2297                        let diff = v.diff.clone().try_into().expect("invalid diff");
2298                        let update = DurableType::from_key_value(key, value);
2299                        let kind = kind_fn(update);
2300                        Some((kind, diff))
2301                    } else {
2302                        None
2303                    }
2304                })
2305        }
2306
2307        fn get_large_collection_op_updates<'a, T>(
2308            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2309            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2310            op: Timestamp,
2311        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2312        where
2313            T::Key: Ord + Eq + Clone + Debug,
2314            T: DurableType<Value = ()>,
2315        {
2316            collection.iter().filter_map(move |(k, diff, ts)| {
2317                if *ts == op {
2318                    let key = k.clone();
2319                    let diff = diff.clone().try_into().expect("invalid diff");
2320                    let update = DurableType::from_key_value(key, ());
2321                    let kind = kind_fn(update);
2322                    Some((kind, diff))
2323                } else {
2324                    None
2325                }
2326            })
2327        }
2328
2329        let Transaction {
2330            durable_catalog: _,
2331            databases,
2332            schemas,
2333            items,
2334            comments,
2335            roles,
2336            role_auth,
2337            clusters,
2338            network_policies,
2339            cluster_replicas,
2340            introspection_sources,
2341            system_gid_mapping,
2342            system_configurations,
2343            default_privileges,
2344            source_references,
2345            system_privileges,
2346            audit_log_updates,
2347            storage_collection_metadata,
2348            unfinalized_shards,
2349            // Not representable as a `StateUpdate`.
2350            id_allocator: _,
2351            configs: _,
2352            settings: _,
2353            txn_wal_shard: _,
2354            upper,
2355            op_id: _,
2356        } = &self;
2357
2358        let updates = std::iter::empty()
2359            .chain(get_collection_op_updates(
2360                roles,
2361                StateUpdateKind::Role,
2362                self.op_id,
2363            ))
2364            .chain(get_collection_op_updates(
2365                role_auth,
2366                StateUpdateKind::RoleAuth,
2367                self.op_id,
2368            ))
2369            .chain(get_collection_op_updates(
2370                databases,
2371                StateUpdateKind::Database,
2372                self.op_id,
2373            ))
2374            .chain(get_collection_op_updates(
2375                schemas,
2376                StateUpdateKind::Schema,
2377                self.op_id,
2378            ))
2379            .chain(get_collection_op_updates(
2380                default_privileges,
2381                StateUpdateKind::DefaultPrivilege,
2382                self.op_id,
2383            ))
2384            .chain(get_collection_op_updates(
2385                system_privileges,
2386                StateUpdateKind::SystemPrivilege,
2387                self.op_id,
2388            ))
2389            .chain(get_collection_op_updates(
2390                system_configurations,
2391                StateUpdateKind::SystemConfiguration,
2392                self.op_id,
2393            ))
2394            .chain(get_collection_op_updates(
2395                clusters,
2396                StateUpdateKind::Cluster,
2397                self.op_id,
2398            ))
2399            .chain(get_collection_op_updates(
2400                network_policies,
2401                StateUpdateKind::NetworkPolicy,
2402                self.op_id,
2403            ))
2404            .chain(get_collection_op_updates(
2405                introspection_sources,
2406                StateUpdateKind::IntrospectionSourceIndex,
2407                self.op_id,
2408            ))
2409            .chain(get_collection_op_updates(
2410                cluster_replicas,
2411                StateUpdateKind::ClusterReplica,
2412                self.op_id,
2413            ))
2414            .chain(get_collection_op_updates(
2415                system_gid_mapping,
2416                StateUpdateKind::SystemObjectMapping,
2417                self.op_id,
2418            ))
2419            .chain(get_collection_op_updates(
2420                items,
2421                StateUpdateKind::Item,
2422                self.op_id,
2423            ))
2424            .chain(get_collection_op_updates(
2425                comments,
2426                StateUpdateKind::Comment,
2427                self.op_id,
2428            ))
2429            .chain(get_collection_op_updates(
2430                source_references,
2431                StateUpdateKind::SourceReferences,
2432                self.op_id,
2433            ))
2434            .chain(get_collection_op_updates(
2435                storage_collection_metadata,
2436                StateUpdateKind::StorageCollectionMetadata,
2437                self.op_id,
2438            ))
2439            .chain(get_collection_op_updates(
2440                unfinalized_shards,
2441                StateUpdateKind::UnfinalizedShard,
2442                self.op_id,
2443            ))
2444            .chain(get_large_collection_op_updates(
2445                audit_log_updates,
2446                StateUpdateKind::AuditLog,
2447                self.op_id,
2448            ))
2449            .map(|(kind, diff)| StateUpdate {
2450                kind,
2451                ts: upper.clone(),
2452                diff,
2453            })
2454            .collect();
2455
2456        updates
2457    }
2458
    /// Reports whether the underlying durable catalog state this transaction
    /// was opened against is a savepoint catalog (delegates directly to it).
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2462
    /// Advances the in-transaction operation id that timestamps subsequently
    /// staged updates.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2466
    /// Returns the current in-transaction operation id.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2470
    /// Returns the upper of the durable catalog as of when this transaction
    /// started (see [`TransactionBatch::upper`]).
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2474
2475    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2476        let audit_log_updates = self
2477            .audit_log_updates
2478            .into_iter()
2479            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2480            .collect();
2481
2482        let txn_batch = TransactionBatch {
2483            databases: self.databases.pending(),
2484            schemas: self.schemas.pending(),
2485            items: self.items.pending(),
2486            comments: self.comments.pending(),
2487            roles: self.roles.pending(),
2488            role_auth: self.role_auth.pending(),
2489            clusters: self.clusters.pending(),
2490            cluster_replicas: self.cluster_replicas.pending(),
2491            network_policies: self.network_policies.pending(),
2492            introspection_sources: self.introspection_sources.pending(),
2493            id_allocator: self.id_allocator.pending(),
2494            configs: self.configs.pending(),
2495            source_references: self.source_references.pending(),
2496            settings: self.settings.pending(),
2497            system_gid_mapping: self.system_gid_mapping.pending(),
2498            system_configurations: self.system_configurations.pending(),
2499            default_privileges: self.default_privileges.pending(),
2500            system_privileges: self.system_privileges.pending(),
2501            storage_collection_metadata: self.storage_collection_metadata.pending(),
2502            unfinalized_shards: self.unfinalized_shards.pending(),
2503            txn_wal_shard: self.txn_wal_shard.pending(),
2504            audit_log_updates,
2505            upper: self.upper,
2506        };
2507        (txn_batch, self.durable_catalog)
2508    }
2509
2510    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2511    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2512    /// before proceeding. In general, this must be fatal to the calling process. We do not
2513    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2514    ///
2515    /// The transaction is committed at `commit_ts`.
2516    ///
2517    /// Returns what the upper was directly after the transaction committed.
2518    ///
2519    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2520    /// catalog is not writeable.
2521    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn commit_internal(
        self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
        let (mut txn_batch, durable_catalog) = self.into_parts();
        // Destructure exhaustively so that a newly added `TransactionBatch`
        // field cannot be forgotten in the consolidation calls below.
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            source_references,
            settings,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper: _,
        } = &mut txn_batch;
        // Consolidate in memory because it will likely be faster than consolidating after the
        // transaction has been made durable.
        differential_dataflow::consolidation::consolidate_updates(databases);
        differential_dataflow::consolidation::consolidate_updates(schemas);
        differential_dataflow::consolidation::consolidate_updates(items);
        differential_dataflow::consolidation::consolidate_updates(comments);
        differential_dataflow::consolidation::consolidate_updates(roles);
        differential_dataflow::consolidation::consolidate_updates(role_auth);
        differential_dataflow::consolidation::consolidate_updates(clusters);
        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
        differential_dataflow::consolidation::consolidate_updates(network_policies);
        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
        differential_dataflow::consolidation::consolidate_updates(id_allocator);
        differential_dataflow::consolidation::consolidate_updates(configs);
        differential_dataflow::consolidation::consolidate_updates(settings);
        differential_dataflow::consolidation::consolidate_updates(source_references);
        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
        differential_dataflow::consolidation::consolidate_updates(system_configurations);
        differential_dataflow::consolidation::consolidate_updates(default_privileges);
        differential_dataflow::consolidation::consolidate_updates(system_privileges);
        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);

        // Durably write the consolidated batch at `commit_ts`; the returned
        // timestamp is the upper directly after the commit.
        let upper = durable_catalog
            .commit_transaction(txn_batch, commit_ts)
            .await?;
        Ok((durable_catalog, upper))
    }
2582
2583    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2584    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2585    /// before proceeding. In general, this must be fatal to the calling process. We do not
2586    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2587    ///
2588    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2589    /// catalog is not writeable.
2590    ///
2591    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
2592    /// updates from this transaction. Therefore, updates from this transaction will not be returned
2593    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
2594    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
2595    ///
2596    /// An alternative implementation would be for the caller to explicitly consume their updates
2597    /// after committing and only then apply the updates in-memory. While this removes assumptions
2598    /// about the caller in this method, in practice it results in duplicate work on every commit.
2599    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        // Per the contract documented above, the committer must have already
        // consumed and applied every update staged in this transaction.
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        // Drain all the updates from the commit since it is assumed that they were already applied.
        let updates = durable_storage.sync_updates(upper).await?;
        // Writable and savepoint catalogs should have consumed all updates before committing a
        // transaction, otherwise the commit was performed with an out of date state.
        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
        // updates before committing.
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2620}
2621
2622use crate::durable::async_trait;
2623
2624use super::objects::{RoleAuthKey, RoleAuthValue};
2625
2626#[async_trait]
2627impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2628    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2629        self.storage_collection_metadata
2630            .items()
2631            .into_iter()
2632            .map(
2633                |(
2634                    StorageCollectionMetadataKey { id },
2635                    StorageCollectionMetadataValue { shard },
2636                )| { (*id, shard.clone()) },
2637            )
2638            .collect()
2639    }
2640
2641    fn insert_collection_metadata(
2642        &mut self,
2643        metadata: BTreeMap<GlobalId, ShardId>,
2644    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2645        for (id, shard) in metadata {
2646            self.storage_collection_metadata
2647                .insert(
2648                    StorageCollectionMetadataKey { id },
2649                    StorageCollectionMetadataValue {
2650                        shard: shard.clone(),
2651                    },
2652                    self.op_id,
2653                )
2654                .map_err(|err| match err {
2655                    DurableCatalogError::DuplicateKey => {
2656                        StorageError::CollectionMetadataAlreadyExists(id)
2657                    }
2658                    DurableCatalogError::UniquenessViolation => {
2659                        StorageError::PersistShardAlreadyInUse(shard)
2660                    }
2661                    err => StorageError::Generic(anyhow::anyhow!(err)),
2662                })?;
2663        }
2664        Ok(())
2665    }
2666
2667    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2668        let ks: Vec<_> = ids
2669            .into_iter()
2670            .map(|id| StorageCollectionMetadataKey { id })
2671            .collect();
2672        self.storage_collection_metadata
2673            .delete_by_keys(ks, self.op_id)
2674            .into_iter()
2675            .map(
2676                |(
2677                    StorageCollectionMetadataKey { id },
2678                    StorageCollectionMetadataValue { shard },
2679                )| (id, shard),
2680            )
2681            .collect()
2682    }
2683
2684    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2685        self.unfinalized_shards
2686            .items()
2687            .into_iter()
2688            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2689            .collect()
2690    }
2691
2692    fn insert_unfinalized_shards(
2693        &mut self,
2694        s: BTreeSet<ShardId>,
2695    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2696        for shard in s {
2697            match self
2698                .unfinalized_shards
2699                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2700            {
2701                // Inserting duplicate keys has no effect.
2702                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2703                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2704            };
2705        }
2706        Ok(())
2707    }
2708
2709    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2710        let ks: Vec<_> = shards
2711            .into_iter()
2712            .map(|shard| UnfinalizedShardKey { shard })
2713            .collect();
2714        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2715    }
2716
2717    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2718        self.txn_wal_shard
2719            .values()
2720            .iter()
2721            .next()
2722            .map(|TxnWalShardValue { shard }| *shard)
2723    }
2724
2725    fn write_txn_wal_shard(
2726        &mut self,
2727        shard: ShardId,
2728    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2729        self.txn_wal_shard
2730            .insert((), TxnWalShardValue { shard }, self.op_id)
2731            .map_err(|err| match err {
2732                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2733                err => StorageError::Generic(anyhow::anyhow!(err)),
2734            })
2735    }
2736}
2737
/// Describes a set of changes to apply as the result of a catalog transaction.
///
/// Each collection holds `(key, value, diff)` updates drained from the
/// corresponding table of a `Transaction` (see `Transaction::into_parts`).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    // Note the unit value types below: for these collections all data lives in
    // the key (or, for the txn-wal shard, in the value).
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper of the catalog when the transaction started.
    pub(crate) upper: mz_repr::Timestamp,
}
2790
2791impl TransactionBatch {
2792    pub fn is_empty(&self) -> bool {
2793        let TransactionBatch {
2794            databases,
2795            schemas,
2796            items,
2797            comments,
2798            roles,
2799            role_auth,
2800            clusters,
2801            cluster_replicas,
2802            network_policies,
2803            introspection_sources,
2804            id_allocator,
2805            configs,
2806            settings,
2807            source_references,
2808            system_gid_mapping,
2809            system_configurations,
2810            default_privileges,
2811            system_privileges,
2812            storage_collection_metadata,
2813            unfinalized_shards,
2814            txn_wal_shard,
2815            audit_log_updates,
2816            upper: _,
2817        } = self;
2818        databases.is_empty()
2819            && schemas.is_empty()
2820            && items.is_empty()
2821            && comments.is_empty()
2822            && roles.is_empty()
2823            && role_auth.is_empty()
2824            && clusters.is_empty()
2825            && cluster_replicas.is_empty()
2826            && network_policies.is_empty()
2827            && introspection_sources.is_empty()
2828            && id_allocator.is_empty()
2829            && configs.is_empty()
2830            && settings.is_empty()
2831            && source_references.is_empty()
2832            && system_gid_mapping.is_empty()
2833            && system_configurations.is_empty()
2834            && default_privileges.is_empty()
2835            && system_privileges.is_empty()
2836            && storage_collection_metadata.is_empty()
2837            && unfinalized_shards.is_empty()
2838            && txn_wal_shard.is_empty()
2839            && audit_log_updates.is_empty()
2840    }
2841}
2842
/// A single update staged within a `TableTransaction`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    // The value being added or retracted.
    value: V,
    // The in-transaction operation id at which this update was staged (tables
    // are given `Transaction::op_id` as the timestamp on insert/delete).
    ts: Timestamp,
    // The change in multiplicity: positive for additions, negative for
    // retractions.
    diff: Diff,
}
2849
/// Utility trait to check for plan validity.
///
/// Implementations for the durable object types live in the [`unique_name`]
/// module below.
trait UniqueName {
    /// Does the item have a unique name? If yes, we can check for name equality in validity
    /// checking.
    const HAS_UNIQUE_NAME: bool;
    /// The unique name, only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`.
    fn unique_name(&self) -> &str;
}
2858
mod unique_name {
    use crate::durable::objects::*;

    /// Implements [`UniqueName`](crate::durable::transaction::UniqueName) with
    /// `HAS_UNIQUE_NAME = true` for value types exposing a `name` field; this
    /// enables the name-indexed fast path in `TableTransaction::verify`.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements [`UniqueName`](crate::durable::transaction::UniqueName) with
    /// `HAS_UNIQUE_NAME = false`; the returned name is the empty string and is
    /// not meaningful for validity checking.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                       ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
2920
/// TableTransaction emulates some features of a typical SQL transaction over
/// table for a Collection.
///
/// It supports:
/// - uniqueness constraints
/// - transactional reads and writes (including read-your-writes before commit)
///
/// `K` is the primary key type. Multiple entries with the same key are disallowed.
/// `V` is an arbitrary value type.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The state of the table at the start of the transaction.
    initial: BTreeMap<K, V>,
    // The desired updates to keys after commit.
    // Invariant: Value is sorted by `ts`.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional predicate deciding whether two values conflict; when present it
    // is enforced by `verify`/`verify_keys`.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2938
2939impl<K, V> TableTransaction<K, V>
2940where
2941    K: Ord + Eq + Clone + Debug,
2942    V: Ord + Clone + Debug + UniqueName,
2943{
2944    /// Create a new TableTransaction with initial data.
2945    ///
2946    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
2947    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
2948    /// protobuf types which deserialize to `K` and `V` that a [`TableTransaction`] is generic
2949    /// over.
2950    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2951    where
2952        K: RustType<KP>,
2953        V: RustType<VP>,
2954    {
2955        let initial = initial
2956            .into_iter()
2957            .map(RustType::from_proto)
2958            .collect::<Result<_, _>>()?;
2959
2960        Ok(Self {
2961            initial,
2962            pending: BTreeMap::new(),
2963            uniqueness_violation: None,
2964        })
2965    }
2966
2967    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
2968    /// that determines whether there is a uniqueness violation among two values.
2969    fn new_with_uniqueness_fn<KP, VP>(
2970        initial: BTreeMap<KP, VP>,
2971        uniqueness_violation: fn(a: &V, b: &V) -> bool,
2972    ) -> Result<Self, TryFromProtoError>
2973    where
2974        K: RustType<KP>,
2975        V: RustType<VP>,
2976    {
2977        let initial = initial
2978            .into_iter()
2979            .map(RustType::from_proto)
2980            .collect::<Result<_, _>>()?;
2981
2982        Ok(Self {
2983            initial,
2984            pending: BTreeMap::new(),
2985            uniqueness_violation: Some(uniqueness_violation),
2986        })
2987    }
2988
2989    /// Consumes and returns the pending changes and their diffs. `Diff` is
2990    /// guaranteed to be 1 or -1.
2991    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2992    where
2993        K: RustType<KP>,
2994        V: RustType<VP>,
2995    {
2996        soft_assert_no_log!(self.verify().is_ok());
2997        // Pending describes the desired final state for some keys. K,V pairs should be
2998        // retracted if they already exist and were deleted or are being updated.
2999        self.pending
3000            .into_iter()
3001            .flat_map(|(k, v)| {
3002                let mut v: Vec<_> = v
3003                    .into_iter()
3004                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
3005                    .collect();
3006                differential_dataflow::consolidation::consolidate(&mut v);
3007                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
3008            })
3009            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
3010            .collect()
3011    }
3012
3013    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
3014    ///
3015    /// Runtime is O(n^2), where n is the number of items in `self`, if
3016    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            // Compare each value to each other value and ensure they are unique.
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                // Fast path: index values by name. Per `UniqueName`, name
                // equality can stand in for the full pairwise check, so each
                // value only needs to be compared against the entry stored
                // under its own name.
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        // `i == *j` means `vi` found itself, which is not a
                        // conflict.
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                // Slow path: O(n^2) pairwise comparison of all values.
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        // Check the struct-level invariant that each key's pending updates are
        // ordered by their in-transaction timestamp.
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }
3053
3054    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
3055    ///
3056    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3057    /// items in `keys`.
3058    fn verify_keys<'a>(
3059        &self,
3060        keys: impl IntoIterator<Item = &'a K>,
3061    ) -> Result<(), DurableCatalogError>
3062    where
3063        K: 'a,
3064    {
3065        if let Some(uniqueness_violation) = self.uniqueness_violation {
3066            let entries: Vec<_> = keys
3067                .into_iter()
3068                .filter_map(|key| self.get(key).map(|value| (key, value)))
3069                .collect();
3070            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3071            for (ki, vi) in self.items() {
3072                for (kj, vj) in &entries {
3073                    if ki != *kj && uniqueness_violation(vi, vj) {
3074                        return Err(DurableCatalogError::UniquenessViolation);
3075                    }
3076                }
3077            }
3078        }
3079        soft_assert_no_log!(self.verify().is_ok());
3080        Ok(())
3081    }
3082
3083    /// Iterates over the items viewable in the current transaction in arbitrary
3084    /// order and applies `f` on all key, value pairs.
3085    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3086        let mut seen = BTreeSet::new();
3087        for k in self.pending.keys() {
3088            seen.insert(k);
3089            let v = self.get(k);
3090            // Deleted items don't exist so shouldn't be visited, but still suppress
3091            // visiting the key later.
3092            if let Some(v) = v {
3093                f(k, v);
3094            }
3095        }
3096        for (k, v) in self.initial.iter() {
3097            // Add on initial items that don't have updates.
3098            if !seen.contains(k) {
3099                f(k, v);
3100            }
3101        }
3102    }
3103
3104    /// Returns the current value of `k`.
3105    fn get(&self, k: &K) -> Option<&V> {
3106        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3107        let mut updates = Vec::with_capacity(pending.len() + 1);
3108        if let Some(initial) = self.initial.get(k) {
3109            updates.push((initial, Diff::ONE));
3110        }
3111        updates.extend(
3112            pending
3113                .into_iter()
3114                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3115        );
3116
3117        differential_dataflow::consolidation::consolidate(&mut updates);
3118        assert!(updates.len() <= 1);
3119        updates.into_iter().next().map(|(v, _)| v)
3120    }
3121
3122    /// Returns the items viewable in the current transaction. The items are
3123    /// cloned, so this is an expensive operation. Prefer using [`Self::items`], or
3124    /// [`Self::for_values`].
3125    // Used by tests.
3126    #[cfg(test)]
3127    fn items_cloned(&self) -> BTreeMap<K, V> {
3128        let mut items = BTreeMap::new();
3129        self.for_values(|k, v| {
3130            items.insert(k.clone(), v.clone());
3131        });
3132        items
3133    }
3134
3135    /// Returns the current items as proto-typed key-value pairs, suitable for
3136    /// constructing a [`Snapshot`]. This merges `initial` and `pending` to
3137    /// produce the current view and converts back to proto types.
3138    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3139    where
3140        K: RustType<KP>,
3141        V: RustType<VP>,
3142        KP: Ord,
3143    {
3144        let mut items = BTreeMap::new();
3145        self.for_values(|k, v| {
3146            items.insert(k.into_proto(), v.into_proto());
3147        });
3148        items
3149    }
3150
3151    /// Returns the items viewable in the current transaction as references. Returns a map
3152    /// of references.
3153    fn items(&self) -> BTreeMap<&K, &V> {
3154        let mut items = BTreeMap::new();
3155        self.for_values(|k, v| {
3156            items.insert(k, v);
3157        });
3158        items
3159    }
3160
3161    /// Returns the values viewable in the current transaction as references.
3162    fn values(&self) -> BTreeSet<&V> {
3163        let mut items = BTreeSet::new();
3164        self.for_values(|_, v| {
3165            items.insert(v);
3166        });
3167        items
3168    }
3169
3170    /// Returns the number of items viewable in the current transaction.
3171    fn len(&self) -> usize {
3172        let mut count = 0;
3173        self.for_values(|_, _| {
3174            count += 1;
3175        });
3176        count
3177    }
3178
3179    /// Iterates over the items viewable in the current transaction, and provides a
3180    /// map where additional pending items can be inserted, which will be appended
3181    /// to current pending items. Does not verify uniqueness.
3182    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3183        &mut self,
3184        mut f: F,
3185    ) {
3186        let mut pending = BTreeMap::new();
3187        self.for_values(|k, v| f(&mut pending, k, v));
3188        for (k, updates) in pending {
3189            self.pending.entry(k).or_default().extend(updates);
3190        }
3191    }
3192
3193    /// Inserts a new k,v pair.
3194    ///
3195    /// Returns an error if the uniqueness check failed or the key already exists.
3196    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3197        let mut violation = None;
3198        self.for_values(|for_k, for_v| {
3199            if &k == for_k {
3200                violation = Some(DurableCatalogError::DuplicateKey);
3201            }
3202            if let Some(uniqueness_violation) = self.uniqueness_violation {
3203                if uniqueness_violation(for_v, &v) {
3204                    violation = Some(DurableCatalogError::UniquenessViolation);
3205                }
3206            }
3207        });
3208        if let Some(violation) = violation {
3209            return Err(violation);
3210        }
3211        self.pending.entry(k).or_default().push(TransactionUpdate {
3212            value: v,
3213            ts,
3214            diff: Diff::ONE,
3215        });
3216        soft_assert_no_log!(self.verify().is_ok());
3217        Ok(())
3218    }
3219
    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
    /// value should be updated, otherwise `None`. Returns the number of changed
    /// entries.
    ///
    /// Returns an error if the uniqueness check failed. On error, `self` is
    /// restored to the state it was in before this call.
    ///
    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
    /// better performance.
    fn update<F: Fn(&K, &V) -> Option<V>>(
        &mut self,
        f: F,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let mut changed = Diff::ZERO;
        // The keys touched by `f`, so only those are re-verified below.
        let mut keys = BTreeSet::new();
        // Keep a copy of pending in case of uniqueness violation.
        let pending = self.pending.clone();
        self.for_values_mut(|p, k, v| {
            if let Some(next) = f(k, v) {
                changed += Diff::ONE;
                keys.insert(k.clone());
                // An update is staged as a retraction of the old value
                // followed by an insertion of the new one at the same `ts`.
                let updates = p.entry(k.clone()).or_default();
                updates.push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                updates.push(TransactionUpdate {
                    value: next,
                    ts,
                    diff: Diff::ONE,
                });
            }
        });
        // Check for uniqueness violation.
        if let Err(err) = self.verify_keys(&keys) {
            // Roll back all staged updates from this call.
            self.pending = pending;
            Err(err)
        } else {
            Ok(changed)
        }
    }
3262
3263    /// Updates `k`, `v` pair if `k` already exists in `self`.
3264    ///
3265    /// Returns `true` if `k` was updated, `false` otherwise.
3266    /// Returns an error if the uniqueness check failed.
3267    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3268        if let Some(cur_v) = self.get(&k) {
3269            if v != *cur_v {
3270                self.set(k, Some(v), ts)?;
3271            }
3272            Ok(true)
3273        } else {
3274            Ok(false)
3275        }
3276    }
3277
    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
    ///
    /// Returns the number of changed entries.
    /// Returns an error if the uniqueness check failed.
    ///
    /// Note: an entry whose new value equals its current value still counts
    /// toward the returned total, even though no update is staged for it (the
    /// count is taken before no-ops are filtered out below).
    fn update_by_keys(
        &mut self,
        kvs: impl IntoIterator<Item = (K, V)>,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let kvs: Vec<_> = kvs
            .into_iter()
            .filter_map(|(k, v)| match self.get(&k) {
                // Record if updating this entry would be a no-op.
                Some(cur_v) => Some((*cur_v == v, k, v)),
                None => None,
            })
            .collect();
        // Every key that currently exists counts as "changed", no-ops included.
        let changed = kvs.len();
        let changed =
            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
        let kvs = kvs
            .into_iter()
            // Filter out no-ops to save some work.
            .filter(|(no_op, _, _)| !no_op)
            .map(|(_, k, v)| (k, Some(v)))
            .collect();
        self.set_many(kvs, ts)?;
        Ok(changed)
    }
3307
3308    /// Set the value for a key. Returns the previous entry if the key existed,
3309    /// otherwise None.
3310    ///
3311    /// Returns an error if the uniqueness check failed.
3312    ///
3313    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
3314    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3315        let prev = self.get(&k).cloned();
3316        let entry = self.pending.entry(k.clone()).or_default();
3317        let restore_len = entry.len();
3318
3319        match (v, prev.clone()) {
3320            (Some(v), Some(prev)) => {
3321                entry.push(TransactionUpdate {
3322                    value: prev,
3323                    ts,
3324                    diff: Diff::MINUS_ONE,
3325                });
3326                entry.push(TransactionUpdate {
3327                    value: v,
3328                    ts,
3329                    diff: Diff::ONE,
3330                });
3331            }
3332            (Some(v), None) => {
3333                entry.push(TransactionUpdate {
3334                    value: v,
3335                    ts,
3336                    diff: Diff::ONE,
3337                });
3338            }
3339            (None, Some(prev)) => {
3340                entry.push(TransactionUpdate {
3341                    value: prev,
3342                    ts,
3343                    diff: Diff::MINUS_ONE,
3344                });
3345            }
3346            (None, None) => {}
3347        }
3348
3349        // Check for uniqueness violation.
3350        if let Err(err) = self.verify_keys([&k]) {
3351            // Revert self.pending to the state it was in before calling this
3352            // function.
3353            let pending = self.pending.get_mut(&k).expect("inserted above");
3354            pending.truncate(restore_len);
3355            Err(err)
3356        } else {
3357            Ok(prev)
3358        }
3359    }
3360
    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
    /// otherwise None.
    ///
    /// Returns an error if any uniqueness check failed. On error, `self` is
    /// restored to the state it was in before this call.
    fn set_many(
        &mut self,
        kvs: BTreeMap<K, Option<V>>,
        ts: Timestamp,
    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
        if kvs.is_empty() {
            return Ok(BTreeMap::new());
        }

        // Previous value for each key, which is also this method's result.
        let mut prevs = BTreeMap::new();
        // For each key, the length of its pending list before this call, so
        // that staged updates can be truncated away on failure.
        let mut restores = BTreeMap::new();

        for (k, v) in kvs {
            let prev = self.get(&k).cloned();
            let entry = self.pending.entry(k.clone()).or_default();
            restores.insert(k.clone(), entry.len());

            // Stage updates depending on whether the key is being created,
            // replaced, or deleted (`None` means delete).
            match (v, prev.clone()) {
                // Replace: retract the old value, insert the new one.
                (Some(v), Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                // Create: insert the new value.
                (Some(v), None) => {
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                // Delete: retract the old value.
                (None, Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                }
                // Deleting a non-existent key is a no-op.
                (None, None) => {}
            }

            prevs.insert(k, prev);
        }

        // Check for uniqueness violation.
        if let Err(err) = self.verify_keys(prevs.keys()) {
            for (k, restore_len) in restores {
                // Revert self.pending to the state it was in before calling this
                // function.
                let pending = self.pending.get_mut(&k).expect("inserted above");
                pending.truncate(restore_len);
            }
            Err(err)
        } else {
            Ok(prevs)
        }
    }
3428
3429    /// Deletes items for which `f` returns true. Returns the keys and values of
3430    /// the deleted entries.
3431    ///
3432    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3433    /// better performance.
3434    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3435        let mut deleted = Vec::new();
3436        self.for_values_mut(|p, k, v| {
3437            if f(k, v) {
3438                deleted.push((k.clone(), v.clone()));
3439                p.entry(k.clone()).or_default().push(TransactionUpdate {
3440                    value: v.clone(),
3441                    ts,
3442                    diff: Diff::MINUS_ONE,
3443                });
3444            }
3445        });
3446        soft_assert_no_log!(self.verify().is_ok());
3447        deleted
3448    }
3449
3450    /// Deletes item with key `k`.
3451    ///
3452    /// Returns the value of the deleted entry, if it existed.
3453    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3454        self.set(k, None, ts)
3455            .expect("deleting an entry cannot violate uniqueness")
3456    }
3457
3458    /// Deletes items with key in `ks`.
3459    ///
3460    /// Returns the keys and values of the deleted entries.
3461    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3462        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3463        let prevs = self
3464            .set_many(kvs, ts)
3465            .expect("deleting entries cannot violate uniqueness");
3466        prevs
3467            .into_iter()
3468            .filter_map(|(k, v)| v.map(|v| (k, v)))
3469            .collect()
3470    }
3471}
3472
3473#[cfg(test)]
3474#[allow(clippy::unwrap_used)]
3475mod tests {
3476    use super::*;
3477
3478    use mz_ore::now::SYSTEM_TIME;
3479    use mz_ore::{assert_none, assert_ok};
3480    use mz_persist_client::cache::PersistClientCache;
3481    use mz_persist_types::PersistLocation;
3482    use semver::Version;
3483
3484    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3485    use crate::memory;
3486
3487    #[mz_ore::test]
3488    fn test_table_transaction_simple() {
3489        fn uniqueness_violation(a: &String, b: &String) -> bool {
3490            a == b
3491        }
3492        let mut table = TableTransaction::new_with_uniqueness_fn(
3493            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3494            uniqueness_violation,
3495        )
3496        .unwrap();
3497
3498        // Ideally, we compare for errors here, but it's hard/impossible to implement PartialEq
3499        // for DurableCatalogError.
3500        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3501        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3502        assert!(
3503            table
3504                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3505                .is_err()
3506        );
3507        assert!(
3508            table
3509                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3510                .is_err()
3511        );
3512    }
3513
    /// End-to-end test of `TableTransaction`: stages operations in a
    /// transaction, applies the resulting `pending` diffs to a plain
    /// `BTreeMap` via `commit`, and asserts the table contents after each
    /// round trip. Values are globally unique per `uniqueness_violation`.
    #[mz_ore::test]
    fn test_table_transaction() {
        fn uniqueness_violation(a: &String, b: &String) -> bool {
            a == b
        }
        // The "durable" table that `commit` applies staged diffs to.
        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();

        // Applies a batch of (key, value, diff) updates to `table`, asserting
        // that retractions remove exactly the expected value and that
        // insertions never overwrite an existing entry.
        fn commit(
            table: &mut BTreeMap<Vec<u8>, String>,
            mut pending: Vec<(Vec<u8>, String, Diff)>,
        ) {
            // Sort by diff so that we process retractions first.
            pending.sort_by(|a, b| a.2.cmp(&b.2));
            for (k, v, diff) in pending {
                if diff == Diff::MINUS_ONE {
                    let prev = table.remove(&k);
                    assert_eq!(prev, Some(v));
                } else if diff == Diff::ONE {
                    let prev = table.insert(k, v);
                    assert_eq!(prev, None);
                } else {
                    panic!("unexpected diff: {diff}");
                }
            }
        }

        // Seed the table, then exercise `delete`, `update`, and `insert`.
        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.items_cloned(), table);
        // A predicate that matches nothing deletes nothing.
        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
        );
        assert_eq!(
            table_txn
                .update(|_k, _v| Some("v3".to_string()), 2)
                .unwrap(),
            Diff::ONE
        );

        // Uniqueness violation.
        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
            .unwrap_err();

        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
            .unwrap();
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
            ])
        );
        // A blanket update to "v1" would duplicate values across keys.
        let err = table_txn
            .update(|_k, _v| Some("v1".to_string()), 5)
            .unwrap_err();
        assert!(
            matches!(err, DurableCatalogError::UniquenessViolation),
            "unexpected err: {err:?}"
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v1".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
                (
                    2i64.to_le_bytes().to_vec(),
                    "v2".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Deleting then creating an item that has a uniqueness violation should work.
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        // Uniqueness violation in value.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap_err();
        // Key already exists, expect error.
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap_err();
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        // Both the inserts work now because the key and uniqueness violation are gone.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v3".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
            .unwrap();

        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
        );

        // Verify we don't try to delete v3 or v4 during commit.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
            .unwrap_err();
        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table.clone().into_iter().collect::<Vec<_>>(),
            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
        );

        // Test `set`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
            .unwrap_err();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
            .unwrap();
        // Setting to `None` deletes; key 2 doesn't exist, so it's a no-op.
        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v5".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
        );

        // Duplicate `set`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
            .unwrap();
        // Setting a key to its current value produces no pending diffs.
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());

        // Test `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                0,
            )
            .unwrap_err();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                1,
            )
            .unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                    (3i64.to_le_bytes().to_vec(), None),
                ]),
                2,
            )
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
                (
                    3i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                ]),
                0,
            )
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_key`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
            .unwrap_err();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
                .unwrap()
        );
        // Missing keys are ignored and reported as `false`.
        assert!(
            !table_txn
                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
                .unwrap()
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_key`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
                .unwrap()
        );
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_keys`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap_err();
        // Only key 1 exists, so only one entry counts as changed.
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                1,
            )
            .unwrap();
        assert_eq!(n, Diff::ONE);
        let n = table_txn
            .update_by_keys(
                [
                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                2,
            )
            .unwrap();
        assert_eq!(n, Diff::ZERO);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v8".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_keys`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // No-op updates still count toward the total, but stage no diffs.
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap();
        assert_eq!(n, Diff::from(2));
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `delete_by_key`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
        assert_eq!(prev, Some("v9".to_string()));
        // Missing key: nothing to delete.
        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
        assert_none!(prev);
        // Key 1 was already deleted in this transaction.
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
        assert_none!(prev);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                1i64.to_le_bytes().to_vec(),
                "v9".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
        );

        // Test `delete_by_keys`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Only existing keys are reported back.
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            0,
        );
        assert_eq!(
            prevs,
            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
        );
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            1,
        );
        assert_eq!(prevs, vec![]);
        let prevs = table_txn.delete_by_keys(
            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            2,
        );
        assert_eq!(prevs, vec![]);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                42i64.to_le_bytes().to_vec(),
                "v7".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(table, BTreeMap::new());
    }
4011
4012    #[mz_ore::test(tokio::test)]
4013    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
4014    async fn test_savepoint() {
4015        const VERSION: Version = Version::new(26, 0, 0);
4016        let mut persist_cache = PersistClientCache::new_no_metrics();
4017        persist_cache.cfg.build_version = VERSION;
4018        let persist_client = persist_cache
4019            .open(PersistLocation::new_in_mem())
4020            .await
4021            .unwrap();
4022        let state_builder = TestCatalogStateBuilder::new(persist_client)
4023            .with_default_deploy_generation()
4024            .with_version(VERSION);
4025
4026        // Initialize catalog.
4027        let _ = state_builder
4028            .clone()
4029            .unwrap_build()
4030            .await
4031            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4032            .await
4033            .unwrap()
4034            .0;
4035        let mut savepoint_state = state_builder
4036            .unwrap_build()
4037            .await
4038            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4039            .await
4040            .unwrap()
4041            .0;
4042
4043        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4044        assert!(!initial_snapshot.is_empty());
4045
4046        let db_name = "db";
4047        let db_owner = RoleId::User(42);
4048        let db_privileges = Vec::new();
4049        let mut txn = savepoint_state.transaction().await.unwrap();
4050        let (db_id, db_oid) = txn
4051            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4052            .unwrap();
4053        let commit_ts = txn.upper();
4054        txn.commit_internal(commit_ts).await.unwrap();
4055        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4056        let update = updates.into_element();
4057
4058        assert_eq!(update.diff, StateDiff::Addition);
4059
4060        let db = match update.kind {
4061            memory::objects::StateUpdateKind::Database(db) => db,
4062            update => panic!("unexpected update: {update:?}"),
4063        };
4064
4065        assert_eq!(db_id, db.id);
4066        assert_eq!(db_oid, db.oid);
4067        assert_eq!(db_name, db.name);
4068        assert_eq!(db_owner, db.owner_id);
4069        assert_eq!(db_privileges, db.privileges);
4070    }
4071
4072    #[mz_ore::test]
4073    fn test_allocate_introspection_source_index_id() {
4074        let cluster_variant: u8 = 0b0000_0001;
4075        let cluster_id_inner: u64 =
4076            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4077        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4078
4079        let cluster_id = ClusterId::System(cluster_id_inner);
4080        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4081
4082        let introspection_source_index_id: u64 =
4083            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4084
4085        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4086        {
4087            let mut cluster_variant_mask = 0xFF << 56;
4088            cluster_variant_mask &= introspection_source_index_id;
4089            cluster_variant_mask >>= 56;
4090            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4091        }
4092
4093        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4094        {
4095            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4096            cluster_id_inner_mask &= introspection_source_index_id;
4097            cluster_id_inner_mask >>= 8;
4098            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4099        }
4100
4101        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4102        {
4103            let mut log_variant_mask = 0xFF;
4104            log_variant_mask &= introspection_source_index_id;
4105            assert_eq!(
4106                log_variant_mask,
4107                u64::from(timely_messages_received_log_variant)
4108            );
4109        }
4110
4111        let (catalog_item_id, global_id) =
4112            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4113
4114        assert_eq!(
4115            catalog_item_id,
4116            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4117        );
4118        assert_eq!(
4119            global_id,
4120            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4121        );
4122    }
4123}