// Source: mz_catalog/durable/transaction.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35    RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57    ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58    ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59    SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
68    SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
69    USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
70    USER_ROLE_ID_ALLOC_KEY,
71};
72use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
73
/// Logical timestamp used to order audit-log updates within a transaction
/// (a plain `u64`; distinct from `mz_repr::Timestamp`).
type Timestamp = u64;
75
/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
/// An operation also logically groups multiple catalog updates together.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    /// Handle to the durable catalog state this transaction will be committed to.
    /// Excluded from `Debug` since a trait object has no useful representation.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    /// Pending changes to databases (unique by name).
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    /// Pending changes to schemas (unique by database + name).
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    /// Pending changes to catalog items (unique by schema + name, with special
    /// handling for types; see `Transaction::new`).
    items: TableTransaction<ItemKey, ItemValue>,
    /// Pending changes to object comments.
    comments: TableTransaction<CommentKey, CommentValue>,
    /// Pending changes to roles (unique by name).
    roles: TableTransaction<RoleKey, RoleValue>,
    /// Pending changes to role authentication data (password hashes).
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    /// Pending changes to clusters (unique by name).
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    /// Pending changes to cluster replicas (unique by cluster + name).
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    /// Pending changes to introspection source indexes.
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    /// Pending changes to the named ID allocators.
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    /// Pending changes to catalog configuration values.
    configs: TableTransaction<ConfigKey, ConfigValue>,
    /// Pending changes to catalog settings.
    settings: TableTransaction<SettingKey, SettingValue>,
    /// Pending changes to builtin object ID mappings.
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    /// Pending changes to system (server) configuration parameters.
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    /// Pending changes to default privileges.
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    /// Pending changes to source references.
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    /// Pending changes to system-level privileges.
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    /// Pending changes to network policies (unique by name).
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    /// Pending changes to storage collection metadata (shard mappings).
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    /// Pending changes to the set of unfinalized shards.
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    /// Pending change to the txn-WAL shard; keyed by `()`, so a singleton.
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Don't make this a table transaction so that it's not read into the
    // in-memory cache.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of `durable_catalog` at the start of the transaction.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation of this transaction.
    op_id: Timestamp,
}
115
116impl<'a> Transaction<'a> {
    /// Creates a new [`Transaction`] on top of `durable_catalog`, seeded with
    /// the provided [`Snapshot`] of the catalog's collections.
    ///
    /// `upper` is the upper of `durable_catalog` at the start of the
    /// transaction (recorded for use at commit time — see the field docs).
    ///
    /// Returns an error if any snapshot collection violates its uniqueness
    /// constraints.
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            // Database names must be globally unique.
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            // Schema names must be unique within a database.
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            // Item names must be unique within a schema; two same-named items
            // conflict when both are non-types, or when one is a type and the
            // other's type conflicts with types.
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    // `item_type` is slow, only compute if needed.
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            // Role names must be globally unique.
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            // Cluster names must be globally unique.
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            // Network policy names must be globally unique.
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            // Replica names must be unique within a cluster.
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            // Uniqueness violations for this value occur at the key rather than
            // the value (the key is the unit struct `()` so this is a singleton
            // value).
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }
199
200    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
201        let key = ItemKey { id: *id };
202        self.items
203            .get(&key)
204            .map(|v| DurableType::from_key_value(key, v.clone()))
205    }
206
207    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
208        self.items
209            .items()
210            .into_iter()
211            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
212            .sorted_by_key(|Item { id, .. }| *id)
213    }
214
215    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
216        self.insert_audit_log_events([event]);
217    }
218
219    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
220        let events = events
221            .into_iter()
222            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
223        self.audit_log_updates.extend(events);
224    }
225
226    pub fn insert_user_database(
227        &mut self,
228        database_name: &str,
229        owner_id: RoleId,
230        privileges: Vec<MzAclItem>,
231        temporary_oids: &HashSet<u32>,
232    ) -> Result<(DatabaseId, u32), CatalogError> {
233        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
234        let id = DatabaseId::User(id);
235        let oid = self.allocate_oid(temporary_oids)?;
236        self.insert_database(id, database_name, owner_id, privileges, oid)?;
237        Ok((id, oid))
238    }
239
240    pub(crate) fn insert_database(
241        &mut self,
242        id: DatabaseId,
243        database_name: &str,
244        owner_id: RoleId,
245        privileges: Vec<MzAclItem>,
246        oid: u32,
247    ) -> Result<u32, CatalogError> {
248        match self.databases.insert(
249            DatabaseKey { id },
250            DatabaseValue {
251                name: database_name.to_string(),
252                owner_id,
253                privileges,
254                oid,
255            },
256            self.op_id,
257        ) {
258            Ok(_) => Ok(oid),
259            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
260        }
261    }
262
263    pub fn insert_user_schema(
264        &mut self,
265        database_id: DatabaseId,
266        schema_name: &str,
267        owner_id: RoleId,
268        privileges: Vec<MzAclItem>,
269        temporary_oids: &HashSet<u32>,
270    ) -> Result<(SchemaId, u32), CatalogError> {
271        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
272        let id = SchemaId::User(id);
273        let oid = self.allocate_oid(temporary_oids)?;
274        self.insert_schema(
275            id,
276            Some(database_id),
277            schema_name.to_string(),
278            owner_id,
279            privileges,
280            oid,
281        )?;
282        Ok((id, oid))
283    }
284
285    pub fn insert_system_schema(
286        &mut self,
287        schema_id: u64,
288        schema_name: &str,
289        owner_id: RoleId,
290        privileges: Vec<MzAclItem>,
291        oid: u32,
292    ) -> Result<(), CatalogError> {
293        let id = SchemaId::System(schema_id);
294        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
295    }
296
297    pub(crate) fn insert_schema(
298        &mut self,
299        schema_id: SchemaId,
300        database_id: Option<DatabaseId>,
301        schema_name: String,
302        owner_id: RoleId,
303        privileges: Vec<MzAclItem>,
304        oid: u32,
305    ) -> Result<(), CatalogError> {
306        match self.schemas.insert(
307            SchemaKey { id: schema_id },
308            SchemaValue {
309                database_id,
310                name: schema_name.clone(),
311                owner_id,
312                privileges,
313                oid,
314            },
315            self.op_id,
316        ) {
317            Ok(_) => Ok(()),
318            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
319        }
320    }
321
322    pub fn insert_builtin_role(
323        &mut self,
324        id: RoleId,
325        name: String,
326        attributes: RoleAttributesRaw,
327        membership: RoleMembership,
328        vars: RoleVars,
329        oid: u32,
330    ) -> Result<RoleId, CatalogError> {
331        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
332        self.insert_role(id, name, attributes, membership, vars, oid)?;
333        Ok(id)
334    }
335
336    pub fn insert_user_role(
337        &mut self,
338        name: String,
339        attributes: RoleAttributesRaw,
340        membership: RoleMembership,
341        vars: RoleVars,
342        temporary_oids: &HashSet<u32>,
343    ) -> Result<(RoleId, u32), CatalogError> {
344        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
345        let id = RoleId::User(id);
346        let oid = self.allocate_oid(temporary_oids)?;
347        self.insert_role(id, name, attributes, membership, vars, oid)?;
348        Ok((id, oid))
349    }
350
    /// Inserts the role `id` into the catalog, plus a `role_auth` entry when
    /// the role has a password.
    ///
    /// Returns `RoleAlreadyExists` if either insert hits a uniqueness
    /// violation.
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            // Hash the password before it is durably recorded; the plaintext
            // is never written to the catalog.
            let hash = mz_auth::hash::scram256_hash(
                password,
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    // This should never happen, but rather than panicking we'll
                    // set a known secure value as a fallback.
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            // NOTE(review): the auth row is inserted before the role row; if
            // the role insert below fails the transaction is presumably
            // abandoned rather than committed — confirm callers never commit
            // after an error.
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }
406
407    /// Panics if any introspection source id is not a system id
408    pub fn insert_user_cluster(
409        &mut self,
410        cluster_id: ClusterId,
411        cluster_name: &str,
412        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
413        owner_id: RoleId,
414        privileges: Vec<MzAclItem>,
415        config: ClusterConfig,
416        temporary_oids: &HashSet<u32>,
417    ) -> Result<(), CatalogError> {
418        self.insert_cluster(
419            cluster_id,
420            cluster_name,
421            introspection_source_indexes,
422            owner_id,
423            privileges,
424            config,
425            temporary_oids,
426        )
427    }
428
429    /// Panics if any introspection source id is not a system id
430    pub fn insert_system_cluster(
431        &mut self,
432        cluster_name: &str,
433        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
434        privileges: Vec<MzAclItem>,
435        owner_id: RoleId,
436        config: ClusterConfig,
437        temporary_oids: &HashSet<u32>,
438    ) -> Result<(), CatalogError> {
439        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
440        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441        self.insert_cluster(
442            cluster_id,
443            cluster_name,
444            introspection_source_indexes,
445            owner_id,
446            privileges,
447            config,
448            temporary_oids,
449        )
450    }
451
452    fn insert_cluster(
453        &mut self,
454        cluster_id: ClusterId,
455        cluster_name: &str,
456        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
457        owner_id: RoleId,
458        privileges: Vec<MzAclItem>,
459        config: ClusterConfig,
460        temporary_oids: &HashSet<u32>,
461    ) -> Result<(), CatalogError> {
462        if let Err(_) = self.clusters.insert(
463            ClusterKey { id: cluster_id },
464            ClusterValue {
465                name: cluster_name.to_string(),
466                owner_id,
467                privileges,
468                config,
469            },
470            self.op_id,
471        ) {
472            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
473        };
474
475        let amount = usize_to_u64(introspection_source_indexes.len());
476        let oids = self.allocate_oids(amount, temporary_oids)?;
477        let introspection_source_indexes: Vec<_> = introspection_source_indexes
478            .into_iter()
479            .zip_eq(oids)
480            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
481            .collect();
482        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
483            let introspection_source_index = IntrospectionSourceIndex {
484                cluster_id,
485                name: builtin.name.to_string(),
486                item_id,
487                index_id,
488                oid,
489            };
490            let (key, value) = introspection_source_index.into_key_value();
491            self.introspection_sources
492                .insert(key, value, self.op_id)
493                .expect("no uniqueness violation");
494        }
495
496        Ok(())
497    }
498
499    pub fn rename_cluster(
500        &mut self,
501        cluster_id: ClusterId,
502        cluster_name: &str,
503        cluster_to_name: &str,
504    ) -> Result<(), CatalogError> {
505        let key = ClusterKey { id: cluster_id };
506
507        match self.clusters.update(
508            |k, v| {
509                if *k == key {
510                    let mut value = v.clone();
511                    value.name = cluster_to_name.to_string();
512                    Some(value)
513                } else {
514                    None
515                }
516            },
517            self.op_id,
518        )? {
519            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
520            Diff::ONE => Ok(()),
521            n => panic!(
522                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
523            ),
524        }
525    }
526
527    pub fn rename_cluster_replica(
528        &mut self,
529        replica_id: ReplicaId,
530        replica_name: &QualifiedReplica,
531        replica_to_name: &str,
532    ) -> Result<(), CatalogError> {
533        let key = ClusterReplicaKey { id: replica_id };
534
535        match self.cluster_replicas.update(
536            |k, v| {
537                if *k == key {
538                    let mut value = v.clone();
539                    value.name = replica_to_name.to_string();
540                    Some(value)
541                } else {
542                    None
543                }
544            },
545            self.op_id,
546        )? {
547            Diff::ZERO => {
548                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
549            }
550            Diff::ONE => Ok(()),
551            n => panic!(
552                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
553            ),
554        }
555    }
556
557    pub fn insert_cluster_replica(
558        &mut self,
559        cluster_id: ClusterId,
560        replica_name: &str,
561        config: ReplicaConfig,
562        owner_id: RoleId,
563    ) -> Result<ReplicaId, CatalogError> {
564        let replica_id = match cluster_id {
565            ClusterId::System(_) => self.allocate_system_replica_id()?,
566            ClusterId::User(_) => self.allocate_user_replica_id()?,
567        };
568        self.insert_cluster_replica_with_id(
569            cluster_id,
570            replica_id,
571            replica_name,
572            config,
573            owner_id,
574        )?;
575        Ok(replica_id)
576    }
577
578    pub(crate) fn insert_cluster_replica_with_id(
579        &mut self,
580        cluster_id: ClusterId,
581        replica_id: ReplicaId,
582        replica_name: &str,
583        config: ReplicaConfig,
584        owner_id: RoleId,
585    ) -> Result<(), CatalogError> {
586        if let Err(_) = self.cluster_replicas.insert(
587            ClusterReplicaKey { id: replica_id },
588            ClusterReplicaValue {
589                cluster_id,
590                name: replica_name.into(),
591                config,
592                owner_id,
593            },
594            self.op_id,
595        ) {
596            let cluster = self
597                .clusters
598                .get(&ClusterKey { id: cluster_id })
599                .expect("cluster exists");
600            return Err(SqlCatalogError::DuplicateReplica(
601                replica_name.to_string(),
602                cluster.name.to_string(),
603            )
604            .into());
605        };
606        Ok(())
607    }
608
609    pub fn insert_user_network_policy(
610        &mut self,
611        name: String,
612        rules: Vec<NetworkPolicyRule>,
613        privileges: Vec<MzAclItem>,
614        owner_id: RoleId,
615        temporary_oids: &HashSet<u32>,
616    ) -> Result<NetworkPolicyId, CatalogError> {
617        let oid = self.allocate_oid(temporary_oids)?;
618        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
619        let id = NetworkPolicyId::User(id);
620        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621    }
622
623    pub fn insert_network_policy(
624        &mut self,
625        id: NetworkPolicyId,
626        name: String,
627        rules: Vec<NetworkPolicyRule>,
628        privileges: Vec<MzAclItem>,
629        owner_id: RoleId,
630        oid: u32,
631    ) -> Result<NetworkPolicyId, CatalogError> {
632        match self.network_policies.insert(
633            NetworkPolicyKey { id },
634            NetworkPolicyValue {
635                name: name.clone(),
636                rules,
637                privileges,
638                owner_id,
639                oid,
640            },
641            self.op_id,
642        ) {
643            Ok(_) => Ok(id),
644            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
645        }
646    }
647
648    /// Updates persisted information about persisted introspection source
649    /// indexes.
650    ///
651    /// Panics if provided id is not a system id.
652    pub fn update_introspection_source_index_gids(
653        &mut self,
654        mappings: impl Iterator<
655            Item = (
656                ClusterId,
657                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
658            ),
659        >,
660    ) -> Result<(), CatalogError> {
661        for (cluster_id, updates) in mappings {
662            for (name, item_id, index_id, oid) in updates {
663                let introspection_source_index = IntrospectionSourceIndex {
664                    cluster_id,
665                    name,
666                    item_id,
667                    index_id,
668                    oid,
669                };
670                let (key, value) = introspection_source_index.into_key_value();
671
672                let prev = self
673                    .introspection_sources
674                    .set(key, Some(value), self.op_id)?;
675                if prev.is_none() {
676                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
677                        "{index_id}"
678                    ))
679                    .into());
680                }
681            }
682        }
683        Ok(())
684    }
685
686    pub fn insert_user_item(
687        &mut self,
688        id: CatalogItemId,
689        global_id: GlobalId,
690        schema_id: SchemaId,
691        item_name: &str,
692        create_sql: String,
693        owner_id: RoleId,
694        privileges: Vec<MzAclItem>,
695        temporary_oids: &HashSet<u32>,
696        versions: BTreeMap<RelationVersion, GlobalId>,
697    ) -> Result<u32, CatalogError> {
698        let oid = self.allocate_oid(temporary_oids)?;
699        self.insert_item(
700            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
701        )?;
702        Ok(oid)
703    }
704
705    pub fn insert_item(
706        &mut self,
707        id: CatalogItemId,
708        oid: u32,
709        global_id: GlobalId,
710        schema_id: SchemaId,
711        item_name: &str,
712        create_sql: String,
713        owner_id: RoleId,
714        privileges: Vec<MzAclItem>,
715        extra_versions: BTreeMap<RelationVersion, GlobalId>,
716    ) -> Result<(), CatalogError> {
717        match self.items.insert(
718            ItemKey { id },
719            ItemValue {
720                schema_id,
721                name: item_name.to_string(),
722                create_sql,
723                owner_id,
724                privileges,
725                oid,
726                global_id,
727                extra_versions,
728            },
729            self.op_id,
730        ) {
731            Ok(_) => Ok(()),
732            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
733        }
734    }
735
736    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
738    }
739
    /// Reserves `amount` consecutive IDs from the allocator named `key` and
    /// returns them, advancing the allocator's `next_id` past the reserved
    /// range.
    ///
    /// Returns `IdExhaustion` if the reservation would overflow `u64`.
    ///
    /// # Panics
    ///
    /// Panics if the allocator `key` does not exist, or if system item IDs
    /// are requested after bootstrap has completed.
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        // Read the allocator's current high-water mark.
        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        // Refuse to wrap around; exhaustion surfaces as a catalog error.
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        // `set` returns the previous value; it must equal what we just read,
        // otherwise the allocator changed underneath us.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }
772
773    pub fn allocate_system_item_ids(
774        &mut self,
775        amount: u64,
776    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
777        assert!(
778            !self.durable_catalog.is_bootstrap_complete(),
779            "we can only allocate system item IDs during bootstrap"
780        );
781        Ok(self
782            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
783            .into_iter()
784            // TODO(alter_table): Use separate ID allocators.
785            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
786            .collect())
787    }
788
    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system item that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        // Tag for the top 8 bits of the ID; distinguishes system vs. user clusters.
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The inner cluster ID is shifted left by 8 below and must leave the top
        // 8 bits free for the variant tag, so it has to fit in 48 bits. The mask
        // selects the top 16 bits, which must all be zero.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // Stable discriminant per log variant; these values end up embedded in
        // persisted item IDs, so do not renumber existing entries.
        // NOTE(review): 27 is absent below — presumably a retired log variant
        // whose number must not be reused; TODO confirm before assigning 27.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        // Pack the three fields per the layout documented above:
        // [ variant: bits 56..64 | cluster id: bits 8..56 | log variant: bits 0..8 ]
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
878
879    pub fn allocate_user_item_ids(
880        &mut self,
881        amount: u64,
882    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
883        Ok(self
884            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
885            .into_iter()
886            // TODO(alter_table): Use separate ID allocators.
887            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
888            .collect())
889    }
890
891    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
892        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
893        Ok(ReplicaId::User(id))
894    }
895
896    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
897        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
898        Ok(ReplicaId::System(id))
899    }
900
901    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
902        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
903    }
904
905    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
906        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
907    }
908
    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    ///
    /// Collects every OID currently held by a durable object (databases, schemas, roles,
    /// items, introspection sources) plus the caller-provided `temporary_oids`, then hands
    /// out the next `amount` unused OIDs starting from the persisted allocator cursor,
    /// wrapping around within the user OID range. Returns `OidExhaustion` if the request
    /// cannot be satisfied.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            // Fails for OIDs below `FIRST_USER_OID`, which are reserved for system use.
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            // On overflow past `u32::MAX`, re-enter the user OID range rather than
            // wrapping to 0 (which would collide with reserved system OIDs). Only
            // ever invoked with `rhs == 1` in the loop below.
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        // OIDs are 32 bits wide, so a request for more than `u32::MAX` can never succeed.
        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do everytime we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated everytime an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference and the other
        // approach requires making sure that allocator always stays in-sync which can be
        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        // An OID is unavailable if a durable object holds it or it is in use as a temporary OID.
        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        // Resume scanning from the persisted cursor rather than always starting at
        // `FIRST_USER_OID`, so consecutive allocations advance through the space.
        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            // Arriving back at the starting OID means the entire user OID space has
            // been scanned without finding enough free OIDs.
            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        // Persist the advanced cursor. `set` returns the previous value, which must
        // equal the cursor we read above.
        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1017
1018    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
1019    /// object.
1020    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1021        self.allocate_oids(1, temporary_oids)
1022            .map(|oids| oids.into_element())
1023    }
1024
    /// Exports the current state of this transaction as a [`Snapshot`].
    ///
    /// This merges each `TableTransaction`'s initial data with its pending
    /// changes to produce the current view, then converts back to proto types.
    /// Used to persist transaction state between incremental DDL dry runs so
    /// the next dry run's fresh `Transaction` starts in sync with the
    /// accumulated `CatalogState`.
    pub fn current_snapshot(&self) -> Snapshot {
        Snapshot {
            databases: self.databases.current_items_proto(),
            schemas: self.schemas.current_items_proto(),
            roles: self.roles.current_items_proto(),
            role_auth: self.role_auth.current_items_proto(),
            items: self.items.current_items_proto(),
            comments: self.comments.current_items_proto(),
            clusters: self.clusters.current_items_proto(),
            network_policies: self.network_policies.current_items_proto(),
            cluster_replicas: self.cluster_replicas.current_items_proto(),
            introspection_sources: self.introspection_sources.current_items_proto(),
            id_allocator: self.id_allocator.current_items_proto(),
            configs: self.configs.current_items_proto(),
            settings: self.settings.current_items_proto(),
            // Note: the snapshot field is named `system_object_mappings`, but it is
            // backed by this transaction's `system_gid_mapping` table.
            system_object_mappings: self.system_gid_mapping.current_items_proto(),
            system_configurations: self.system_configurations.current_items_proto(),
            default_privileges: self.default_privileges.current_items_proto(),
            source_references: self.source_references.current_items_proto(),
            system_privileges: self.system_privileges.current_items_proto(),
            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
        }
    }
1057
1058    pub(crate) fn insert_id_allocator(
1059        &mut self,
1060        name: String,
1061        next_id: u64,
1062    ) -> Result<(), CatalogError> {
1063        match self.id_allocator.insert(
1064            IdAllocKey { name: name.clone() },
1065            IdAllocValue { next_id },
1066            self.op_id,
1067        ) {
1068            Ok(_) => Ok(()),
1069            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1070        }
1071    }
1072
1073    /// Removes the database `id` from the transaction.
1074    ///
1075    /// Returns an error if `id` is not found.
1076    ///
1077    /// Runtime is linear with respect to the total number of databases in the catalog.
1078    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
1079    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1080        let prev = self
1081            .databases
1082            .set(DatabaseKey { id: *id }, None, self.op_id)?;
1083        if prev.is_some() {
1084            Ok(())
1085        } else {
1086            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1087        }
1088    }
1089
1090    /// Removes all databases in `databases` from the transaction.
1091    ///
1092    /// Returns an error if any id in `databases` is not found.
1093    ///
1094    /// NOTE: On error, there still may be some databases removed from the transaction. It
1095    /// is up to the caller to either abort the transaction or commit.
1096    pub fn remove_databases(
1097        &mut self,
1098        databases: &BTreeSet<DatabaseId>,
1099    ) -> Result<(), CatalogError> {
1100        if databases.is_empty() {
1101            return Ok(());
1102        }
1103
1104        let to_remove = databases
1105            .iter()
1106            .map(|id| (DatabaseKey { id: *id }, None))
1107            .collect();
1108        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1109        prev.retain(|_k, val| val.is_none());
1110
1111        if !prev.is_empty() {
1112            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1113            return Err(SqlCatalogError::UnknownDatabase(err).into());
1114        }
1115
1116        Ok(())
1117    }
1118
1119    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
1120    ///
1121    /// Returns an error if `(database_id, schema_id)` is not found.
1122    ///
1123    /// Runtime is linear with respect to the total number of schemas in the catalog.
1124    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
1125    pub fn remove_schema(
1126        &mut self,
1127        database_id: &Option<DatabaseId>,
1128        schema_id: &SchemaId,
1129    ) -> Result<(), CatalogError> {
1130        let prev = self
1131            .schemas
1132            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1133        if prev.is_some() {
1134            Ok(())
1135        } else {
1136            let database_name = match database_id {
1137                Some(id) => format!("{id}."),
1138                None => "".to_string(),
1139            };
1140            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1141        }
1142    }
1143
1144    /// Removes all schemas in `schemas` from the transaction.
1145    ///
1146    /// Returns an error if any id in `schemas` is not found.
1147    ///
1148    /// NOTE: On error, there still may be some schemas removed from the transaction. It
1149    /// is up to the caller to either abort the transaction or commit.
1150    pub fn remove_schemas(
1151        &mut self,
1152        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1153    ) -> Result<(), CatalogError> {
1154        if schemas.is_empty() {
1155            return Ok(());
1156        }
1157
1158        let to_remove = schemas
1159            .iter()
1160            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1161            .collect();
1162        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1163        prev.retain(|_k, v| v.is_none());
1164
1165        if !prev.is_empty() {
1166            let err = prev
1167                .keys()
1168                .map(|k| {
1169                    let db_spec = schemas.get(&k.id).expect("should_exist");
1170                    let db_name = match db_spec {
1171                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1172                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1173                    };
1174                    format!("{}.{}", db_name, k.id)
1175                })
1176                .join(", ");
1177
1178            return Err(SqlCatalogError::UnknownSchema(err).into());
1179        }
1180
1181        Ok(())
1182    }
1183
1184    pub fn remove_source_references(
1185        &mut self,
1186        source_id: CatalogItemId,
1187    ) -> Result<(), CatalogError> {
1188        let deleted = self
1189            .source_references
1190            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1191            .is_some();
1192        if deleted {
1193            Ok(())
1194        } else {
1195            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1196        }
1197    }
1198
1199    /// Removes all user roles in `roles` from the transaction.
1200    ///
1201    /// Returns an error if any id in `roles` is not found.
1202    ///
1203    /// NOTE: On error, there still may be some roles removed from the transaction. It
1204    /// is up to the caller to either abort the transaction or commit.
1205    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1206        assert!(
1207            roles.iter().all(|id| id.is_user()),
1208            "cannot delete non-user roles"
1209        );
1210        self.remove_roles(roles)
1211    }
1212
1213    /// Removes all roles in `roles` from the transaction.
1214    ///
1215    /// Returns an error if any id in `roles` is not found.
1216    ///
1217    /// NOTE: On error, there still may be some roles removed from the transaction. It
1218    /// is up to the caller to either abort the transaction or commit.
1219    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1220        if roles.is_empty() {
1221            return Ok(());
1222        }
1223
1224        let to_remove_keys = roles
1225            .iter()
1226            .map(|role_id| RoleKey { id: *role_id })
1227            .collect::<Vec<_>>();
1228
1229        let to_remove_roles = to_remove_keys
1230            .iter()
1231            .map(|role_key| (role_key.clone(), None))
1232            .collect();
1233
1234        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1235
1236        let to_remove_role_auth = to_remove_keys
1237            .iter()
1238            .map(|role_key| {
1239                (
1240                    RoleAuthKey {
1241                        role_id: role_key.id,
1242                    },
1243                    None,
1244                )
1245            })
1246            .collect();
1247
1248        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1249
1250        prev.retain(|_k, v| v.is_none());
1251        if !prev.is_empty() {
1252            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1253            return Err(SqlCatalogError::UnknownRole(err).into());
1254        }
1255
1256        role_auth_prev.retain(|_k, v| v.is_none());
1257        // The reason we don't to the same check as above is that the role auth table
1258        // is not required to have all roles in the role table.
1259
1260        Ok(())
1261    }
1262
1263    /// Removes all cluster in `clusters` from the transaction.
1264    ///
1265    /// Returns an error if any id in `clusters` is not found.
1266    ///
1267    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1268    /// the caller to either abort the transaction or commit.
1269    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1270        if clusters.is_empty() {
1271            return Ok(());
1272        }
1273
1274        let to_remove = clusters
1275            .iter()
1276            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1277            .collect();
1278        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1279
1280        prev.retain(|_k, v| v.is_none());
1281        if !prev.is_empty() {
1282            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1283            return Err(SqlCatalogError::UnknownCluster(err).into());
1284        }
1285
1286        // Cascade delete introspection sources and cluster replicas.
1287        //
1288        // TODO(benesch): this doesn't seem right. Cascade deletions should
1289        // be entirely the domain of the higher catalog layer, not the
1290        // storage layer.
1291        self.cluster_replicas
1292            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1293        self.introspection_sources
1294            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1295
1296        Ok(())
1297    }
1298
1299    /// Removes the cluster replica `id` from the transaction.
1300    ///
1301    /// Returns an error if `id` is not found.
1302    ///
1303    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1304    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1305    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1306        let deleted = self
1307            .cluster_replicas
1308            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1309            .is_some();
1310        if deleted {
1311            Ok(())
1312        } else {
1313            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1314        }
1315    }
1316
1317    /// Removes all cluster replicas in `replicas` from the transaction.
1318    ///
1319    /// Returns an error if any id in `replicas` is not found.
1320    ///
1321    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1322    /// is up to the caller to either abort the transaction or commit.
1323    pub fn remove_cluster_replicas(
1324        &mut self,
1325        replicas: &BTreeSet<ReplicaId>,
1326    ) -> Result<(), CatalogError> {
1327        if replicas.is_empty() {
1328            return Ok(());
1329        }
1330
1331        let to_remove = replicas
1332            .iter()
1333            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1334            .collect();
1335        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1336
1337        prev.retain(|_k, v| v.is_none());
1338        if !prev.is_empty() {
1339            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1340            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1341        }
1342
1343        Ok(())
1344    }
1345
1346    /// Removes item `id` from the transaction.
1347    ///
1348    /// Returns an error if `id` is not found.
1349    ///
1350    /// Runtime is linear with respect to the total number of items in the catalog.
1351    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1352    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1353        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1354        if prev.is_some() {
1355            Ok(())
1356        } else {
1357            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1358        }
1359    }
1360
1361    /// Removes all items in `ids` from the transaction.
1362    ///
1363    /// Returns an error if any id in `ids` is not found.
1364    ///
1365    /// NOTE: On error, there still may be some items removed from the transaction. It is
1366    /// up to the caller to either abort the transaction or commit.
1367    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1368        if ids.is_empty() {
1369            return Ok(());
1370        }
1371
1372        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1373        let n = self.items.delete_by_keys(ks, self.op_id).len();
1374        if n == ids.len() {
1375            Ok(())
1376        } else {
1377            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1378            let mut unknown = ids.difference(&item_ids);
1379            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1380        }
1381    }
1382
1383    /// Removes all system object mappings in `descriptions` from the transaction.
1384    ///
1385    /// Returns an error if any description in `descriptions` is not found.
1386    ///
1387    /// NOTE: On error, there still may be some items removed from the transaction. It is
1388    /// up to the caller to either abort the transaction or commit.
1389    pub fn remove_system_object_mappings(
1390        &mut self,
1391        descriptions: BTreeSet<SystemObjectDescription>,
1392    ) -> Result<(), CatalogError> {
1393        if descriptions.is_empty() {
1394            return Ok(());
1395        }
1396
1397        let ks: Vec<_> = descriptions
1398            .clone()
1399            .into_iter()
1400            .map(|desc| GidMappingKey {
1401                schema_name: desc.schema_name,
1402                object_type: desc.object_type,
1403                object_name: desc.object_name,
1404            })
1405            .collect();
1406        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1407
1408        if n == descriptions.len() {
1409            Ok(())
1410        } else {
1411            let item_descriptions = self
1412                .system_gid_mapping
1413                .items()
1414                .keys()
1415                .map(|k| SystemObjectDescription {
1416                    schema_name: k.schema_name.clone(),
1417                    object_type: k.object_type.clone(),
1418                    object_name: k.object_name.clone(),
1419                })
1420                .collect();
1421            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1422                format!(
1423                    "{} {}.{}",
1424                    desc.object_type, desc.schema_name, desc.object_name
1425                )
1426            });
1427            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1428        }
1429    }
1430
1431    /// Removes all introspection source indexes in `indexes` from the transaction.
1432    ///
1433    /// Returns an error if any index in `indexes` is not found.
1434    ///
1435    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1436    /// up to the caller to either abort the transaction or commit.
1437    pub fn remove_introspection_source_indexes(
1438        &mut self,
1439        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1440    ) -> Result<(), CatalogError> {
1441        if introspection_source_indexes.is_empty() {
1442            return Ok(());
1443        }
1444
1445        let ks: Vec<_> = introspection_source_indexes
1446            .clone()
1447            .into_iter()
1448            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1449            .collect();
1450        let n = self
1451            .introspection_sources
1452            .delete_by_keys(ks, self.op_id)
1453            .len();
1454        if n == introspection_source_indexes.len() {
1455            Ok(())
1456        } else {
1457            let txn_indexes = self
1458                .introspection_sources
1459                .items()
1460                .keys()
1461                .map(|k| (k.cluster_id, k.name.clone()))
1462                .collect();
1463            let mut unknown = introspection_source_indexes
1464                .difference(&txn_indexes)
1465                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1466            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1467        }
1468    }
1469
1470    /// Updates item `id` in the transaction to `item_name` and `item`.
1471    ///
1472    /// Returns an error if `id` is not found.
1473    ///
1474    /// Runtime is linear with respect to the total number of items in the catalog.
1475    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1476    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1477        let updated =
1478            self.items
1479                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1480        if updated {
1481            Ok(())
1482        } else {
1483            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1484        }
1485    }
1486
1487    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1488    /// corresponding value in `items`.
1489    ///
1490    /// Returns an error if any id in `items` is not found.
1491    ///
1492    /// NOTE: On error, there still may be some items updated in the transaction. It is
1493    /// up to the caller to either abort the transaction or commit.
1494    pub fn update_items(
1495        &mut self,
1496        items: BTreeMap<CatalogItemId, Item>,
1497    ) -> Result<(), CatalogError> {
1498        if items.is_empty() {
1499            return Ok(());
1500        }
1501
1502        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1503        let kvs: Vec<_> = items
1504            .clone()
1505            .into_iter()
1506            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1507            .collect();
1508        let n = self.items.update_by_keys(kvs, self.op_id)?;
1509        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1510        if n == update_ids.len() {
1511            Ok(())
1512        } else {
1513            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1514            let mut unknown = update_ids.difference(&item_ids);
1515            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1516        }
1517    }
1518
1519    /// Updates role `id` in the transaction to `role`.
1520    ///
1521    /// Returns an error if `id` is not found.
1522    ///
1523    /// Runtime is linear with respect to the total number of items in the catalog.
1524    /// DO NOT call this function in a loop, implement and use some `Self::update_roles` instead.
1525    /// You should model it after [`Self::update_items`].
1526    pub fn update_role(
1527        &mut self,
1528        id: RoleId,
1529        role: Role,
1530        password: PasswordAction,
1531    ) -> Result<(), CatalogError> {
1532        let key = RoleKey { id };
1533        if self.roles.get(&key).is_some() {
1534            let auth_key = RoleAuthKey { role_id: id };
1535
1536            match password {
1537                PasswordAction::Set(new_password) => {
1538                    let hash = mz_auth::hash::scram256_hash(
1539                        &new_password.password,
1540                        &new_password.scram_iterations,
1541                    )
1542                    .expect("password hash should be valid");
1543                    let value = RoleAuthValue {
1544                        password_hash: Some(hash),
1545                        updated_at: SYSTEM_TIME(),
1546                    };
1547
1548                    if self.role_auth.get(&auth_key).is_some() {
1549                        self.role_auth
1550                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1551                    } else {
1552                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1553                    }
1554                }
1555                PasswordAction::Clear => {
1556                    let value = RoleAuthValue {
1557                        password_hash: None,
1558                        updated_at: SYSTEM_TIME(),
1559                    };
1560                    if self.role_auth.get(&auth_key).is_some() {
1561                        self.role_auth
1562                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1563                    }
1564                }
1565                PasswordAction::NoChange => {}
1566            }
1567
1568            self.roles
1569                .update_by_key(key, role.into_key_value().1, self.op_id)?;
1570
1571            Ok(())
1572        } else {
1573            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1574        }
1575    }
1576
1577    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1578    /// corresponding value in `roles`.
1579    ///
1580    /// This function does *not* write role_authentication information to the catalog.
1581    /// It is purely for updating the role itself.
1582    ///
1583    /// Returns an error if any id in `roles` is not found.
1584    ///
1585    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1586    /// up to the caller to either abort the transaction or commit.
1587    pub fn update_roles_without_auth(
1588        &mut self,
1589        roles: BTreeMap<RoleId, Role>,
1590    ) -> Result<(), CatalogError> {
1591        if roles.is_empty() {
1592            return Ok(());
1593        }
1594
1595        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1596        let kvs: Vec<_> = roles
1597            .into_iter()
1598            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1599            .collect();
1600        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1601        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1602
1603        if n == update_role_ids.len() {
1604            Ok(())
1605        } else {
1606            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1607            let mut unknown = update_role_ids.difference(&role_ids);
1608            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1609        }
1610    }
1611
1612    /// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
1613    /// of `mappings` should be (old-global-id, new-system-object-mapping).
1614    ///
1615    /// Panics if provided id is not a system id.
1616    pub fn update_system_object_mappings(
1617        &mut self,
1618        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1619    ) -> Result<(), CatalogError> {
1620        if mappings.is_empty() {
1621            return Ok(());
1622        }
1623
1624        let n = self.system_gid_mapping.update(
1625            |_k, v| {
1626                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1627                    let (_, new_value) = mapping.clone().into_key_value();
1628                    Some(new_value)
1629                } else {
1630                    None
1631                }
1632            },
1633            self.op_id,
1634        )?;
1635
1636        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1637            != mappings.len()
1638        {
1639            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1640            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1641        }
1642
1643        Ok(())
1644    }
1645
1646    /// Updates cluster `id` in the transaction to `cluster`.
1647    ///
1648    /// Returns an error if `id` is not found.
1649    ///
1650    /// Runtime is linear with respect to the total number of clusters in the catalog.
1651    /// DO NOT call this function in a loop.
1652    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1653        let updated = self.clusters.update_by_key(
1654            ClusterKey { id },
1655            cluster.into_key_value().1,
1656            self.op_id,
1657        )?;
1658        if updated {
1659            Ok(())
1660        } else {
1661            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1662        }
1663    }
1664
1665    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1666    ///
1667    /// Returns an error if `replica_id` is not found.
1668    ///
1669    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1670    /// DO NOT call this function in a loop.
1671    pub fn update_cluster_replica(
1672        &mut self,
1673        replica_id: ReplicaId,
1674        replica: ClusterReplica,
1675    ) -> Result<(), CatalogError> {
1676        let updated = self.cluster_replicas.update_by_key(
1677            ClusterReplicaKey { id: replica_id },
1678            replica.into_key_value().1,
1679            self.op_id,
1680        )?;
1681        if updated {
1682            Ok(())
1683        } else {
1684            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1685        }
1686    }
1687
1688    /// Updates database `id` in the transaction to `database`.
1689    ///
1690    /// Returns an error if `id` is not found.
1691    ///
1692    /// Runtime is linear with respect to the total number of databases in the catalog.
1693    /// DO NOT call this function in a loop.
1694    pub fn update_database(
1695        &mut self,
1696        id: DatabaseId,
1697        database: Database,
1698    ) -> Result<(), CatalogError> {
1699        let updated = self.databases.update_by_key(
1700            DatabaseKey { id },
1701            database.into_key_value().1,
1702            self.op_id,
1703        )?;
1704        if updated {
1705            Ok(())
1706        } else {
1707            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1708        }
1709    }
1710
1711    /// Updates schema `schema_id` in the transaction to `schema`.
1712    ///
1713    /// Returns an error if `schema_id` is not found.
1714    ///
1715    /// Runtime is linear with respect to the total number of schemas in the catalog.
1716    /// DO NOT call this function in a loop.
1717    pub fn update_schema(
1718        &mut self,
1719        schema_id: SchemaId,
1720        schema: Schema,
1721    ) -> Result<(), CatalogError> {
1722        let updated = self.schemas.update_by_key(
1723            SchemaKey { id: schema_id },
1724            schema.into_key_value().1,
1725            self.op_id,
1726        )?;
1727        if updated {
1728            Ok(())
1729        } else {
1730            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1731        }
1732    }
1733
1734    /// Updates `network_policy_id` in the transaction to `network policy`.
1735    ///
1736    /// Returns an error if `id` is not found.
1737    ///
1738    /// Runtime is linear with respect to the total number of databases in the catalog.
1739    /// DO NOT call this function in a loop.
1740    pub fn update_network_policy(
1741        &mut self,
1742        id: NetworkPolicyId,
1743        network_policy: NetworkPolicy,
1744    ) -> Result<(), CatalogError> {
1745        let updated = self.network_policies.update_by_key(
1746            NetworkPolicyKey { id },
1747            network_policy.into_key_value().1,
1748            self.op_id,
1749        )?;
1750        if updated {
1751            Ok(())
1752        } else {
1753            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1754        }
1755    }
1756    /// Removes all network policies in `network policies` from the transaction.
1757    ///
1758    /// Returns an error if any id in `network policy` is not found.
1759    ///
1760    /// NOTE: On error, there still may be some roles removed from the transaction. It
1761    /// is up to the caller to either abort the transaction or commit.
1762    pub fn remove_network_policies(
1763        &mut self,
1764        network_policies: &BTreeSet<NetworkPolicyId>,
1765    ) -> Result<(), CatalogError> {
1766        if network_policies.is_empty() {
1767            return Ok(());
1768        }
1769
1770        let to_remove = network_policies
1771            .iter()
1772            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1773            .collect();
1774        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1775        assert!(
1776            prev.iter().all(|(k, _)| k.id.is_user()),
1777            "cannot delete non-user network policy"
1778        );
1779
1780        prev.retain(|_k, v| v.is_none());
1781        if !prev.is_empty() {
1782            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1783            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1784        }
1785
1786        Ok(())
1787    }
1788    /// Set persisted default privilege.
1789    ///
1790    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
1791    pub fn set_default_privilege(
1792        &mut self,
1793        role_id: RoleId,
1794        database_id: Option<DatabaseId>,
1795        schema_id: Option<SchemaId>,
1796        object_type: ObjectType,
1797        grantee: RoleId,
1798        privileges: Option<AclMode>,
1799    ) -> Result<(), CatalogError> {
1800        self.default_privileges.set(
1801            DefaultPrivilegesKey {
1802                role_id,
1803                database_id,
1804                schema_id,
1805                object_type,
1806                grantee,
1807            },
1808            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1809            self.op_id,
1810        )?;
1811        Ok(())
1812    }
1813
1814    /// Set persisted default privileges.
1815    pub fn set_default_privileges(
1816        &mut self,
1817        default_privileges: Vec<DefaultPrivilege>,
1818    ) -> Result<(), CatalogError> {
1819        if default_privileges.is_empty() {
1820            return Ok(());
1821        }
1822
1823        let default_privileges = default_privileges
1824            .into_iter()
1825            .map(DurableType::into_key_value)
1826            .map(|(k, v)| (k, Some(v)))
1827            .collect();
1828        self.default_privileges
1829            .set_many(default_privileges, self.op_id)?;
1830        Ok(())
1831    }
1832
1833    /// Set persisted system privilege.
1834    ///
1835    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1836    pub fn set_system_privilege(
1837        &mut self,
1838        grantee: RoleId,
1839        grantor: RoleId,
1840        acl_mode: Option<AclMode>,
1841    ) -> Result<(), CatalogError> {
1842        self.system_privileges.set(
1843            SystemPrivilegesKey { grantee, grantor },
1844            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1845            self.op_id,
1846        )?;
1847        Ok(())
1848    }
1849
1850    /// Set persisted system privileges.
1851    pub fn set_system_privileges(
1852        &mut self,
1853        system_privileges: Vec<MzAclItem>,
1854    ) -> Result<(), CatalogError> {
1855        if system_privileges.is_empty() {
1856            return Ok(());
1857        }
1858
1859        let system_privileges = system_privileges
1860            .into_iter()
1861            .map(DurableType::into_key_value)
1862            .map(|(k, v)| (k, Some(v)))
1863            .collect();
1864        self.system_privileges
1865            .set_many(system_privileges, self.op_id)?;
1866        Ok(())
1867    }
1868
1869    /// Set persisted setting.
1870    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1871        self.settings.set(
1872            SettingKey { name },
1873            value.map(|value| SettingValue { value }),
1874            self.op_id,
1875        )?;
1876        Ok(())
1877    }
1878
1879    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1880        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1881    }
1882
1883    /// Insert persisted introspection source index.
1884    pub fn insert_introspection_source_indexes(
1885        &mut self,
1886        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1887        temporary_oids: &HashSet<u32>,
1888    ) -> Result<(), CatalogError> {
1889        if introspection_source_indexes.is_empty() {
1890            return Ok(());
1891        }
1892
1893        let amount = usize_to_u64(introspection_source_indexes.len());
1894        let oids = self.allocate_oids(amount, temporary_oids)?;
1895        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1896            .into_iter()
1897            .zip_eq(oids)
1898            .map(
1899                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1900                    cluster_id,
1901                    name,
1902                    item_id,
1903                    index_id,
1904                    oid,
1905                },
1906            )
1907            .collect();
1908
1909        for introspection_source_index in introspection_source_indexes {
1910            let (key, value) = introspection_source_index.into_key_value();
1911            self.introspection_sources.insert(key, value, self.op_id)?;
1912        }
1913
1914        Ok(())
1915    }
1916
1917    /// Set persisted system object mappings.
1918    pub fn set_system_object_mappings(
1919        &mut self,
1920        mappings: Vec<SystemObjectMapping>,
1921    ) -> Result<(), CatalogError> {
1922        if mappings.is_empty() {
1923            return Ok(());
1924        }
1925
1926        let mappings = mappings
1927            .into_iter()
1928            .map(DurableType::into_key_value)
1929            .map(|(k, v)| (k, Some(v)))
1930            .collect();
1931        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1932        Ok(())
1933    }
1934
1935    /// Set persisted replica.
1936    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1937        if replicas.is_empty() {
1938            return Ok(());
1939        }
1940
1941        let replicas = replicas
1942            .into_iter()
1943            .map(DurableType::into_key_value)
1944            .map(|(k, v)| (k, Some(v)))
1945            .collect();
1946        self.cluster_replicas.set_many(replicas, self.op_id)?;
1947        Ok(())
1948    }
1949
1950    /// Set persisted configuration.
1951    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1952        match value {
1953            Some(value) => {
1954                let config = Config { key, value };
1955                let (key, value) = config.into_key_value();
1956                self.configs.set(key, Some(value), self.op_id)?;
1957            }
1958            None => {
1959                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1960            }
1961        }
1962        Ok(())
1963    }
1964
1965    /// Get the value of a persisted config.
1966    pub fn get_config(&self, key: String) -> Option<u64> {
1967        self.configs
1968            .get(&ConfigKey { key })
1969            .map(|entry| entry.value)
1970    }
1971
1972    /// Get the value of a persisted setting.
1973    pub fn get_setting(&self, name: String) -> Option<&str> {
1974        self.settings
1975            .get(&SettingKey { name })
1976            .map(|entry| &*entry.value)
1977    }
1978
1979    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1980        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1981            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1982    }
1983
1984    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1985        self.set_setting(
1986            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1987            Some(shard_id.to_string()),
1988        )
1989    }
1990
1991    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1992        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1993            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1994    }
1995
1996    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1997        self.set_setting(
1998            EXPRESSION_CACHE_SHARD_KEY.to_string(),
1999            Some(shard_id.to_string()),
2000        )
2001    }
2002
2003    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
2004    /// match the `with_0dt_deployment_max_wait` "system var" value.
2005    ///
2006    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2007    /// but use it in boot before Launch Darkly is available.
2008    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
2009        self.set_config(
2010            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
2011            Some(
2012                value
2013                    .as_millis()
2014                    .try_into()
2015                    .expect("max wait fits into u64"),
2016            ),
2017        )
2018    }
2019
2020    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
2021    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2022    /// value.
2023    ///
2024    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2025    /// but use it in boot before Launch Darkly is available.
2026    pub fn set_0dt_deployment_ddl_check_interval(
2027        &mut self,
2028        value: Duration,
2029    ) -> Result<(), CatalogError> {
2030        self.set_config(
2031            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2032            Some(
2033                value
2034                    .as_millis()
2035                    .try_into()
2036                    .expect("ddl check interval fits into u64"),
2037            ),
2038        )
2039    }
2040
2041    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
2042    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
2043    ///
2044    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2045    /// but use it in boot before Launch Darkly is available.
2046    pub fn set_enable_0dt_deployment_panic_after_timeout(
2047        &mut self,
2048        value: bool,
2049    ) -> Result<(), CatalogError> {
2050        self.set_config(
2051            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2052            Some(u64::from(value)),
2053        )
2054    }
2055
2056    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2057    /// match the `with_0dt_deployment_max_wait` "system var" value.
2058    ///
2059    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2060    /// but use it in boot before LaunchDarkly is available.
2061    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2062        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2063    }
2064
2065    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2066    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2067    /// value.
2068    ///
2069    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2070    /// use it in boot before LaunchDarkly is available.
2071    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2072        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2073    }
2074
2075    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2076    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2077    /// var" value.
2078    ///
2079    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2080    /// use it in boot before LaunchDarkly is available.
2081    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2082        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2083    }
2084
2085    /// Updates the catalog `system_config_synced` "config" value to true.
2086    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2087        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2088    }
2089
2090    pub fn update_comment(
2091        &mut self,
2092        object_id: CommentObjectId,
2093        sub_component: Option<usize>,
2094        comment: Option<String>,
2095    ) -> Result<(), CatalogError> {
2096        let key = CommentKey {
2097            object_id,
2098            sub_component,
2099        };
2100        let value = comment.map(|c| CommentValue { comment: c });
2101        self.comments.set(key, value, self.op_id)?;
2102
2103        Ok(())
2104    }
2105
2106    pub fn drop_comments(
2107        &mut self,
2108        object_ids: &BTreeSet<CommentObjectId>,
2109    ) -> Result<(), CatalogError> {
2110        if object_ids.is_empty() {
2111            return Ok(());
2112        }
2113
2114        self.comments
2115            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2116        Ok(())
2117    }
2118
2119    pub fn update_source_references(
2120        &mut self,
2121        source_id: CatalogItemId,
2122        references: Vec<SourceReference>,
2123        updated_at: u64,
2124    ) -> Result<(), CatalogError> {
2125        let key = SourceReferencesKey { source_id };
2126        let value = SourceReferencesValue {
2127            references,
2128            updated_at,
2129        };
2130        self.source_references.set(key, Some(value), self.op_id)?;
2131        Ok(())
2132    }
2133
2134    /// Upserts persisted system configuration `name` to `value`.
2135    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2136        let key = ServerConfigurationKey {
2137            name: name.to_string(),
2138        };
2139        let value = ServerConfigurationValue { value };
2140        self.system_configurations
2141            .set(key, Some(value), self.op_id)?;
2142        Ok(())
2143    }
2144
2145    /// Removes persisted system configuration `name`.
2146    pub fn remove_system_config(&mut self, name: &str) {
2147        let key = ServerConfigurationKey {
2148            name: name.to_string(),
2149        };
2150        self.system_configurations
2151            .set(key, None, self.op_id)
2152            .expect("cannot have uniqueness violation");
2153    }
2154
2155    /// Removes all persisted system configurations.
2156    pub fn clear_system_configs(&mut self) {
2157        self.system_configurations.delete(|_k, _v| true, self.op_id);
2158    }
2159
2160    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2161        match self.configs.insert(
2162            ConfigKey { key: key.clone() },
2163            ConfigValue { value },
2164            self.op_id,
2165        ) {
2166            Ok(_) => Ok(()),
2167            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2168        }
2169    }
2170
2171    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2172        self.clusters
2173            .items()
2174            .into_iter()
2175            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2176    }
2177
2178    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2179        self.cluster_replicas
2180            .items()
2181            .into_iter()
2182            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2183    }
2184
2185    pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2186        self.databases
2187            .items()
2188            .into_iter()
2189            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2190    }
2191
2192    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2193        self.roles
2194            .items()
2195            .into_iter()
2196            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2197    }
2198
2199    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2200        self.network_policies
2201            .items()
2202            .into_iter()
2203            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2204    }
2205
2206    pub fn get_system_object_mappings(
2207        &self,
2208    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2209        self.system_gid_mapping
2210            .items()
2211            .into_iter()
2212            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2213    }
2214
2215    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2216        self.schemas
2217            .items()
2218            .into_iter()
2219            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2220    }
2221
2222    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2223        self.system_configurations
2224            .items()
2225            .into_iter()
2226            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2227    }
2228
2229    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2230        let key = SchemaKey { id: *id };
2231        self.schemas
2232            .get(&key)
2233            .map(|v| DurableType::from_key_value(key, v.clone()))
2234    }
2235
2236    pub fn get_introspection_source_indexes(
2237        &self,
2238        cluster_id: ClusterId,
2239    ) -> BTreeMap<&str, (GlobalId, u32)> {
2240        self.introspection_sources
2241            .items()
2242            .into_iter()
2243            .filter(|(k, _v)| k.cluster_id == cluster_id)
2244            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2245            .collect()
2246    }
2247
2248    pub fn get_catalog_content_version(&self) -> Option<&str> {
2249        self.settings
2250            .get(&SettingKey {
2251                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2252            })
2253            .map(|value| &*value.value)
2254    }
2255
2256    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2257        self.settings
2258            .get(&SettingKey {
2259                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2260            })
2261            .map(|value| value.value.clone())
2262    }
2263
2264    /// Commit the current operation within the transaction. This does not cause anything to be
2265    /// written durably, but signals to the current transaction that we are moving on to the next
2266    /// operation.
2267    ///
2268    /// Returns the updates of the committed operation.
2269    #[must_use]
2270    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2271        let updates = self.get_op_updates();
2272        self.commit_op();
2273        updates
2274    }
2275
2276    fn get_op_updates(&self) -> Vec<StateUpdate> {
2277        fn get_collection_op_updates<'a, T>(
2278            table_txn: &'a TableTransaction<T::Key, T::Value>,
2279            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2280            op: Timestamp,
2281        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2282        where
2283            T::Key: Ord + Eq + Clone + Debug,
2284            T::Value: Ord + Clone + Debug,
2285            T: DurableType,
2286        {
2287            table_txn
2288                .pending
2289                .iter()
2290                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2291                .filter_map(move |(k, v)| {
2292                    if v.ts == op {
2293                        let key = k.clone();
2294                        let value = v.value.clone();
2295                        let diff = v.diff.clone().try_into().expect("invalid diff");
2296                        let update = DurableType::from_key_value(key, value);
2297                        let kind = kind_fn(update);
2298                        Some((kind, diff))
2299                    } else {
2300                        None
2301                    }
2302                })
2303        }
2304
2305        fn get_large_collection_op_updates<'a, T>(
2306            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2307            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2308            op: Timestamp,
2309        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2310        where
2311            T::Key: Ord + Eq + Clone + Debug,
2312            T: DurableType<Value = ()>,
2313        {
2314            collection.iter().filter_map(move |(k, diff, ts)| {
2315                if *ts == op {
2316                    let key = k.clone();
2317                    let diff = diff.clone().try_into().expect("invalid diff");
2318                    let update = DurableType::from_key_value(key, ());
2319                    let kind = kind_fn(update);
2320                    Some((kind, diff))
2321                } else {
2322                    None
2323                }
2324            })
2325        }
2326
2327        let Transaction {
2328            durable_catalog: _,
2329            databases,
2330            schemas,
2331            items,
2332            comments,
2333            roles,
2334            role_auth,
2335            clusters,
2336            network_policies,
2337            cluster_replicas,
2338            introspection_sources,
2339            system_gid_mapping,
2340            system_configurations,
2341            default_privileges,
2342            source_references,
2343            system_privileges,
2344            audit_log_updates,
2345            storage_collection_metadata,
2346            unfinalized_shards,
2347            // Not representable as a `StateUpdate`.
2348            id_allocator: _,
2349            configs: _,
2350            settings: _,
2351            txn_wal_shard: _,
2352            upper,
2353            op_id: _,
2354        } = &self;
2355
2356        let updates = std::iter::empty()
2357            .chain(get_collection_op_updates(
2358                roles,
2359                StateUpdateKind::Role,
2360                self.op_id,
2361            ))
2362            .chain(get_collection_op_updates(
2363                role_auth,
2364                StateUpdateKind::RoleAuth,
2365                self.op_id,
2366            ))
2367            .chain(get_collection_op_updates(
2368                databases,
2369                StateUpdateKind::Database,
2370                self.op_id,
2371            ))
2372            .chain(get_collection_op_updates(
2373                schemas,
2374                StateUpdateKind::Schema,
2375                self.op_id,
2376            ))
2377            .chain(get_collection_op_updates(
2378                default_privileges,
2379                StateUpdateKind::DefaultPrivilege,
2380                self.op_id,
2381            ))
2382            .chain(get_collection_op_updates(
2383                system_privileges,
2384                StateUpdateKind::SystemPrivilege,
2385                self.op_id,
2386            ))
2387            .chain(get_collection_op_updates(
2388                system_configurations,
2389                StateUpdateKind::SystemConfiguration,
2390                self.op_id,
2391            ))
2392            .chain(get_collection_op_updates(
2393                clusters,
2394                StateUpdateKind::Cluster,
2395                self.op_id,
2396            ))
2397            .chain(get_collection_op_updates(
2398                network_policies,
2399                StateUpdateKind::NetworkPolicy,
2400                self.op_id,
2401            ))
2402            .chain(get_collection_op_updates(
2403                introspection_sources,
2404                StateUpdateKind::IntrospectionSourceIndex,
2405                self.op_id,
2406            ))
2407            .chain(get_collection_op_updates(
2408                cluster_replicas,
2409                StateUpdateKind::ClusterReplica,
2410                self.op_id,
2411            ))
2412            .chain(get_collection_op_updates(
2413                system_gid_mapping,
2414                StateUpdateKind::SystemObjectMapping,
2415                self.op_id,
2416            ))
2417            .chain(get_collection_op_updates(
2418                items,
2419                StateUpdateKind::Item,
2420                self.op_id,
2421            ))
2422            .chain(get_collection_op_updates(
2423                comments,
2424                StateUpdateKind::Comment,
2425                self.op_id,
2426            ))
2427            .chain(get_collection_op_updates(
2428                source_references,
2429                StateUpdateKind::SourceReferences,
2430                self.op_id,
2431            ))
2432            .chain(get_collection_op_updates(
2433                storage_collection_metadata,
2434                StateUpdateKind::StorageCollectionMetadata,
2435                self.op_id,
2436            ))
2437            .chain(get_collection_op_updates(
2438                unfinalized_shards,
2439                StateUpdateKind::UnfinalizedShard,
2440                self.op_id,
2441            ))
2442            .chain(get_large_collection_op_updates(
2443                audit_log_updates,
2444                StateUpdateKind::AuditLog,
2445                self.op_id,
2446            ))
2447            .map(|(kind, diff)| StateUpdate {
2448                kind,
2449                ts: upper.clone(),
2450                diff,
2451            })
2452            .collect();
2453
2454        updates
2455    }
2456
    /// Reports whether the underlying durable catalog state is in savepoint mode.
    /// Delegates directly to [`DurableCatalogState::is_savepoint`].
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2460
    /// Advances the transaction's op counter, ending the current operation.
    /// Updates recorded after this call carry the new op id as their timestamp.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2464
    /// Returns the current operation id of this transaction.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2468
    /// Returns the upper of the catalog at the time this transaction started.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2472
2473    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2474        let audit_log_updates = self
2475            .audit_log_updates
2476            .into_iter()
2477            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2478            .collect();
2479
2480        let txn_batch = TransactionBatch {
2481            databases: self.databases.pending(),
2482            schemas: self.schemas.pending(),
2483            items: self.items.pending(),
2484            comments: self.comments.pending(),
2485            roles: self.roles.pending(),
2486            role_auth: self.role_auth.pending(),
2487            clusters: self.clusters.pending(),
2488            cluster_replicas: self.cluster_replicas.pending(),
2489            network_policies: self.network_policies.pending(),
2490            introspection_sources: self.introspection_sources.pending(),
2491            id_allocator: self.id_allocator.pending(),
2492            configs: self.configs.pending(),
2493            source_references: self.source_references.pending(),
2494            settings: self.settings.pending(),
2495            system_gid_mapping: self.system_gid_mapping.pending(),
2496            system_configurations: self.system_configurations.pending(),
2497            default_privileges: self.default_privileges.pending(),
2498            system_privileges: self.system_privileges.pending(),
2499            storage_collection_metadata: self.storage_collection_metadata.pending(),
2500            unfinalized_shards: self.unfinalized_shards.pending(),
2501            txn_wal_shard: self.txn_wal_shard.pending(),
2502            audit_log_updates,
2503            upper: self.upper,
2504        };
2505        (txn_batch, self.durable_catalog)
2506    }
2507
2508    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2509    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2510    /// before proceeding. In general, this must be fatal to the calling process. We do not
2511    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2512    ///
2513    /// The transaction is committed at `commit_ts`.
2514    ///
2515    /// Returns what the upper was directly after the transaction committed.
2516    ///
2517    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2518    /// catalog is not writeable.
2519    #[mz_ore::instrument(level = "debug")]
2520    pub(crate) async fn commit_internal(
2521        self,
2522        commit_ts: mz_repr::Timestamp,
2523    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2524        let (mut txn_batch, durable_catalog) = self.into_parts();
2525        let TransactionBatch {
2526            databases,
2527            schemas,
2528            items,
2529            comments,
2530            roles,
2531            role_auth,
2532            clusters,
2533            cluster_replicas,
2534            network_policies,
2535            introspection_sources,
2536            id_allocator,
2537            configs,
2538            source_references,
2539            settings,
2540            system_gid_mapping,
2541            system_configurations,
2542            default_privileges,
2543            system_privileges,
2544            storage_collection_metadata,
2545            unfinalized_shards,
2546            txn_wal_shard,
2547            audit_log_updates,
2548            upper,
2549        } = &mut txn_batch;
2550        // Consolidate in memory because it will likely be faster than consolidating after the
2551        // transaction has been made durable.
2552        differential_dataflow::consolidation::consolidate_updates(databases);
2553        differential_dataflow::consolidation::consolidate_updates(schemas);
2554        differential_dataflow::consolidation::consolidate_updates(items);
2555        differential_dataflow::consolidation::consolidate_updates(comments);
2556        differential_dataflow::consolidation::consolidate_updates(roles);
2557        differential_dataflow::consolidation::consolidate_updates(role_auth);
2558        differential_dataflow::consolidation::consolidate_updates(clusters);
2559        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2560        differential_dataflow::consolidation::consolidate_updates(network_policies);
2561        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2562        differential_dataflow::consolidation::consolidate_updates(id_allocator);
2563        differential_dataflow::consolidation::consolidate_updates(configs);
2564        differential_dataflow::consolidation::consolidate_updates(settings);
2565        differential_dataflow::consolidation::consolidate_updates(source_references);
2566        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2567        differential_dataflow::consolidation::consolidate_updates(system_configurations);
2568        differential_dataflow::consolidation::consolidate_updates(default_privileges);
2569        differential_dataflow::consolidation::consolidate_updates(system_privileges);
2570        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2571        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2572        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2573        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2574
2575        assert!(
2576            commit_ts >= *upper,
2577            "expected commit ts, {}, to be greater than or equal to upper, {}",
2578            commit_ts,
2579            upper
2580        );
2581        let upper = durable_catalog
2582            .commit_transaction(txn_batch, commit_ts)
2583            .await?;
2584        Ok((durable_catalog, upper))
2585    }
2586
    /// Commits the storage transaction to durable storage. Any error returned outside read-only
    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
    /// before proceeding. In general, this must be fatal to the calling process. We do not
    /// panic/halt inside this function itself so that errors can bubble up during initialization.
    ///
    /// In read-only mode, this will return an error for non-empty transactions indicating that the
    /// catalog is not writeable.
    ///
    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
    /// updates from this transaction. Therefore, updates from this transaction will not be returned
    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
    ///
    /// An alternative implementation would be for the caller to explicitly consume their updates
    /// after committing and only then apply the updates in-memory. While this removes assumptions
    /// about the caller in this method, in practice it results in duplicate work on every commit.
    ///
    /// # Panics
    ///
    /// Panics if this transaction still has unconsumed op updates, which would violate the
    /// assumption above that the committer has already applied them.
    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        // Drain all the updates from the commit since it is assumed that they were already applied.
        let updates = durable_storage.sync_updates(upper).await?;
        // Writable and savepoint catalogs should have consumed all updates before committing a
        // transaction, otherwise the commit was performed with an out of date state.
        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
        // updates before committing.
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2624}
2625
2626use crate::durable::async_trait;
2627
2628use super::objects::{RoleAuthKey, RoleAuthValue};
2629
2630#[async_trait]
2631impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2632    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2633        self.storage_collection_metadata
2634            .items()
2635            .into_iter()
2636            .map(
2637                |(
2638                    StorageCollectionMetadataKey { id },
2639                    StorageCollectionMetadataValue { shard },
2640                )| { (*id, shard.clone()) },
2641            )
2642            .collect()
2643    }
2644
2645    fn insert_collection_metadata(
2646        &mut self,
2647        metadata: BTreeMap<GlobalId, ShardId>,
2648    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2649        for (id, shard) in metadata {
2650            self.storage_collection_metadata
2651                .insert(
2652                    StorageCollectionMetadataKey { id },
2653                    StorageCollectionMetadataValue {
2654                        shard: shard.clone(),
2655                    },
2656                    self.op_id,
2657                )
2658                .map_err(|err| match err {
2659                    DurableCatalogError::DuplicateKey => {
2660                        StorageError::CollectionMetadataAlreadyExists(id)
2661                    }
2662                    DurableCatalogError::UniquenessViolation => {
2663                        StorageError::PersistShardAlreadyInUse(shard)
2664                    }
2665                    err => StorageError::Generic(anyhow::anyhow!(err)),
2666                })?;
2667        }
2668        Ok(())
2669    }
2670
2671    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2672        let ks: Vec<_> = ids
2673            .into_iter()
2674            .map(|id| StorageCollectionMetadataKey { id })
2675            .collect();
2676        self.storage_collection_metadata
2677            .delete_by_keys(ks, self.op_id)
2678            .into_iter()
2679            .map(
2680                |(
2681                    StorageCollectionMetadataKey { id },
2682                    StorageCollectionMetadataValue { shard },
2683                )| (id, shard),
2684            )
2685            .collect()
2686    }
2687
2688    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2689        self.unfinalized_shards
2690            .items()
2691            .into_iter()
2692            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2693            .collect()
2694    }
2695
2696    fn insert_unfinalized_shards(
2697        &mut self,
2698        s: BTreeSet<ShardId>,
2699    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2700        for shard in s {
2701            match self
2702                .unfinalized_shards
2703                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2704            {
2705                // Inserting duplicate keys has no effect.
2706                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2707                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2708            };
2709        }
2710        Ok(())
2711    }
2712
2713    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2714        let ks: Vec<_> = shards
2715            .into_iter()
2716            .map(|shard| UnfinalizedShardKey { shard })
2717            .collect();
2718        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2719    }
2720
2721    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2722        self.txn_wal_shard
2723            .values()
2724            .iter()
2725            .next()
2726            .map(|TxnWalShardValue { shard }| *shard)
2727    }
2728
2729    fn write_txn_wal_shard(
2730        &mut self,
2731        shard: ShardId,
2732    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2733        self.txn_wal_shard
2734            .insert((), TxnWalShardValue { shard }, self.op_id)
2735            .map_err(|err| match err {
2736                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2737                err => StorageError::Generic(anyhow::anyhow!(err)),
2738            })
2739    }
2740}
2741
/// Describes a set of changes to apply as the result of a catalog transaction.
///
/// Each collection field holds `(key, value, diff)` triples: a positive diff
/// adds the pair and a negative diff retracts it (see
/// [`TableTransaction::pending`], which guarantees diffs of 1 or -1).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper of the catalog when the transaction started.
    pub(crate) upper: mz_repr::Timestamp,
}
2794
2795impl TransactionBatch {
2796    pub fn is_empty(&self) -> bool {
2797        let TransactionBatch {
2798            databases,
2799            schemas,
2800            items,
2801            comments,
2802            roles,
2803            role_auth,
2804            clusters,
2805            cluster_replicas,
2806            network_policies,
2807            introspection_sources,
2808            id_allocator,
2809            configs,
2810            settings,
2811            source_references,
2812            system_gid_mapping,
2813            system_configurations,
2814            default_privileges,
2815            system_privileges,
2816            storage_collection_metadata,
2817            unfinalized_shards,
2818            txn_wal_shard,
2819            audit_log_updates,
2820            upper: _,
2821        } = self;
2822        databases.is_empty()
2823            && schemas.is_empty()
2824            && items.is_empty()
2825            && comments.is_empty()
2826            && roles.is_empty()
2827            && role_auth.is_empty()
2828            && clusters.is_empty()
2829            && cluster_replicas.is_empty()
2830            && network_policies.is_empty()
2831            && introspection_sources.is_empty()
2832            && id_allocator.is_empty()
2833            && configs.is_empty()
2834            && settings.is_empty()
2835            && source_references.is_empty()
2836            && system_gid_mapping.is_empty()
2837            && system_configurations.is_empty()
2838            && default_privileges.is_empty()
2839            && system_privileges.is_empty()
2840            && storage_collection_metadata.is_empty()
2841            && unfinalized_shards.is_empty()
2842            && txn_wal_shard.is_empty()
2843            && audit_log_updates.is_empty()
2844    }
2845}
2846
/// A single pending change to a value inside a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value being added or retracted.
    value: V,
    /// The transaction op at which this update was made.
    ts: Timestamp,
    /// +1 to add the value, -1 to retract it.
    diff: Diff,
}
2853
/// Utility trait to check for plan validity.
trait UniqueName {
    /// Does the item have a unique name? If yes, we can check for name equality in validity
    /// checking, which allows an O(n log n) check instead of comparing all pairs.
    const HAS_UNIQUE_NAME: bool;
    /// The unique name, only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`;
    /// implementations without a unique name return `""`.
    fn unique_name(&self) -> &str;
}
2862
mod unique_name {
    use crate::durable::objects::*;

    /// Implements [`crate::durable::transaction::UniqueName`] for value types
    /// whose `name` field uniquely identifies them.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements [`crate::durable::transaction::UniqueName`] for value types
    /// with no unique name; `unique_name` returns the empty string.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                       ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    #[cfg(test)]
    mod test {
        // `String` stands in for a value type in `TableTransaction` unit tests.
        impl_no_unique_name!(String,);
    }
}
2924
/// TableTransaction emulates some features of a typical SQL transaction over a
/// table for a Collection.
///
/// It supports:
/// - uniqueness constraints
/// - transactional reads and writes (including read-your-writes before commit)
///
/// `K` is the primary key type. Multiple entries with the same key are disallowed.
/// `V` is an arbitrary value type.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The contents of the table at the start of the transaction.
    initial: BTreeMap<K, V>,
    // The desired updates to keys after commit.
    // Invariant: Value is sorted by `ts`.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional predicate reporting whether two values conflict with each other
    // (e.g. share a name); `None` disables uniqueness checking.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2942
2943impl<K, V> TableTransaction<K, V>
2944where
2945    K: Ord + Eq + Clone + Debug,
2946    V: Ord + Clone + Debug + UniqueName,
2947{
2948    /// Create a new TableTransaction with initial data.
2949    ///
2950    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
2951    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
2952    /// protobuf types which deserialize to `K` and `V` that a [`TableTransaction`] is generic
2953    /// over.
2954    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2955    where
2956        K: RustType<KP>,
2957        V: RustType<VP>,
2958    {
2959        let initial = initial
2960            .into_iter()
2961            .map(RustType::from_proto)
2962            .collect::<Result<_, _>>()?;
2963
2964        Ok(Self {
2965            initial,
2966            pending: BTreeMap::new(),
2967            uniqueness_violation: None,
2968        })
2969    }
2970
2971    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
2972    /// that determines whether there is a uniqueness violation among two values.
2973    fn new_with_uniqueness_fn<KP, VP>(
2974        initial: BTreeMap<KP, VP>,
2975        uniqueness_violation: fn(a: &V, b: &V) -> bool,
2976    ) -> Result<Self, TryFromProtoError>
2977    where
2978        K: RustType<KP>,
2979        V: RustType<VP>,
2980    {
2981        let initial = initial
2982            .into_iter()
2983            .map(RustType::from_proto)
2984            .collect::<Result<_, _>>()?;
2985
2986        Ok(Self {
2987            initial,
2988            pending: BTreeMap::new(),
2989            uniqueness_violation: Some(uniqueness_violation),
2990        })
2991    }
2992
2993    /// Consumes and returns the pending changes and their diffs. `Diff` is
2994    /// guaranteed to be 1 or -1.
2995    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2996    where
2997        K: RustType<KP>,
2998        V: RustType<VP>,
2999    {
3000        soft_assert_no_log!(self.verify().is_ok());
3001        // Pending describes the desired final state for some keys. K,V pairs should be
3002        // retracted if they already exist and were deleted or are being updated.
3003        self.pending
3004            .into_iter()
3005            .flat_map(|(k, v)| {
3006                let mut v: Vec<_> = v
3007                    .into_iter()
3008                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
3009                    .collect();
3010                differential_dataflow::consolidation::consolidate(&mut v);
3011                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
3012            })
3013            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
3014            .collect()
3015    }
3016
    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
    ///
    /// Runtime is O(n^2), where n is the number of items in `self`, if
    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            // Compare each value to each other value and ensure they are unique.
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                // Fast path: index values by name and compare each value only
                // against the entry stored under its own name, per the
                // `UniqueName` contract that validity can be checked via name
                // equality.
                // NOTE(review): if several values share a name, the map keeps a
                // single representative per name; each other value is compared
                // against that representative, which is assumed sufficient to
                // surface a violation — confirm against `uniqueness_violation`
                // implementations.
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    // `v` here is the value's enumeration index, `vi` the value.
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        // Skip comparing a value against itself.
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                // Slow path: compare every pair of distinct values.
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        // Also check the `pending` invariant: updates for each key must be
        // ordered by their op timestamp.
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }
3057
3058    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
3059    ///
3060    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3061    /// items in `keys`.
3062    fn verify_keys<'a>(
3063        &self,
3064        keys: impl IntoIterator<Item = &'a K>,
3065    ) -> Result<(), DurableCatalogError>
3066    where
3067        K: 'a,
3068    {
3069        if let Some(uniqueness_violation) = self.uniqueness_violation {
3070            let entries: Vec<_> = keys
3071                .into_iter()
3072                .filter_map(|key| self.get(key).map(|value| (key, value)))
3073                .collect();
3074            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3075            for (ki, vi) in self.items() {
3076                for (kj, vj) in &entries {
3077                    if ki != *kj && uniqueness_violation(vi, vj) {
3078                        return Err(DurableCatalogError::UniquenessViolation);
3079                    }
3080                }
3081            }
3082        }
3083        soft_assert_no_log!(self.verify().is_ok());
3084        Ok(())
3085    }
3086
3087    /// Iterates over the items viewable in the current transaction in arbitrary
3088    /// order and applies `f` on all key, value pairs.
3089    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3090        let mut seen = BTreeSet::new();
3091        for k in self.pending.keys() {
3092            seen.insert(k);
3093            let v = self.get(k);
3094            // Deleted items don't exist so shouldn't be visited, but still suppress
3095            // visiting the key later.
3096            if let Some(v) = v {
3097                f(k, v);
3098            }
3099        }
3100        for (k, v) in self.initial.iter() {
3101            // Add on initial items that don't have updates.
3102            if !seen.contains(k) {
3103                f(k, v);
3104            }
3105        }
3106    }
3107
3108    /// Returns the current value of `k`.
3109    fn get(&self, k: &K) -> Option<&V> {
3110        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3111        let mut updates = Vec::with_capacity(pending.len() + 1);
3112        if let Some(initial) = self.initial.get(k) {
3113            updates.push((initial, Diff::ONE));
3114        }
3115        updates.extend(
3116            pending
3117                .into_iter()
3118                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3119        );
3120
3121        differential_dataflow::consolidation::consolidate(&mut updates);
3122        assert!(updates.len() <= 1);
3123        updates.into_iter().next().map(|(v, _)| v)
3124    }
3125
3126    /// Returns the items viewable in the current transaction. The items are
3127    /// cloned, so this is an expensive operation. Prefer using [`Self::items`], or
3128    /// [`Self::for_values`].
3129    // Used by tests.
3130    #[cfg(test)]
3131    fn items_cloned(&self) -> BTreeMap<K, V> {
3132        let mut items = BTreeMap::new();
3133        self.for_values(|k, v| {
3134            items.insert(k.clone(), v.clone());
3135        });
3136        items
3137    }
3138
3139    /// Returns the current items as proto-typed key-value pairs, suitable for
3140    /// constructing a [`Snapshot`]. This merges `initial` and `pending` to
3141    /// produce the current view and converts back to proto types.
3142    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3143    where
3144        K: RustType<KP>,
3145        V: RustType<VP>,
3146        KP: Ord,
3147    {
3148        let mut items = BTreeMap::new();
3149        self.for_values(|k, v| {
3150            items.insert(k.into_proto(), v.into_proto());
3151        });
3152        items
3153    }
3154
3155    /// Returns the items viewable in the current transaction as references. Returns a map
3156    /// of references.
3157    fn items(&self) -> BTreeMap<&K, &V> {
3158        let mut items = BTreeMap::new();
3159        self.for_values(|k, v| {
3160            items.insert(k, v);
3161        });
3162        items
3163    }
3164
3165    /// Returns the values viewable in the current transaction as references.
3166    fn values(&self) -> BTreeSet<&V> {
3167        let mut items = BTreeSet::new();
3168        self.for_values(|_, v| {
3169            items.insert(v);
3170        });
3171        items
3172    }
3173
3174    /// Returns the number of items viewable in the current transaction.
3175    fn len(&self) -> usize {
3176        let mut count = 0;
3177        self.for_values(|_, _| {
3178            count += 1;
3179        });
3180        count
3181    }
3182
3183    /// Iterates over the items viewable in the current transaction, and provides a
3184    /// map where additional pending items can be inserted, which will be appended
3185    /// to current pending items. Does not verify uniqueness.
3186    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3187        &mut self,
3188        mut f: F,
3189    ) {
3190        let mut pending = BTreeMap::new();
3191        self.for_values(|k, v| f(&mut pending, k, v));
3192        for (k, updates) in pending {
3193            self.pending.entry(k).or_default().extend(updates);
3194        }
3195    }
3196
3197    /// Inserts a new k,v pair.
3198    ///
3199    /// Returns an error if the uniqueness check failed or the key already exists.
3200    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3201        let mut violation = None;
3202        self.for_values(|for_k, for_v| {
3203            if &k == for_k {
3204                violation = Some(DurableCatalogError::DuplicateKey);
3205            }
3206            if let Some(uniqueness_violation) = self.uniqueness_violation {
3207                if uniqueness_violation(for_v, &v) {
3208                    violation = Some(DurableCatalogError::UniquenessViolation);
3209                }
3210            }
3211        });
3212        if let Some(violation) = violation {
3213            return Err(violation);
3214        }
3215        self.pending.entry(k).or_default().push(TransactionUpdate {
3216            value: v,
3217            ts,
3218            diff: Diff::ONE,
3219        });
3220        soft_assert_no_log!(self.verify().is_ok());
3221        Ok(())
3222    }
3223
3224    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
3225    /// value should be updated, otherwise `None`. Returns the number of changed
3226    /// entries.
3227    ///
3228    /// Returns an error if the uniqueness check failed.
3229    ///
3230    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
3231    /// better performance.
3232    fn update<F: Fn(&K, &V) -> Option<V>>(
3233        &mut self,
3234        f: F,
3235        ts: Timestamp,
3236    ) -> Result<Diff, DurableCatalogError> {
3237        let mut changed = Diff::ZERO;
3238        let mut keys = BTreeSet::new();
3239        // Keep a copy of pending in case of uniqueness violation.
3240        let pending = self.pending.clone();
3241        self.for_values_mut(|p, k, v| {
3242            if let Some(next) = f(k, v) {
3243                changed += Diff::ONE;
3244                keys.insert(k.clone());
3245                let updates = p.entry(k.clone()).or_default();
3246                updates.push(TransactionUpdate {
3247                    value: v.clone(),
3248                    ts,
3249                    diff: Diff::MINUS_ONE,
3250                });
3251                updates.push(TransactionUpdate {
3252                    value: next,
3253                    ts,
3254                    diff: Diff::ONE,
3255                });
3256            }
3257        });
3258        // Check for uniqueness violation.
3259        if let Err(err) = self.verify_keys(&keys) {
3260            self.pending = pending;
3261            Err(err)
3262        } else {
3263            Ok(changed)
3264        }
3265    }
3266
3267    /// Updates `k`, `v` pair if `k` already exists in `self`.
3268    ///
3269    /// Returns `true` if `k` was updated, `false` otherwise.
3270    /// Returns an error if the uniqueness check failed.
3271    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3272        if let Some(cur_v) = self.get(&k) {
3273            if v != *cur_v {
3274                self.set(k, Some(v), ts)?;
3275            }
3276            Ok(true)
3277        } else {
3278            Ok(false)
3279        }
3280    }
3281
3282    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
3283    ///
3284    /// Returns the number of changed entries.
3285    /// Returns an error if the uniqueness check failed.
3286    fn update_by_keys(
3287        &mut self,
3288        kvs: impl IntoIterator<Item = (K, V)>,
3289        ts: Timestamp,
3290    ) -> Result<Diff, DurableCatalogError> {
3291        let kvs: Vec<_> = kvs
3292            .into_iter()
3293            .filter_map(|(k, v)| match self.get(&k) {
3294                // Record if updating this entry would be a no-op.
3295                Some(cur_v) => Some((*cur_v == v, k, v)),
3296                None => None,
3297            })
3298            .collect();
3299        let changed = kvs.len();
3300        let changed =
3301            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3302        let kvs = kvs
3303            .into_iter()
3304            // Filter out no-ops to save some work.
3305            .filter(|(no_op, _, _)| !no_op)
3306            .map(|(_, k, v)| (k, Some(v)))
3307            .collect();
3308        self.set_many(kvs, ts)?;
3309        Ok(changed)
3310    }
3311
3312    /// Set the value for a key. Returns the previous entry if the key existed,
3313    /// otherwise None.
3314    ///
3315    /// Returns an error if the uniqueness check failed.
3316    ///
3317    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
3318    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3319        let prev = self.get(&k).cloned();
3320        let entry = self.pending.entry(k.clone()).or_default();
3321        let restore_len = entry.len();
3322
3323        match (v, prev.clone()) {
3324            (Some(v), Some(prev)) => {
3325                entry.push(TransactionUpdate {
3326                    value: prev,
3327                    ts,
3328                    diff: Diff::MINUS_ONE,
3329                });
3330                entry.push(TransactionUpdate {
3331                    value: v,
3332                    ts,
3333                    diff: Diff::ONE,
3334                });
3335            }
3336            (Some(v), None) => {
3337                entry.push(TransactionUpdate {
3338                    value: v,
3339                    ts,
3340                    diff: Diff::ONE,
3341                });
3342            }
3343            (None, Some(prev)) => {
3344                entry.push(TransactionUpdate {
3345                    value: prev,
3346                    ts,
3347                    diff: Diff::MINUS_ONE,
3348                });
3349            }
3350            (None, None) => {}
3351        }
3352
3353        // Check for uniqueness violation.
3354        if let Err(err) = self.verify_keys([&k]) {
3355            // Revert self.pending to the state it was in before calling this
3356            // function.
3357            let pending = self.pending.get_mut(&k).expect("inserted above");
3358            pending.truncate(restore_len);
3359            Err(err)
3360        } else {
3361            Ok(prev)
3362        }
3363    }
3364
3365    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
3366    /// otherwise None.
3367    ///
3368    /// Returns an error if any uniqueness check failed.
3369    fn set_many(
3370        &mut self,
3371        kvs: BTreeMap<K, Option<V>>,
3372        ts: Timestamp,
3373    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3374        if kvs.is_empty() {
3375            return Ok(BTreeMap::new());
3376        }
3377
3378        let mut prevs = BTreeMap::new();
3379        let mut restores = BTreeMap::new();
3380
3381        for (k, v) in kvs {
3382            let prev = self.get(&k).cloned();
3383            let entry = self.pending.entry(k.clone()).or_default();
3384            restores.insert(k.clone(), entry.len());
3385
3386            match (v, prev.clone()) {
3387                (Some(v), Some(prev)) => {
3388                    entry.push(TransactionUpdate {
3389                        value: prev,
3390                        ts,
3391                        diff: Diff::MINUS_ONE,
3392                    });
3393                    entry.push(TransactionUpdate {
3394                        value: v,
3395                        ts,
3396                        diff: Diff::ONE,
3397                    });
3398                }
3399                (Some(v), None) => {
3400                    entry.push(TransactionUpdate {
3401                        value: v,
3402                        ts,
3403                        diff: Diff::ONE,
3404                    });
3405                }
3406                (None, Some(prev)) => {
3407                    entry.push(TransactionUpdate {
3408                        value: prev,
3409                        ts,
3410                        diff: Diff::MINUS_ONE,
3411                    });
3412                }
3413                (None, None) => {}
3414            }
3415
3416            prevs.insert(k, prev);
3417        }
3418
3419        // Check for uniqueness violation.
3420        if let Err(err) = self.verify_keys(prevs.keys()) {
3421            for (k, restore_len) in restores {
3422                // Revert self.pending to the state it was in before calling this
3423                // function.
3424                let pending = self.pending.get_mut(&k).expect("inserted above");
3425                pending.truncate(restore_len);
3426            }
3427            Err(err)
3428        } else {
3429            Ok(prevs)
3430        }
3431    }
3432
3433    /// Deletes items for which `f` returns true. Returns the keys and values of
3434    /// the deleted entries.
3435    ///
3436    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3437    /// better performance.
3438    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3439        let mut deleted = Vec::new();
3440        self.for_values_mut(|p, k, v| {
3441            if f(k, v) {
3442                deleted.push((k.clone(), v.clone()));
3443                p.entry(k.clone()).or_default().push(TransactionUpdate {
3444                    value: v.clone(),
3445                    ts,
3446                    diff: Diff::MINUS_ONE,
3447                });
3448            }
3449        });
3450        soft_assert_no_log!(self.verify().is_ok());
3451        deleted
3452    }
3453
3454    /// Deletes item with key `k`.
3455    ///
3456    /// Returns the value of the deleted entry, if it existed.
3457    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3458        self.set(k, None, ts)
3459            .expect("deleting an entry cannot violate uniqueness")
3460    }
3461
3462    /// Deletes items with key in `ks`.
3463    ///
3464    /// Returns the keys and values of the deleted entries.
3465    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3466        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3467        let prevs = self
3468            .set_many(kvs, ts)
3469            .expect("deleting entries cannot violate uniqueness");
3470        prevs
3471            .into_iter()
3472            .filter_map(|(k, v)| v.map(|v| (k, v)))
3473            .collect()
3474    }
3475}
3476
3477#[cfg(test)]
3478#[allow(clippy::unwrap_used)]
3479mod tests {
3480    use super::*;
3481
3482    use mz_ore::now::SYSTEM_TIME;
3483    use mz_ore::{assert_none, assert_ok};
3484    use mz_persist_client::cache::PersistClientCache;
3485    use mz_persist_types::PersistLocation;
3486    use semver::Version;
3487
3488    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3489    use crate::memory;
3490
3491    #[mz_ore::test]
3492    fn test_table_transaction_simple() {
3493        fn uniqueness_violation(a: &String, b: &String) -> bool {
3494            a == b
3495        }
3496        let mut table = TableTransaction::new_with_uniqueness_fn(
3497            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3498            uniqueness_violation,
3499        )
3500        .unwrap();
3501
3502        // Ideally, we compare for errors here, but it's hard/impossible to implement PartialEq
3503        // for DurableCatalogError.
3504        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3505        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3506        assert!(
3507            table
3508                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3509                .is_err()
3510        );
3511        assert!(
3512            table
3513                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3514                .is_err()
3515        );
3516    }
3517
    // Model-based test: a plain `BTreeMap` acts as the "durable" table, and
    // `commit` applies a transaction's pending (k, v, diff) updates to it,
    // asserting that every retraction removes an existing entry and every
    // addition inserts a fresh one. Each section opens a new transaction
    // against the model, exercises one API method, and commits.
    #[mz_ore::test]
    fn test_table_transaction() {
        fn uniqueness_violation(a: &String, b: &String) -> bool {
            a == b
        }
        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();

        // Applies `pending` to the model table, checking consistency.
        fn commit(
            table: &mut BTreeMap<Vec<u8>, String>,
            mut pending: Vec<(Vec<u8>, String, Diff)>,
        ) {
            // Sort by diff so that we process retractions first.
            pending.sort_by(|a, b| a.2.cmp(&b.2));
            for (k, v, diff) in pending {
                if diff == Diff::MINUS_ONE {
                    let prev = table.remove(&k);
                    assert_eq!(prev, Some(v));
                } else if diff == Diff::ONE {
                    let prev = table.insert(k, v);
                    assert_eq!(prev, None);
                } else {
                    panic!("unexpected diff: {diff}");
                }
            }
        }

        // Seed the model, then exercise `delete` and `update`.
        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.items_cloned(), table);
        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
        );
        assert_eq!(
            table_txn
                .update(|_k, _v| Some("v3".to_string()), 2)
                .unwrap(),
            Diff::ONE
        );

        // Uniqueness violation.
        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
            .unwrap_err();

        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
            .unwrap();
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
            ])
        );
        // A failed bulk `update` must leave pending untouched (rollback).
        let err = table_txn
            .update(|_k, _v| Some("v1".to_string()), 5)
            .unwrap_err();
        assert!(
            matches!(err, DurableCatalogError::UniquenessViolation),
            "unexpected err: {err:?}"
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v1".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
                (
                    2i64.to_le_bytes().to_vec(),
                    "v2".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Deleting then creating an item that has a uniqueness violation should work.
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        // Uniqueness violation in value.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap_err();
        // Key already exists, expect error.
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap_err();
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        // Both the inserts work now because the key and uniqueness violation are gone.
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v3".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
            ])
        );

        // Delete everything, then re-insert a single entry.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
            .unwrap();

        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
        );

        // Verify we don't try to delete v3 or v4 during commit.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
            .unwrap_err();
        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table.clone().into_iter().collect::<Vec<_>>(),
            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
        );

        // Test `set`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
            .unwrap_err();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
            .unwrap();
        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v5".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
        );

        // Duplicate `set`.
        // Setting a key to its current value should produce no pending updates.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());

        // Test `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                0,
            )
            .unwrap_err();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                1,
            )
            .unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                    (3i64.to_le_bytes().to_vec(), None),
                ]),
                2,
            )
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
                (
                    3i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `set_many`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                ]),
                0,
            )
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_key`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
            .unwrap_err();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
                .unwrap()
        );
        // Missing key: returns false.
        assert!(
            !table_txn
                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
                .unwrap()
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_key`.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
                .unwrap()
        );
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `update_by_keys`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        // Uniqueness violation.
        table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap_err();
        // Only existing keys count toward the returned total.
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                1,
            )
            .unwrap();
        assert_eq!(n, Diff::ONE);
        let n = table_txn
            .update_by_keys(
                [
                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                2,
            )
            .unwrap();
        assert_eq!(n, Diff::ZERO);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v8".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Duplicate `update_by_keys`.
        // Note: no-op updates still count toward the returned total.
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap();
        assert_eq!(n, Diff::from(2));
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        // Test `delete_by_key`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
        assert_eq!(prev, Some("v9".to_string()));
        // Deleting a missing or already-deleted key returns None.
        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
        assert_none!(prev);
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
        assert_none!(prev);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                1i64.to_le_bytes().to_vec(),
                "v9".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
        );

        // Test `delete_by_keys`
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            0,
        );
        assert_eq!(
            prevs,
            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
        );
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            1,
        );
        assert_eq!(prevs, vec![]);
        let prevs = table_txn.delete_by_keys(
            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            2,
        );
        assert_eq!(prevs, vec![]);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                42i64.to_le_bytes().to_vec(),
                "v7".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(table, BTreeMap::new());
    }
4015
4016    #[mz_ore::test(tokio::test)]
4017    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
4018    async fn test_savepoint() {
4019        const VERSION: Version = Version::new(26, 0, 0);
4020        let mut persist_cache = PersistClientCache::new_no_metrics();
4021        persist_cache.cfg.build_version = VERSION;
4022        let persist_client = persist_cache
4023            .open(PersistLocation::new_in_mem())
4024            .await
4025            .unwrap();
4026        let state_builder = TestCatalogStateBuilder::new(persist_client)
4027            .with_default_deploy_generation()
4028            .with_version(VERSION);
4029
4030        // Initialize catalog.
4031        let _ = state_builder
4032            .clone()
4033            .unwrap_build()
4034            .await
4035            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4036            .await
4037            .unwrap()
4038            .0;
4039        let mut savepoint_state = state_builder
4040            .unwrap_build()
4041            .await
4042            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4043            .await
4044            .unwrap()
4045            .0;
4046
4047        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4048        assert!(!initial_snapshot.is_empty());
4049
4050        let db_name = "db";
4051        let db_owner = RoleId::User(42);
4052        let db_privileges = Vec::new();
4053        let mut txn = savepoint_state.transaction().await.unwrap();
4054        let (db_id, db_oid) = txn
4055            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4056            .unwrap();
4057        let commit_ts = txn.upper();
4058        txn.commit_internal(commit_ts).await.unwrap();
4059        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4060        let update = updates.into_element();
4061
4062        assert_eq!(update.diff, StateDiff::Addition);
4063
4064        let db = match update.kind {
4065            memory::objects::StateUpdateKind::Database(db) => db,
4066            update => panic!("unexpected update: {update:?}"),
4067        };
4068
4069        assert_eq!(db_id, db.id);
4070        assert_eq!(db_oid, db.oid);
4071        assert_eq!(db_name, db.name);
4072        assert_eq!(db_owner, db.owner_id);
4073        assert_eq!(db_privileges, db.privileges);
4074    }
4075
4076    #[mz_ore::test]
4077    fn test_allocate_introspection_source_index_id() {
4078        let cluster_variant: u8 = 0b0000_0001;
4079        let cluster_id_inner: u64 =
4080            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4081        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4082
4083        let cluster_id = ClusterId::System(cluster_id_inner);
4084        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4085
4086        let introspection_source_index_id: u64 =
4087            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4088
4089        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4090        {
4091            let mut cluster_variant_mask = 0xFF << 56;
4092            cluster_variant_mask &= introspection_source_index_id;
4093            cluster_variant_mask >>= 56;
4094            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4095        }
4096
4097        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4098        {
4099            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4100            cluster_id_inner_mask &= introspection_source_index_id;
4101            cluster_id_inner_mask >>= 8;
4102            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4103        }
4104
4105        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4106        {
4107            let mut log_variant_mask = 0xFF;
4108            log_variant_mask &= introspection_source_index_id;
4109            assert_eq!(
4110                log_variant_mask,
4111                u64::from(timely_messages_received_log_variant)
4112            );
4113        }
4114
4115        let (catalog_item_id, global_id) =
4116            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4117
4118        assert_eq!(
4119            catalog_item_id,
4120            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4121        );
4122        assert_eq!(
4123            global_id,
4124            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4125        );
4126    }
4127}