// mz_catalog/durable/transaction.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::time::Duration;

use anyhow::anyhow;
use derivative::Derivative;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::{u64_to_usize, usize_to_u64};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::now::SYSTEM_TIME;
use mz_ore::vec::VecExt;
use mz_ore::{soft_assert_no_log, soft_assert_or_log};
use mz_persist_types::ShardId;
use mz_pgrepr::oid::FIRST_USER_OID;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
use mz_sql::catalog::{
    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, RoleAttributes, RoleMembership,
    RoleVars,
};
use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
use mz_sql::plan::NetworkPolicyRule;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::controller::StorageError;

use crate::builtin::BuiltinLog;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::objects::serialization::proto;
use crate::durable::objects::{
    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
    ReplicaConfig, Role, RoleAuthKey, RoleAuthValue, RoleKey, RoleValue, Schema, SchemaKey,
    SchemaValue, ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue,
    SourceReference, SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
};
use crate::durable::{
    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
    EXPRESSION_CACHE_SHARD_KEY, NetworkPolicy, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY,
    STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
    SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
    USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

type Timestamp = u64;

/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
/// An operation also logically groups multiple catalog updates together.
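/// Each update is recorded with the ID of the operation that produced it (`op_id`), so the
/// updates that make up one operation can be distinguished from those of another.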
#[derive(Derivative)]
#[derivative(Debug, PartialEq)]
pub struct Transaction<'a> {
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Don't make this a table transaction so that it's not read into the
    // in-memory cache.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of `durable_catalog` at the start of the transaction.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation of this transaction.
    op_id: Timestamp,
}

impl<'a> Transaction<'a> {
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    // `item_type` is slow, only compute if needed.
                    let a_type = a.item_type();
                    let b_type = b.item_type();
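                    // Two non-type items with the same name always conflict;
                    // a type conflicts only with items whose kind conflicts
                    // with types.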
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            // Uniqueness violations for this value occur at the key rather than
            // the value (the key is the unit struct `()` so this is a singleton
            // value).
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }

    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
        let key = ItemKey { id: *id };
        self.items
            .get(&key)
            .map(|v| DurableType::from_key_value(key, v.clone()))
    }

    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
        self.items
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
            .sorted_by_key(|Item { id, .. }| *id)
    }

    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
        self.insert_audit_log_events([event]);
    }

    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
        let events = events
            .into_iter()
            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
        self.audit_log_updates.extend(events);
    }

    pub fn insert_user_database(
        &mut self,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(DatabaseId, u32), CatalogError> {
        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
        let id = DatabaseId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_database(id, database_name, owner_id, privileges, oid)?;
        Ok((id, oid))
    }

    pub(crate) fn insert_database(
        &mut self,
        id: DatabaseId,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<u32, CatalogError> {
        match self.databases.insert(
            DatabaseKey { id },
            DatabaseValue {
                name: database_name.to_string(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(oid),
            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
        }
    }

    pub fn insert_user_schema(
        &mut self,
        database_id: DatabaseId,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(SchemaId, u32), CatalogError> {
        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
        let id = SchemaId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_schema(
            id,
            Some(database_id),
            schema_name.to_string(),
            owner_id,
            privileges,
            oid,
        )?;
        Ok((id, oid))
    }

    pub fn insert_system_schema(
        &mut self,
        schema_id: u64,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        let id = SchemaId::System(schema_id);
        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
    }

    pub(crate) fn insert_schema(
        &mut self,
        schema_id: SchemaId,
        database_id: Option<DatabaseId>,
        schema_name: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        match self.schemas.insert(
            SchemaKey { id: schema_id },
            SchemaValue {
                database_id,
                name: schema_name.clone(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
        }
    }

    pub fn insert_builtin_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributes,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<RoleId, CatalogError> {
        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok(id)
    }

    pub fn insert_user_role(
        &mut self,
        name: String,
        attributes: RoleAttributes,
        membership: RoleMembership,
        vars: RoleVars,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(RoleId, u32), CatalogError> {
        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
        let id = RoleId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok((id, oid))
    }

    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributes,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
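        // A role created with a password also gets a row in the `role_auth`
        // collection recording the hash of that password.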
        if let Some(ref password) = attributes.password {
            let hash =
                mz_auth::hash::scram256_hash(password).expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes,
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }

    /// Panics if any introspection source id is not a system id.
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    /// Panics if any introspection source id is not a system id.
    pub fn insert_system_cluster(
        &mut self,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    fn insert_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.clusters.insert(
            ClusterKey { id: cluster_id },
            ClusterValue {
                name: cluster_name.to_string(),
                owner_id,
                privileges,
                config,
            },
            self.op_id,
        ) {
            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
        };

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip(oids)
            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
            .collect();
        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
            let introspection_source_index = IntrospectionSourceIndex {
                cluster_id,
                name: builtin.name.to_string(),
                item_id,
                index_id,
                oid,
            };
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources
                .insert(key, value, self.op_id)
                .expect("no uniqueness violation");
        }

        Ok(())
    }

    pub fn rename_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        cluster_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterKey { id: cluster_id };

        match self.clusters.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = cluster_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
            ),
        }
    }

    pub fn rename_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica_name: &QualifiedReplica,
        replica_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterReplicaKey { id: replica_id };

        match self.cluster_replicas.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = replica_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => {
                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
            }
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
            ),
        }
    }

    pub fn insert_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<ReplicaId, CatalogError> {
        let replica_id = match cluster_id {
            ClusterId::System(_) => self.allocate_system_replica_id()?,
            ClusterId::User(_) => self.allocate_user_replica_id()?,
        };
        self.insert_cluster_replica_with_id(
            cluster_id,
            replica_id,
            replica_name,
            config,
            owner_id,
        )?;
        Ok(replica_id)
    }

    pub(crate) fn insert_cluster_replica_with_id(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.cluster_replicas.insert(
            ClusterReplicaKey { id: replica_id },
            ClusterReplicaValue {
                cluster_id,
                name: replica_name.into(),
                config,
                owner_id,
            },
            self.op_id,
        ) {
            let cluster = self
                .clusters
                .get(&ClusterKey { id: cluster_id })
                .expect("cluster exists");
            return Err(SqlCatalogError::DuplicateReplica(
                replica_name.to_string(),
                cluster.name.to_string(),
            )
            .into());
        };
        Ok(())
    }

    pub fn insert_user_network_policy(
        &mut self,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        temporary_oids: &HashSet<u32>,
    ) -> Result<NetworkPolicyId, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
        let id = NetworkPolicyId::User(id);
        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
    }

    pub fn insert_network_policy(
        &mut self,
        id: NetworkPolicyId,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        oid: u32,
    ) -> Result<NetworkPolicyId, CatalogError> {
        match self.network_policies.insert(
            NetworkPolicyKey { id },
            NetworkPolicyValue {
                name: name.clone(),
                rules,
                privileges,
                owner_id,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(id),
            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
        }
    }

    /// Updates persisted information about introspection source indexes.
    ///
    /// Panics if provided id is not a system id.
    pub fn update_introspection_source_index_gids(
        &mut self,
        mappings: impl Iterator<
            Item = (
                ClusterId,
                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
            ),
        >,
    ) -> Result<(), CatalogError> {
        for (cluster_id, updates) in mappings {
            for (name, item_id, index_id, oid) in updates {
                let introspection_source_index = IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                };
                let (key, value) = introspection_source_index.into_key_value();

                let prev = self
                    .introspection_sources
                    .set(key, Some(value), self.op_id)?;
                if prev.is_none() {
                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
                        "{index_id}"
                    ))
                    .into());
                }
            }
        }
        Ok(())
    }

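    /// Inserts a user item, allocating a new OID for it via [`Self::allocate_oid`].
    ///
    /// Returns the allocated OID.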
    pub fn insert_user_item(
        &mut self,
        id: CatalogItemId,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
        versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<u32, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_item(
            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
        )?;
        Ok(oid)
    }

    pub fn insert_item(
        &mut self,
        id: CatalogItemId,
        oid: u32,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        extra_versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<(), CatalogError> {
        match self.items.insert(
            ItemKey { id },
            ItemValue {
                schema_id,
                name: item_name.to_string(),
                create_sql,
                owner_id,
                privileges,
                oid,
                global_id,
                extra_versions,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
        }
    }

    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
    }

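    /// Returns `amount` consecutive IDs from the allocator named `key`, advancing the allocator
    /// past them. For example, if the allocator for `key` currently holds `next_id = 7`, then
    /// `get_and_increment_id_by(key, 3)` returns `[7, 8, 9]` and leaves `next_id = 10`.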
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }

    pub fn allocate_system_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        assert!(
            !self.durable_catalog.is_bootstrap_complete(),
            "we can only allocate system item IDs during bootstrap"
        );
        Ok(self
            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            // TODO(alter_table): Use separate ID allocators.
            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
            .collect())
    }

    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system items that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
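    ///
    /// For example, a user cluster with inner value 1 and the `TimelyLog::Operates` log variant
    /// (variant number 1) maps to the ID `(2 << 56) | (1 << 8) | 1`.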
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ShutdownDuration) => 27,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
        };

        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }

    pub fn allocate_user_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        Ok(self
            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            // TODO(alter_table): Use separate ID allocators.
            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
            .collect())
    }

    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::User(id))
    }

    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::System(id))
    }

    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
    }

    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
    }

    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
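                // On overflow, wrap past the reserved system OID range
                // `[0, FIRST_USER_OID)` and continue from `FIRST_USER_OID`.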
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do every time we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated every time an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference, and the other
        // approach requires making sure that the allocator always stays in sync, which can be
        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of the user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }

    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
        self.allocate_oids(1, temporary_oids)
            .map(|oids| oids.into_element())
    }

    pub(crate) fn insert_id_allocator(
        &mut self,
        name: String,
        next_id: u64,
    ) -> Result<(), CatalogError> {
        match self.id_allocator.insert(
            IdAllocKey { name: name.clone() },
            IdAllocValue { next_id },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
        }
    }

    /// Removes the database `id` from the transaction.
    ///
    /// Returns an error if `id` is not found.
    ///
    /// Runtime is linear with respect to the total number of databases in the catalog.
    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
        let prev = self
            .databases
            .set(DatabaseKey { id: *id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    /// Removes all databases in `databases` from the transaction.
    ///
    /// Returns an error if any id in `databases` is not found.
    ///
    /// NOTE: On error, there still may be some databases removed from the transaction. It
    /// is up to the caller to either abort the transaction or commit.
    pub fn remove_databases(
        &mut self,
        databases: &BTreeSet<DatabaseId>,
    ) -> Result<(), CatalogError> {
        if databases.is_empty() {
            return Ok(());
        }

        let to_remove = databases
            .iter()
            .map(|id| (DatabaseKey { id: *id }, None))
            .collect();
        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
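        // `set_many` returns the previous value for each key; a previous value
        // of `None` identifies a database that did not exist.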
1060        prev.retain(|_k, val| val.is_none());
1061
1062        if !prev.is_empty() {
1063            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1064            return Err(SqlCatalogError::UnknownDatabase(err).into());
1065        }
1066
1067        Ok(())
1068    }
1069
1070    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
1071    ///
1072    /// Returns an error if `(database_id, schema_id)` is not found.
1073    ///
1074    /// Runtime is linear with respect to the total number of schemas in the catalog.
1075    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
1076    pub fn remove_schema(
1077        &mut self,
1078        database_id: &Option<DatabaseId>,
1079        schema_id: &SchemaId,
1080    ) -> Result<(), CatalogError> {
1081        let prev = self
1082            .schemas
1083            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1084        if prev.is_some() {
1085            Ok(())
1086        } else {
1087            let database_name = match database_id {
1088                Some(id) => format!("{id}."),
1089                None => "".to_string(),
1090            };
1091            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1092        }
1093    }
1094
1095    /// Removes all schemas in `schemas` from the transaction.
1096    ///
1097    /// Returns an error if any id in `schemas` is not found.
1098    ///
1099    /// NOTE: On error, there still may be some schemas removed from the transaction. It
1100    /// is up to the caller to either abort the transaction or commit.
1101    pub fn remove_schemas(
1102        &mut self,
1103        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1104    ) -> Result<(), CatalogError> {
1105        if schemas.is_empty() {
1106            return Ok(());
1107        }
1108
1109        let to_remove = schemas
1110            .iter()
1111            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1112            .collect();
1113        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1114        prev.retain(|_k, v| v.is_none());
1115
1116        if !prev.is_empty() {
1117            let err = prev
1118                .keys()
1119                .map(|k| {
1120                    let db_spec = schemas.get(&k.id).expect("should_exist");
1121                    let db_name = match db_spec {
1122                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1123                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1124                    };
1125                    format!("{}.{}", db_name, k.id)
1126                })
1127                .join(", ");
1128
1129            return Err(SqlCatalogError::UnknownSchema(err).into());
1130        }
1131
1132        Ok(())
1133    }
1134
1135    pub fn remove_source_references(
1136        &mut self,
1137        source_id: CatalogItemId,
1138    ) -> Result<(), CatalogError> {
1139        let deleted = self
1140            .source_references
1141            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1142            .is_some();
1143        if deleted {
1144            Ok(())
1145        } else {
1146            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1147        }
1148    }
1149
1150    /// Removes all user roles in `roles` from the transaction.
1151    ///
1152    /// Returns an error if any id in `roles` is not found.
1153    ///
1154    /// NOTE: On error, there still may be some roles removed from the transaction. It
1155    /// is up to the caller to either abort the transaction or commit.
1156    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1157        assert!(
1158            roles.iter().all(|id| id.is_user()),
1159            "cannot delete non-user roles"
1160        );
1161        self.remove_roles(roles)
1162    }
1163
1164    /// Removes all roles in `roles` from the transaction.
1165    ///
1166    /// Returns an error if any id in `roles` is not found.
1167    ///
1168    /// NOTE: On error, there still may be some roles removed from the transaction. It
1169    /// is up to the caller to either abort the transaction or commit.
1170    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1171        if roles.is_empty() {
1172            return Ok(());
1173        }
1174
1175        let to_remove_keys = roles
1176            .iter()
1177            .map(|role_id| RoleKey { id: *role_id })
1178            .collect::<Vec<_>>();
1179
1180        let to_remove_roles = to_remove_keys
1181            .iter()
1182            .map(|role_key| (role_key.clone(), None))
1183            .collect();
1184
1185        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1186
1187        let to_remove_role_auth = to_remove_keys
1188            .iter()
1189            .map(|role_key| {
1190                (
1191                    RoleAuthKey {
1192                        role_id: role_key.id,
1193                    },
1194                    None,
1195                )
1196            })
1197            .collect();
1198
1199        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1200
1201        prev.retain(|_k, v| v.is_none());
1202        if !prev.is_empty() {
1203            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1204            return Err(SqlCatalogError::UnknownRole(err).into());
1205        }
1206
1207        role_auth_prev.retain(|_k, v| v.is_none());
1208        // The reason we don't to the same check as above is that the role auth table
1209        // is not required to have all roles in the role table.
1210
1211        Ok(())
1212    }
1213
1214    /// Removes all cluster in `clusters` from the transaction.
1215    ///
1216    /// Returns an error if any id in `clusters` is not found.
1217    ///
1218    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1219    /// the caller to either abort the transaction or commit.
1220    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1221        if clusters.is_empty() {
1222            return Ok(());
1223        }
1224
1225        let to_remove = clusters
1226            .iter()
1227            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1228            .collect();
1229        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1230
1231        prev.retain(|_k, v| v.is_none());
1232        if !prev.is_empty() {
1233            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1234            return Err(SqlCatalogError::UnknownCluster(err).into());
1235        }
1236
1237        // Cascade delete introspection sources and cluster replicas.
1238        //
1239        // TODO(benesch): this doesn't seem right. Cascade deletions should
1240        // be entirely the domain of the higher catalog layer, not the
1241        // storage layer.
1242        self.cluster_replicas
1243            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1244        self.introspection_sources
1245            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1246
1247        Ok(())
1248    }
1249
1250    /// Removes the cluster replica `id` from the transaction.
1251    ///
1252    /// Returns an error if `id` is not found.
1253    ///
1254    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1255    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1256    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1257        let deleted = self
1258            .cluster_replicas
1259            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1260            .is_some();
1261        if deleted {
1262            Ok(())
1263        } else {
1264            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1265        }
1266    }
1267
1268    /// Removes all cluster replicas in `replicas` from the transaction.
1269    ///
1270    /// Returns an error if any id in `replicas` is not found.
1271    ///
1272    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1273    /// is up to the caller to either abort the transaction or commit.
1274    pub fn remove_cluster_replicas(
1275        &mut self,
1276        replicas: &BTreeSet<ReplicaId>,
1277    ) -> Result<(), CatalogError> {
1278        if replicas.is_empty() {
1279            return Ok(());
1280        }
1281
1282        let to_remove = replicas
1283            .iter()
1284            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1285            .collect();
1286        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1287
1288        prev.retain(|_k, v| v.is_none());
1289        if !prev.is_empty() {
1290            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1291            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1292        }
1293
1294        Ok(())
1295    }
1296
1297    /// Removes item `id` from the transaction.
1298    ///
1299    /// Returns an error if `id` is not found.
1300    ///
1301    /// Runtime is linear with respect to the total number of items in the catalog.
1302    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1303    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1304        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1305        if prev.is_some() {
1306            Ok(())
1307        } else {
1308            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1309        }
1310    }
1311
1312    /// Removes all items in `ids` from the transaction.
1313    ///
1314    /// Returns an error if any id in `ids` is not found.
1315    ///
1316    /// NOTE: On error, there still may be some items removed from the transaction. It is
1317    /// up to the caller to either abort the transaction or commit.
1318    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1319        if ids.is_empty() {
1320            return Ok(());
1321        }
1322
1323        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1324        let n = self.items.delete_by_keys(ks, self.op_id).len();
1325        if n == ids.len() {
1326            Ok(())
1327        } else {
1328            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1329            let mut unknown = ids.difference(&item_ids);
1330            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1331        }
1332    }
1333
1334    /// Removes all system object mappings in `descriptions` from the transaction.
1335    ///
1336    /// Returns an error if any description in `descriptions` is not found.
1337    ///
1338    /// NOTE: On error, there still may be some items removed from the transaction. It is
1339    /// up to the caller to either abort the transaction or commit.
1340    pub fn remove_system_object_mappings(
1341        &mut self,
1342        descriptions: BTreeSet<SystemObjectDescription>,
1343    ) -> Result<(), CatalogError> {
1344        if descriptions.is_empty() {
1345            return Ok(());
1346        }
1347
1348        let ks: Vec<_> = descriptions
1349            .clone()
1350            .into_iter()
1351            .map(|desc| GidMappingKey {
1352                schema_name: desc.schema_name,
1353                object_type: desc.object_type,
1354                object_name: desc.object_name,
1355            })
1356            .collect();
1357        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1358
1359        if n == descriptions.len() {
1360            Ok(())
1361        } else {
1362            let item_descriptions = self
1363                .system_gid_mapping
1364                .items()
1365                .keys()
1366                .map(|k| SystemObjectDescription {
1367                    schema_name: k.schema_name.clone(),
1368                    object_type: k.object_type.clone(),
1369                    object_name: k.object_name.clone(),
1370                })
1371                .collect();
1372            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1373                format!(
1374                    "{} {}.{}",
1375                    desc.object_type, desc.schema_name, desc.object_name
1376                )
1377            });
1378            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1379        }
1380    }
1381
1382    /// Removes all introspection source indexes in `indexes` from the transaction.
1383    ///
1384    /// Returns an error if any index in `indexes` is not found.
1385    ///
1386    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1387    /// up to the caller to either abort the transaction or commit.
1388    pub fn remove_introspection_source_indexes(
1389        &mut self,
1390        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1391    ) -> Result<(), CatalogError> {
1392        if introspection_source_indexes.is_empty() {
1393            return Ok(());
1394        }
1395
1396        let ks: Vec<_> = introspection_source_indexes
1397            .clone()
1398            .into_iter()
1399            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1400            .collect();
1401        let n = self
1402            .introspection_sources
1403            .delete_by_keys(ks, self.op_id)
1404            .len();
1405        if n == introspection_source_indexes.len() {
1406            Ok(())
1407        } else {
1408            let txn_indexes = self
1409                .introspection_sources
1410                .items()
1411                .keys()
1412                .map(|k| (k.cluster_id, k.name.clone()))
1413                .collect();
1414            let mut unknown = introspection_source_indexes
1415                .difference(&txn_indexes)
1416                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1417            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1418        }
1419    }
1420
1421    /// Updates item `id` in the transaction to `item`.
1422    ///
1423    /// Returns an error if `id` is not found.
1424    ///
1425    /// Runtime is linear with respect to the total number of items in the catalog.
1426    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1427    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1428        let updated =
1429            self.items
1430                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1431        if updated {
1432            Ok(())
1433        } else {
1434            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1435        }
1436    }
1437
1438    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1439    /// corresponding value in `items`.
1440    ///
1441    /// Returns an error if any id in `items` is not found.
1442    ///
1443    /// NOTE: On error, there still may be some items updated in the transaction. It is
1444    /// up to the caller to either abort the transaction or commit.
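    ///
    /// A minimal sketch of the preferred batched usage (not compiled; `txn`,
    /// `item_id`, and `renamed_item` are hypothetical):
    ///
    /// ```ignore
    /// let updates = BTreeMap::from([(item_id, renamed_item)]);
    /// txn.update_items(updates)?;
    /// ```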
1445    pub fn update_items(
1446        &mut self,
1447        items: BTreeMap<CatalogItemId, Item>,
1448    ) -> Result<(), CatalogError> {
1449        if items.is_empty() {
1450            return Ok(());
1451        }
1452
1453        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1454        let kvs: Vec<_> = items
1455            .clone()
1456            .into_iter()
1457            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1458            .collect();
1459        let n = self.items.update_by_keys(kvs, self.op_id)?;
1460        let n = usize::try_from(n.into_inner()).expect("must be non-negative and fit in usize");
1461        if n == update_ids.len() {
1462            Ok(())
1463        } else {
1464            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1465            let mut unknown = update_ids.difference(&item_ids);
1466            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1467        }
1468    }
1469
1470    /// Updates role `id` in the transaction to `role`.
1471    ///
1472    /// Returns an error if `id` is not found.
1473    ///
1474    /// Runtime is linear with respect to the total number of roles in the catalog.
1475    /// DO NOT call this function in a loop; if batching is needed, implement and use
1476    /// an `update_roles` method modeled after [`Self::update_items`].
1477    pub fn update_role(&mut self, id: RoleId, role: Role) -> Result<(), CatalogError> {
1478        let key = RoleKey { id };
1479        if self.roles.get(&key).is_some() {
1480            let auth_key = RoleAuthKey { role_id: id };
1481
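            // The role has a password: hash it and upsert the hash into the
            // role_auth collection.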
1482            if let Some(ref password) = role.attributes.password {
1483                let hash =
1484                    mz_auth::hash::scram256_hash(password).expect("password hash should be valid");
1485                let value = RoleAuthValue {
1486                    password_hash: Some(hash),
1487                    updated_at: SYSTEM_TIME(),
1488                };
1489
1490                if self.role_auth.get(&auth_key).is_some() {
1491                    self.role_auth
1492                        .update_by_key(auth_key.clone(), value, self.op_id)?;
1493                } else {
1494                    self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1495                }
1496            } else if self.role_auth.get(&auth_key).is_some() {
1497                // If the role is being updated to not have a password, we need to
1498                // remove the password hash from the role_auth catalog.
1499                let value = RoleAuthValue {
1500                    password_hash: None,
1501                    updated_at: SYSTEM_TIME(),
1502                };
1503
1504                self.role_auth
1505                    .update_by_key(auth_key.clone(), value, self.op_id)?;
1506            }
1507
1508            self.roles
1509                .update_by_key(key, role.into_key_value().1, self.op_id)?;
1510
1511            Ok(())
1512        } else {
1513            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1514        }
1515    }
1516
1517    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1518    /// corresponding value in `roles`.
1519    ///
1520    /// This function does *not* write role authentication information to the catalog.
1521    /// It is purely for updating the role itself.
1522    ///
1523    /// Returns an error if any id in `roles` is not found.
1524    ///
1525    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1526    /// up to the caller to either abort the transaction or commit.
1527    pub fn update_roles_without_auth(
1528        &mut self,
1529        roles: BTreeMap<RoleId, Role>,
1530    ) -> Result<(), CatalogError> {
1531        if roles.is_empty() {
1532            return Ok(());
1533        }
1534
1535        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1536        let kvs: Vec<_> = roles
1537            .into_iter()
1538            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1539            .collect();
1540        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1541        let n = usize::try_from(n.into_inner()).expect("must be non-negative and fit in usize");
1542
1543        if n == update_role_ids.len() {
1544            Ok(())
1545        } else {
1546            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1547            let mut unknown = update_role_ids.difference(&role_ids);
1548            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1549        }
1550    }
1551
1552    /// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
1553    /// of `mappings` should be (old-catalog-item-id, new-system-object-mapping).
1554    ///
1555    /// Panics if any provided id is not a system id.
1556    pub fn update_system_object_mappings(
1557        &mut self,
1558        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1559    ) -> Result<(), CatalogError> {
1560        if mappings.is_empty() {
1561            return Ok(());
1562        }
1563
1564        let n = self.system_gid_mapping.update(
1565            |_k, v| {
1566                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1567                    let (_, new_value) = mapping.clone().into_key_value();
1568                    Some(new_value)
1569                } else {
1570                    None
1571                }
1572            },
1573            self.op_id,
1574        )?;
1575
1576        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1577            != mappings.len()
1578        {
1579            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1580            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1581        }
1582
1583        Ok(())
1584    }
1585
1586    /// Updates cluster `id` in the transaction to `cluster`.
1587    ///
1588    /// Returns an error if `id` is not found.
1589    ///
1590    /// Runtime is linear with respect to the total number of clusters in the catalog.
1591    /// DO NOT call this function in a loop.
1592    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1593        let updated = self.clusters.update_by_key(
1594            ClusterKey { id },
1595            cluster.into_key_value().1,
1596            self.op_id,
1597        )?;
1598        if updated {
1599            Ok(())
1600        } else {
1601            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1602        }
1603    }
1604
1605    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1606    ///
1607    /// Returns an error if `replica_id` is not found.
1608    ///
1609    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1610    /// DO NOT call this function in a loop.
1611    pub fn update_cluster_replica(
1612        &mut self,
1613        replica_id: ReplicaId,
1614        replica: ClusterReplica,
1615    ) -> Result<(), CatalogError> {
1616        let updated = self.cluster_replicas.update_by_key(
1617            ClusterReplicaKey { id: replica_id },
1618            replica.into_key_value().1,
1619            self.op_id,
1620        )?;
1621        if updated {
1622            Ok(())
1623        } else {
1624            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1625        }
1626    }
1627
1628    /// Updates database `id` in the transaction to `database`.
1629    ///
1630    /// Returns an error if `id` is not found.
1631    ///
1632    /// Runtime is linear with respect to the total number of databases in the catalog.
1633    /// DO NOT call this function in a loop.
1634    pub fn update_database(
1635        &mut self,
1636        id: DatabaseId,
1637        database: Database,
1638    ) -> Result<(), CatalogError> {
1639        let updated = self.databases.update_by_key(
1640            DatabaseKey { id },
1641            database.into_key_value().1,
1642            self.op_id,
1643        )?;
1644        if updated {
1645            Ok(())
1646        } else {
1647            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1648        }
1649    }
1650
1651    /// Updates schema `schema_id` in the transaction to `schema`.
1652    ///
1653    /// Returns an error if `schema_id` is not found.
1654    ///
1655    /// Runtime is linear with respect to the total number of schemas in the catalog.
1656    /// DO NOT call this function in a loop.
1657    pub fn update_schema(
1658        &mut self,
1659        schema_id: SchemaId,
1660        schema: Schema,
1661    ) -> Result<(), CatalogError> {
1662        let updated = self.schemas.update_by_key(
1663            SchemaKey { id: schema_id },
1664            schema.into_key_value().1,
1665            self.op_id,
1666        )?;
1667        if updated {
1668            Ok(())
1669        } else {
1670            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1671        }
1672    }
1673
1674    /// Updates network policy `id` in the transaction to `network_policy`.
1675    ///
1676    /// Returns an error if `id` is not found.
1677    ///
1678    /// Runtime is linear with respect to the total number of network policies in the catalog.
1679    /// DO NOT call this function in a loop.
1680    pub fn update_network_policy(
1681        &mut self,
1682        id: NetworkPolicyId,
1683        network_policy: NetworkPolicy,
1684    ) -> Result<(), CatalogError> {
1685        let updated = self.network_policies.update_by_key(
1686            NetworkPolicyKey { id },
1687            network_policy.into_key_value().1,
1688            self.op_id,
1689        )?;
1690        if updated {
1691            Ok(())
1692        } else {
1693            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1694        }
1695    }

1696    /// Removes all network policies in `network_policies` from the transaction.
1697    ///
1698    /// Returns an error if any id in `network_policies` is not found.
1699    ///
1700    /// NOTE: On error, there still may be some network policies removed from the
1701    /// transaction. It is up to the caller to either abort the transaction or commit.
1702    pub fn remove_network_policies(
1703        &mut self,
1704        network_policies: &BTreeSet<NetworkPolicyId>,
1705    ) -> Result<(), CatalogError> {
1706        if network_policies.is_empty() {
1707            return Ok(());
1708        }
1709
1710        let to_remove = network_policies
1711            .iter()
1712            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1713            .collect();
1714        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1715        assert!(
1716            prev.iter().all(|(k, _)| k.id.is_user()),
1717            "cannot delete non-user network policy"
1718        );
1719
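        // `set_many` returned each key's previous value; a previous value of `None`
        // means the key did not exist, so keep those entries and report them as unknown.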
1720        prev.retain(|_k, v| v.is_none());
1721        if !prev.is_empty() {
1722            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1723            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1724        }
1725
1726        Ok(())
1727    }

1728    /// Set persisted default privilege.
1729    ///
1730    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
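    ///
    /// A minimal sketch (not compiled; `owner_id` and `grantee_id` are hypothetical):
    ///
    /// ```ignore
    /// // By default, grant SELECT on tables created by `owner_id` to `grantee_id`,
    /// // in any database and schema.
    /// txn.set_default_privilege(
    ///     owner_id,
    ///     None,
    ///     None,
    ///     ObjectType::Table,
    ///     grantee_id,
    ///     Some(AclMode::SELECT),
    /// )?;
    /// ```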
1731    pub fn set_default_privilege(
1732        &mut self,
1733        role_id: RoleId,
1734        database_id: Option<DatabaseId>,
1735        schema_id: Option<SchemaId>,
1736        object_type: ObjectType,
1737        grantee: RoleId,
1738        privileges: Option<AclMode>,
1739    ) -> Result<(), CatalogError> {
1740        self.default_privileges.set(
1741            DefaultPrivilegesKey {
1742                role_id,
1743                database_id,
1744                schema_id,
1745                object_type,
1746                grantee,
1747            },
1748            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1749            self.op_id,
1750        )?;
1751        Ok(())
1752    }
1753
1754    /// Set persisted default privileges.
1755    pub fn set_default_privileges(
1756        &mut self,
1757        default_privileges: Vec<DefaultPrivilege>,
1758    ) -> Result<(), CatalogError> {
1759        if default_privileges.is_empty() {
1760            return Ok(());
1761        }
1762
1763        let default_privileges = default_privileges
1764            .into_iter()
1765            .map(DurableType::into_key_value)
1766            .map(|(k, v)| (k, Some(v)))
1767            .collect();
1768        self.default_privileges
1769            .set_many(default_privileges, self.op_id)?;
1770        Ok(())
1771    }
1772
1773    /// Set persisted system privilege.
1774    ///
1775    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1776    pub fn set_system_privilege(
1777        &mut self,
1778        grantee: RoleId,
1779        grantor: RoleId,
1780        acl_mode: Option<AclMode>,
1781    ) -> Result<(), CatalogError> {
1782        self.system_privileges.set(
1783            SystemPrivilegesKey { grantee, grantor },
1784            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1785            self.op_id,
1786        )?;
1787        Ok(())
1788    }
1789
1790    /// Set persisted system privileges.
1791    pub fn set_system_privileges(
1792        &mut self,
1793        system_privileges: Vec<MzAclItem>,
1794    ) -> Result<(), CatalogError> {
1795        if system_privileges.is_empty() {
1796            return Ok(());
1797        }
1798
1799        let system_privileges = system_privileges
1800            .into_iter()
1801            .map(DurableType::into_key_value)
1802            .map(|(k, v)| (k, Some(v)))
1803            .collect();
1804        self.system_privileges
1805            .set_many(system_privileges, self.op_id)?;
1806        Ok(())
1807    }
1808
1809    /// Set persisted setting.
1810    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1811        self.settings.set(
1812            SettingKey { name },
1813            value.map(|value| SettingValue { value }),
1814            self.op_id,
1815        )?;
1816        Ok(())
1817    }
1818
1819    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1820        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1821    }
1822
1823    /// Insert persisted introspection source indexes.
1824    pub fn insert_introspection_source_indexes(
1825        &mut self,
1826        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1827        temporary_oids: &HashSet<u32>,
1828    ) -> Result<(), CatalogError> {
1829        if introspection_source_indexes.is_empty() {
1830            return Ok(());
1831        }
1832
1833        let amount = usize_to_u64(introspection_source_indexes.len());
1834        let oids = self.allocate_oids(amount, temporary_oids)?;
1835        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1836            .into_iter()
1837            .zip(oids)
1838            .map(
1839                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1840                    cluster_id,
1841                    name,
1842                    item_id,
1843                    index_id,
1844                    oid,
1845                },
1846            )
1847            .collect();
1848
1849        for introspection_source_index in introspection_source_indexes {
1850            let (key, value) = introspection_source_index.into_key_value();
1851            self.introspection_sources.insert(key, value, self.op_id)?;
1852        }
1853
1854        Ok(())
1855    }
1856
1857    /// Set persisted system object mappings.
1858    pub fn set_system_object_mappings(
1859        &mut self,
1860        mappings: Vec<SystemObjectMapping>,
1861    ) -> Result<(), CatalogError> {
1862        if mappings.is_empty() {
1863            return Ok(());
1864        }
1865
1866        let mappings = mappings
1867            .into_iter()
1868            .map(DurableType::into_key_value)
1869            .map(|(k, v)| (k, Some(v)))
1870            .collect();
1871        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1872        Ok(())
1873    }
1874
1875    /// Set persisted replicas.
1876    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1877        if replicas.is_empty() {
1878            return Ok(());
1879        }
1880
1881        let replicas = replicas
1882            .into_iter()
1883            .map(DurableType::into_key_value)
1884            .map(|(k, v)| (k, Some(v)))
1885            .collect();
1886        self.cluster_replicas.set_many(replicas, self.op_id)?;
1887        Ok(())
1888    }
1889
1890    /// Set persisted configuration.
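    ///
    /// A minimal sketch (not compiled; the key name is hypothetical):
    ///
    /// ```ignore
    /// txn.set_config("my_config".into(), Some(42))?; // insert or update
    /// txn.set_config("my_config".into(), None)?;     // remove
    /// ```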
1891    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1892        match value {
1893            Some(value) => {
1894                let config = Config { key, value };
1895                let (key, value) = config.into_key_value();
1896                self.configs.set(key, Some(value), self.op_id)?;
1897            }
1898            None => {
1899                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1900            }
1901        }
1902        Ok(())
1903    }
1904
1905    /// Get the value of a persisted config.
1906    pub fn get_config(&self, key: String) -> Option<u64> {
1907        self.configs
1908            .get(&ConfigKey { key })
1909            .map(|entry| entry.value)
1910    }
1911
1912    /// Get the value of a persisted setting.
1913    fn get_setting(&self, name: String) -> Option<&str> {
1914        self.settings
1915            .get(&SettingKey { name })
1916            .map(|entry| &*entry.value)
1917    }
1918
1919    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1920        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1921            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1922    }
1923
1924    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1925        self.set_setting(
1926            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1927            Some(shard_id.to_string()),
1928        )
1929    }
1930
1931    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1932        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1933            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1934    }
1935
1936    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1937        self.set_setting(
1938            EXPRESSION_CACHE_SHARD_KEY.to_string(),
1939            Some(shard_id.to_string()),
1940        )
1941    }
1942
1943    /// Updates the catalog `enable_0dt_deployment` "config" value to
1944    /// match the `enable_0dt_deployment` "system var" value.
1945    ///
1946    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1947    /// but use it in boot before LaunchDarkly is available.
1948    pub fn set_enable_0dt_deployment(&mut self, value: bool) -> Result<(), CatalogError> {
1949        self.set_config(ENABLE_0DT_DEPLOYMENT.into(), Some(u64::from(value)))
1950    }
1951
1952    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
1953    /// match the `with_0dt_deployment_max_wait` "system var" value.
1954    ///
1955    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1956    /// but use it in boot before LaunchDarkly is available.
1957    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1958        self.set_config(
1959            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1960            Some(
1961                value
1962                    .as_millis()
1963                    .try_into()
1964                    .expect("max wait fits into u64"),
1965            ),
1966        )
1967    }
1968
1969    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
1970    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
1971    /// value.
1972    ///
1973    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1974    /// but use it in boot before LaunchDarkly is available.
1975    pub fn set_0dt_deployment_ddl_check_interval(
1976        &mut self,
1977        value: Duration,
1978    ) -> Result<(), CatalogError> {
1979        self.set_config(
1980            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
1981            Some(
1982                value
1983                    .as_millis()
1984                    .try_into()
1985                    .expect("ddl check interval fits into u64"),
1986            ),
1987        )
1988    }
1989
1990    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
1991    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
1992    ///
1993    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
1994    /// but use it in boot before LaunchDarkly is available.
1995    pub fn set_enable_0dt_deployment_panic_after_timeout(
1996        &mut self,
1997        value: bool,
1998    ) -> Result<(), CatalogError> {
1999        self.set_config(
2000            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2001            Some(u64::from(value)),
2002        )
2003    }
2004
2005    /// Removes the catalog `enable_0dt_deployment` "config" value to
2006    /// match the `enable_0dt_deployment` "system var" value.
2007    ///
2008    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2009    /// but use it in boot before LaunchDarkly is available.
2010    pub fn reset_enable_0dt_deployment(&mut self) -> Result<(), CatalogError> {
2011        self.set_config(ENABLE_0DT_DEPLOYMENT.into(), None)
2012    }
2013
2014    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2015    /// match the `with_0dt_deployment_max_wait` "system var" value.
2016    ///
2017    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2018    /// but use it in boot before LaunchDarkly is available.
2019    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2020        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2021    }
2022
2023    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2024    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2025    /// value.
2026    ///
2027    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2028    /// use it in boot before LaunchDarkly is available.
2029    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2030        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2031    }
2032
2033    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2034    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2035    /// var" value.
2036    ///
2037    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2038    /// use it in boot before LaunchDarkly is available.
2039    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2040        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2041    }
2042
2043    /// Updates the catalog `system_config_synced` "config" value to true.
2044    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2045        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2046    }
2047
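    /// Sets (or, when `comment` is `None`, clears) the comment on `object_id`,
    /// optionally scoped to a `sub_component` (e.g., a column) of the object.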
2048    pub fn update_comment(
2049        &mut self,
2050        object_id: CommentObjectId,
2051        sub_component: Option<usize>,
2052        comment: Option<String>,
2053    ) -> Result<(), CatalogError> {
2054        let key = CommentKey {
2055            object_id,
2056            sub_component,
2057        };
2058        let value = comment.map(|c| CommentValue { comment: c });
2059        self.comments.set(key, value, self.op_id)?;
2060
2061        Ok(())
2062    }
2063
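    /// Removes all comments on the objects in `object_ids` from the transaction.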
2064    pub fn drop_comments(
2065        &mut self,
2066        object_ids: &BTreeSet<CommentObjectId>,
2067    ) -> Result<(), CatalogError> {
2068        if object_ids.is_empty() {
2069            return Ok(());
2070        }
2071
2072        self.comments
2073            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2074        Ok(())
2075    }
2076
2077    pub fn update_source_references(
2078        &mut self,
2079        source_id: CatalogItemId,
2080        references: Vec<SourceReference>,
2081        updated_at: u64,
2082    ) -> Result<(), CatalogError> {
2083        let key = SourceReferencesKey { source_id };
2084        let value = SourceReferencesValue {
2085            references,
2086            updated_at,
2087        };
2088        self.source_references.set(key, Some(value), self.op_id)?;
2089        Ok(())
2090    }
2091
2092    /// Upserts persisted system configuration `name` to `value`.
2093    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2094        let key = ServerConfigurationKey {
2095            name: name.to_string(),
2096        };
2097        let value = ServerConfigurationValue { value };
2098        self.system_configurations
2099            .set(key, Some(value), self.op_id)?;
2100        Ok(())
2101    }
2102
2103    /// Removes persisted system configuration `name`.
2104    pub fn remove_system_config(&mut self, name: &str) {
2105        let key = ServerConfigurationKey {
2106            name: name.to_string(),
2107        };
2108        self.system_configurations
2109            .set(key, None, self.op_id)
2110            .expect("cannot have uniqueness violation");
2111    }
2112
2113    /// Removes all persisted system configurations.
2114    pub fn clear_system_configs(&mut self) {
2115        self.system_configurations.delete(|_k, _v| true, self.op_id);
2116    }
2117
2118    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2119        match self.configs.insert(
2120            ConfigKey { key: key.clone() },
2121            ConfigValue { value },
2122            self.op_id,
2123        ) {
2124            Ok(_) => Ok(()),
2125            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2126        }
2127    }
2128
2129    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2130        self.clusters
2131            .items()
2132            .into_iter()
2133            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2134    }
2135
2136    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2137        self.cluster_replicas
2138            .items()
2139            .into_iter()
2140            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2141    }
2142
2143    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2144        self.roles
2145            .items()
2146            .into_iter()
2147            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2148    }
2149
2150    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2151        self.network_policies
2152            .items()
2153            .into_iter()
2154            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2155    }
2156
2157    pub fn get_system_object_mappings(
2158        &self,
2159    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2160        self.system_gid_mapping
2161            .items()
2162            .into_iter()
2163            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2164    }
2165
2166    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2167        self.schemas
2168            .items()
2169            .into_iter()
2170            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2171    }
2172
2173    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2174        self.system_configurations
2175            .items()
2176            .into_iter()
2177            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2178    }
2179
2180    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2181        let key = SchemaKey { id: *id };
2182        self.schemas
2183            .get(&key)
2184            .map(|v| DurableType::from_key_value(key, v.clone()))
2185    }
2186
2187    pub fn get_introspection_source_indexes(
2188        &self,
2189        cluster_id: ClusterId,
2190    ) -> BTreeMap<&str, (GlobalId, u32)> {
2191        self.introspection_sources
2192            .items()
2193            .into_iter()
2194            .filter(|(k, _v)| k.cluster_id == cluster_id)
2195            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2196            .collect()
2197    }
2198
2199    pub fn get_catalog_content_version(&self) -> Option<&str> {
2200        self.settings
2201            .get(&SettingKey {
2202                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2203            })
2204            .map(|value| &*value.value)
2205    }
2206
2207    /// Commit the current operation within the transaction. This does not cause anything to be
2208    /// written durably, but signals to the current transaction that we are moving on to the next
2209    /// operation.
2210    ///
2211    /// Returns the updates of the committed operation.
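    ///
    /// A minimal sketch of the intended pattern (not compiled; `txn` and the
    /// in-memory `state` are hypothetical):
    ///
    /// ```ignore
    /// txn.remove_items(&ids)?;
    /// let updates = txn.get_and_commit_op_updates();
    /// state.apply_updates(updates)?; // keep the in-memory catalog in sync
    /// ```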
2212    #[must_use]
2213    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2214        let updates = self.get_op_updates();
2215        self.commit_op();
2216        updates
2217    }
2218
2219    fn get_op_updates(&self) -> Vec<StateUpdate> {
2220        fn get_collection_op_updates<'a, T>(
2221            table_txn: &'a TableTransaction<T::Key, T::Value>,
2222            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2223            op: Timestamp,
2224        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2225        where
2226            T::Key: Ord + Eq + Clone + Debug,
2227            T::Value: Ord + Clone + Debug,
2228            T: DurableType,
2229        {
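            // Scan this collection's pending updates and keep only those staged at
            // operation `op`, converting each into a `StateUpdateKind` via `kind_fn`.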
2230            table_txn
2231                .pending
2232                .iter()
2233                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2234                .filter_map(move |(k, v)| {
2235                    if v.ts == op {
2236                        let key = k.clone();
2237                        let value = v.value.clone();
2238                        let diff = v.diff.clone().try_into().expect("invalid diff");
2239                        let update = DurableType::from_key_value(key, value);
2240                        let kind = kind_fn(update);
2241                        Some((kind, diff))
2242                    } else {
2243                        None
2244                    }
2245                })
2246        }
2247
2248        fn get_large_collection_op_updates<'a, T>(
2249            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2250            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2251            op: Timestamp,
2252        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2253        where
2254            T::Key: Ord + Eq + Clone + Debug,
2255            T: DurableType<Value = ()>,
2256        {
2257            collection.iter().filter_map(move |(k, diff, ts)| {
2258                if *ts == op {
2259                    let key = k.clone();
2260                    let diff = diff.clone().try_into().expect("invalid diff");
2261                    let update = DurableType::from_key_value(key, ());
2262                    let kind = kind_fn(update);
2263                    Some((kind, diff))
2264                } else {
2265                    None
2266                }
2267            })
2268        }
2269
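        // Destructure `self` exhaustively so that adding a new collection field fails
        // to compile until the author decides how (or whether) it maps to a
        // `StateUpdate`.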
2270        let Transaction {
2271            durable_catalog: _,
2272            databases,
2273            schemas,
2274            items,
2275            comments,
2276            roles,
2277            role_auth,
2278            clusters,
2279            network_policies,
2280            cluster_replicas,
2281            introspection_sources,
2282            system_gid_mapping,
2283            system_configurations,
2284            default_privileges,
2285            source_references,
2286            system_privileges,
2287            audit_log_updates,
2288            storage_collection_metadata,
2289            unfinalized_shards,
2290            // Not representable as a `StateUpdate`.
2291            id_allocator: _,
2292            configs: _,
2293            settings: _,
2294            txn_wal_shard: _,
2295            upper,
2296            op_id: _,
2297        } = &self;
2298
2299        let updates = std::iter::empty()
2300            .chain(get_collection_op_updates(
2301                roles,
2302                StateUpdateKind::Role,
2303                self.op_id,
2304            ))
2305            .chain(get_collection_op_updates(
2306                role_auth,
2307                StateUpdateKind::RoleAuth,
2308                self.op_id,
2309            ))
2310            .chain(get_collection_op_updates(
2311                databases,
2312                StateUpdateKind::Database,
2313                self.op_id,
2314            ))
2315            .chain(get_collection_op_updates(
2316                schemas,
2317                StateUpdateKind::Schema,
2318                self.op_id,
2319            ))
2320            .chain(get_collection_op_updates(
2321                default_privileges,
2322                StateUpdateKind::DefaultPrivilege,
2323                self.op_id,
2324            ))
2325            .chain(get_collection_op_updates(
2326                system_privileges,
2327                StateUpdateKind::SystemPrivilege,
2328                self.op_id,
2329            ))
2330            .chain(get_collection_op_updates(
2331                system_configurations,
2332                StateUpdateKind::SystemConfiguration,
2333                self.op_id,
2334            ))
2335            .chain(get_collection_op_updates(
2336                clusters,
2337                StateUpdateKind::Cluster,
2338                self.op_id,
2339            ))
2340            .chain(get_collection_op_updates(
2341                network_policies,
2342                StateUpdateKind::NetworkPolicy,
2343                self.op_id,
2344            ))
2345            .chain(get_collection_op_updates(
2346                introspection_sources,
2347                StateUpdateKind::IntrospectionSourceIndex,
2348                self.op_id,
2349            ))
2350            .chain(get_collection_op_updates(
2351                cluster_replicas,
2352                StateUpdateKind::ClusterReplica,
2353                self.op_id,
2354            ))
2355            .chain(get_collection_op_updates(
2356                system_gid_mapping,
2357                StateUpdateKind::SystemObjectMapping,
2358                self.op_id,
2359            ))
2360            .chain(get_collection_op_updates(
2361                items,
2362                StateUpdateKind::Item,
2363                self.op_id,
2364            ))
2365            .chain(get_collection_op_updates(
2366                comments,
2367                StateUpdateKind::Comment,
2368                self.op_id,
2369            ))
2370            .chain(get_collection_op_updates(
2371                source_references,
2372                StateUpdateKind::SourceReferences,
2373                self.op_id,
2374            ))
2375            .chain(get_collection_op_updates(
2376                storage_collection_metadata,
2377                StateUpdateKind::StorageCollectionMetadata,
2378                self.op_id,
2379            ))
2380            .chain(get_collection_op_updates(
2381                unfinalized_shards,
2382                StateUpdateKind::UnfinalizedShard,
2383                self.op_id,
2384            ))
2385            .chain(get_large_collection_op_updates(
2386                audit_log_updates,
2387                StateUpdateKind::AuditLog,
2388                self.op_id,
2389            ))
2390            .map(|(kind, diff)| StateUpdate {
2391                kind,
2392                ts: upper.clone(),
2393                diff,
2394            })
2395            .collect();
2396
2397        updates
2398    }
2399
2400    pub fn is_savepoint(&self) -> bool {
2401        self.durable_catalog.is_savepoint()
2402    }
2403
2404    fn commit_op(&mut self) {
2405        self.op_id += 1;
2406    }
2407
2408    pub fn op_id(&self) -> Timestamp {
2409        self.op_id
2410    }
2411
2412    pub fn upper(&self) -> mz_repr::Timestamp {
2413        self.upper
2414    }
2415
2416    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2417        let audit_log_updates = self
2418            .audit_log_updates
2419            .into_iter()
2420            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2421            .collect();
2422
2423        let txn_batch = TransactionBatch {
2424            databases: self.databases.pending(),
2425            schemas: self.schemas.pending(),
2426            items: self.items.pending(),
2427            comments: self.comments.pending(),
2428            roles: self.roles.pending(),
2429            role_auth: self.role_auth.pending(),
2430            clusters: self.clusters.pending(),
2431            cluster_replicas: self.cluster_replicas.pending(),
2432            network_policies: self.network_policies.pending(),
2433            introspection_sources: self.introspection_sources.pending(),
2434            id_allocator: self.id_allocator.pending(),
2435            configs: self.configs.pending(),
2436            source_references: self.source_references.pending(),
2437            settings: self.settings.pending(),
2438            system_gid_mapping: self.system_gid_mapping.pending(),
2439            system_configurations: self.system_configurations.pending(),
2440            default_privileges: self.default_privileges.pending(),
2441            system_privileges: self.system_privileges.pending(),
2442            storage_collection_metadata: self.storage_collection_metadata.pending(),
2443            unfinalized_shards: self.unfinalized_shards.pending(),
2444            txn_wal_shard: self.txn_wal_shard.pending(),
2445            audit_log_updates,
2446            upper: self.upper,
2447        };
2448        (txn_batch, self.durable_catalog)
2449    }
2450
2451    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2452    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2453    /// before proceeding. In general, this must be fatal to the calling process. We do not
2454    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2455    ///
2456    /// The transaction is committed at `commit_ts`.
2457    ///
2458    /// Returns what the upper was directly after the transaction committed.
2459    ///
2460    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2461    /// catalog is not writeable.
2462    #[mz_ore::instrument(level = "debug")]
2463    pub(crate) async fn commit_internal(
2464        self,
2465        commit_ts: mz_repr::Timestamp,
2466    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2467        let (mut txn_batch, durable_catalog) = self.into_parts();
2468        let TransactionBatch {
2469            databases,
2470            schemas,
2471            items,
2472            comments,
2473            roles,
2474            role_auth,
2475            clusters,
2476            cluster_replicas,
2477            network_policies,
2478            introspection_sources,
2479            id_allocator,
2480            configs,
2481            source_references,
2482            settings,
2483            system_gid_mapping,
2484            system_configurations,
2485            default_privileges,
2486            system_privileges,
2487            storage_collection_metadata,
2488            unfinalized_shards,
2489            txn_wal_shard,
2490            audit_log_updates,
2491            upper,
2492        } = &mut txn_batch;
2493        // Consolidate in memory because it will likely be faster than consolidating after the
2494        // transaction has been made durable.
2495        differential_dataflow::consolidation::consolidate_updates(databases);
2496        differential_dataflow::consolidation::consolidate_updates(schemas);
2497        differential_dataflow::consolidation::consolidate_updates(items);
2498        differential_dataflow::consolidation::consolidate_updates(comments);
2499        differential_dataflow::consolidation::consolidate_updates(roles);
2500        differential_dataflow::consolidation::consolidate_updates(role_auth);
2501        differential_dataflow::consolidation::consolidate_updates(clusters);
2502        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2503        differential_dataflow::consolidation::consolidate_updates(network_policies);
2504        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2505        differential_dataflow::consolidation::consolidate_updates(id_allocator);
2506        differential_dataflow::consolidation::consolidate_updates(configs);
2507        differential_dataflow::consolidation::consolidate_updates(settings);
2508        differential_dataflow::consolidation::consolidate_updates(source_references);
2509        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2510        differential_dataflow::consolidation::consolidate_updates(system_configurations);
2511        differential_dataflow::consolidation::consolidate_updates(default_privileges);
2512        differential_dataflow::consolidation::consolidate_updates(system_privileges);
2513        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2514        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2515        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2516        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2517
2518        assert!(
2519            commit_ts >= *upper,
2520            "expected commit ts, {}, to be greater than or equal to upper, {}",
2521            commit_ts,
2522            upper
2523        );
2524        let upper = durable_catalog
2525            .commit_transaction(txn_batch, commit_ts)
2526            .await?;
2527        Ok((durable_catalog, upper))
2528    }
2529
2530    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2531    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2532    /// before proceeding. In general, this must be fatal to the calling process. We do not
2533    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2534    ///
2535    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2536    /// catalog is not writeable.
2537    ///
2538    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
2539    /// updates from this transaction. Therefore, updates from this transaction will not be returned
2540    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
2541    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
2542    ///
2543    /// An alternative implementation would be for the caller to explicitly consume their updates
2544    /// after committing and only then apply the updates in-memory. While this removes assumptions
2545    /// about the caller in this method, in practice it results in duplicate work on every commit.
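    ///
    /// A minimal sketch (not compiled; the surrounding pieces are hypothetical):
    ///
    /// ```ignore
    /// let updates = txn.get_and_commit_op_updates();
    /// state.apply_updates(updates)?; // apply *before* committing (see above)
    /// txn.commit(commit_ts).await?;
    /// ```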
2546    #[mz_ore::instrument(level = "debug")]
2547    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2548        let op_updates = self.get_op_updates();
2549        assert!(
2550            op_updates.is_empty(),
2551            "unconsumed transaction updates: {op_updates:?}"
2552        );
2553
2554        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2555        // Drain all the updates from the commit since it is assumed that they were already applied.
2556        let updates = durable_storage.sync_updates(upper).await?;
2557        // Writable and savepoint catalogs should have consumed all updates before committing a
2558        // transaction, otherwise the commit was performed with an out of date state.
2559        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
2560        // updates before committing.
2561        soft_assert_no_log!(
2562            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2563            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2564        );
2565        Ok(())
2566    }
2567}
2568
2569use crate::durable::async_trait;
2570
2571use super::objects::{RoleAuthKey, RoleAuthValue};
2572
2573#[async_trait]
2574impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2575    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2576        self.storage_collection_metadata
2577            .items()
2578            .into_iter()
2579            .map(
2580                |(
2581                    StorageCollectionMetadataKey { id },
2582                    StorageCollectionMetadataValue { shard },
2583                )| { (*id, shard.clone()) },
2584            )
2585            .collect()
2586    }
2587
2588    fn insert_collection_metadata(
2589        &mut self,
2590        metadata: BTreeMap<GlobalId, ShardId>,
2591    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2592        for (id, shard) in metadata {
2593            self.storage_collection_metadata
2594                .insert(
2595                    StorageCollectionMetadataKey { id },
2596                    StorageCollectionMetadataValue {
2597                        shard: shard.clone(),
2598                    },
2599                    self.op_id,
2600                )
2601                .map_err(|err| match err {
2602                    DurableCatalogError::DuplicateKey => {
2603                        StorageError::CollectionMetadataAlreadyExists(id)
2604                    }
2605                    DurableCatalogError::UniquenessViolation => {
2606                        StorageError::PersistShardAlreadyInUse(shard)
2607                    }
2608                    err => StorageError::Generic(anyhow::anyhow!(err)),
2609                })?;
2610        }
2611        Ok(())
2612    }
2613
2614    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2615        let ks: Vec<_> = ids
2616            .into_iter()
2617            .map(|id| StorageCollectionMetadataKey { id })
2618            .collect();
2619        self.storage_collection_metadata
2620            .delete_by_keys(ks, self.op_id)
2621            .into_iter()
2622            .map(
2623                |(
2624                    StorageCollectionMetadataKey { id },
2625                    StorageCollectionMetadataValue { shard },
2626                )| (id, shard),
2627            )
2628            .collect()
2629    }
2630
2631    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2632        self.unfinalized_shards
2633            .items()
2634            .into_iter()
2635            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2636            .collect()
2637    }
2638
2639    fn insert_unfinalized_shards(
2640        &mut self,
2641        s: BTreeSet<ShardId>,
2642    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2643        for shard in s {
2644            match self
2645                .unfinalized_shards
2646                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2647            {
2648                // Inserting duplicate keys has no effect.
2649                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2650                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2651            };
2652        }
2653        Ok(())
2654    }
2655
2656    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2657        let ks: Vec<_> = shards
2658            .into_iter()
2659            .map(|shard| UnfinalizedShardKey { shard })
2660            .collect();
2661        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2662    }
2663
2664    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2665        self.txn_wal_shard
2666            .values()
2667            .iter()
2668            .next()
2669            .map(|TxnWalShardValue { shard }| *shard)
2670    }
2671
2672    fn write_txn_wal_shard(
2673        &mut self,
2674        shard: ShardId,
2675    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2676        self.txn_wal_shard
2677            .insert((), TxnWalShardValue { shard }, self.op_id)
2678            .map_err(|err| match err {
2679                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2680                err => StorageError::Generic(anyhow::anyhow!(err)),
2681            })
2682    }
2683}
2684
2685/// Describes a set of changes to apply as the result of a catalog transaction.
2686#[derive(Debug, Clone, Default, PartialEq)]
2687pub struct TransactionBatch {
2688    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
2689    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
2690    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
2691    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
2692    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
2693    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
2694    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
2695    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
2696    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
2697    pub(crate) introspection_sources: Vec<(
2698        proto::ClusterIntrospectionSourceIndexKey,
2699        proto::ClusterIntrospectionSourceIndexValue,
2700        Diff,
2701    )>,
2702    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
2703    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
2704    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
2705    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
2706    pub(crate) system_configurations: Vec<(
2707        proto::ServerConfigurationKey,
2708        proto::ServerConfigurationValue,
2709        Diff,
2710    )>,
2711    pub(crate) default_privileges: Vec<(
2712        proto::DefaultPrivilegesKey,
2713        proto::DefaultPrivilegesValue,
2714        Diff,
2715    )>,
2716    pub(crate) source_references: Vec<(
2717        proto::SourceReferencesKey,
2718        proto::SourceReferencesValue,
2719        Diff,
2720    )>,
2721    pub(crate) system_privileges: Vec<(
2722        proto::SystemPrivilegesKey,
2723        proto::SystemPrivilegesValue,
2724        Diff,
2725    )>,
2726    pub(crate) storage_collection_metadata: Vec<(
2727        proto::StorageCollectionMetadataKey,
2728        proto::StorageCollectionMetadataValue,
2729        Diff,
2730    )>,
2731    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
2732    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
2733    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
2734    /// The upper of the catalog when the transaction started.
2735    pub(crate) upper: mz_repr::Timestamp,
2736}
2737
2738impl TransactionBatch {
2739    pub fn is_empty(&self) -> bool {
2740        let TransactionBatch {
2741            databases,
2742            schemas,
2743            items,
2744            comments,
2745            roles,
2746            role_auth,
2747            clusters,
2748            cluster_replicas,
2749            network_policies,
2750            introspection_sources,
2751            id_allocator,
2752            configs,
2753            settings,
2754            source_references,
2755            system_gid_mapping,
2756            system_configurations,
2757            default_privileges,
2758            system_privileges,
2759            storage_collection_metadata,
2760            unfinalized_shards,
2761            txn_wal_shard,
2762            audit_log_updates,
2763            upper: _,
2764        } = self;
2765        databases.is_empty()
2766            && schemas.is_empty()
2767            && items.is_empty()
2768            && comments.is_empty()
2769            && roles.is_empty()
2770            && role_auth.is_empty()
2771            && clusters.is_empty()
2772            && cluster_replicas.is_empty()
2773            && network_policies.is_empty()
2774            && introspection_sources.is_empty()
2775            && id_allocator.is_empty()
2776            && configs.is_empty()
2777            && settings.is_empty()
2778            && source_references.is_empty()
2779            && system_gid_mapping.is_empty()
2780            && system_configurations.is_empty()
2781            && default_privileges.is_empty()
2782            && system_privileges.is_empty()
2783            && storage_collection_metadata.is_empty()
2784            && unfinalized_shards.is_empty()
2785            && txn_wal_shard.is_empty()
2786            && audit_log_updates.is_empty()
2787    }
2788}
2789
2790#[derive(Debug, Clone, PartialEq, Eq)]
2791struct TransactionUpdate<V> {
2792    value: V,
2793    ts: Timestamp,
2794    diff: Diff,
2795}
2796
2797/// Utility trait to check for plan validity.
2798trait UniqueName {
2799    /// Does the item have a unique name? If yes, we can check for name equality in validity
2800    /// checking.
2801    const HAS_UNIQUE_NAME: bool;
2802    /// The unique name; only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`.
2803    fn unique_name(&self) -> &str;
2804}
2805
2806mod unique_name {
2807    use crate::durable::objects::*;
2808
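    // Values with a `name` field opt in to the fast uniqueness path in
    // `TableTransaction::verify`; everything else falls back to the quadratic
    // pairwise check.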
2809    macro_rules! impl_unique_name {
2810        ($($t:ty),* $(,)?) => {
2811            $(
2812                impl crate::durable::transaction::UniqueName for $t {
2813                    const HAS_UNIQUE_NAME: bool = true;
2814                    fn unique_name(&self) -> &str {
2815                        &self.name
2816                    }
2817                }
2818            )*
2819        };
2820    }
2821
2822    macro_rules! impl_no_unique_name {
2823        ($($t:ty),* $(,)?) => {
2824            $(
2825                impl crate::durable::transaction::UniqueName for $t {
2826                    const HAS_UNIQUE_NAME: bool = false;
2827                    fn unique_name(&self) -> &str {
2828                       ""
2829                    }
2830                }
2831            )*
2832        };
2833    }
2834
2835    impl_unique_name! {
2836        ClusterReplicaValue,
2837        ClusterValue,
2838        DatabaseValue,
2839        ItemValue,
2840        NetworkPolicyValue,
2841        RoleValue,
2842        SchemaValue,
2843    }
2844
2845    impl_no_unique_name!(
2846        (),
2847        ClusterIntrospectionSourceIndexValue,
2848        CommentValue,
2849        ConfigValue,
2850        DefaultPrivilegesValue,
2851        GidMappingValue,
2852        IdAllocValue,
2853        ServerConfigurationValue,
2854        SettingValue,
2855        SourceReferencesValue,
2856        StorageCollectionMetadataValue,
2857        SystemPrivilegesValue,
2858        TxnWalShardValue,
2859        RoleAuthValue,
2860    );
2861
2862    #[cfg(test)]
2863    mod test {
2864        impl_no_unique_name!(String,);
2865    }
2866}
2867
2868/// TableTransaction emulates some features of a typical SQL transaction over a
2869/// table for a Collection.
2870///
2871/// It supports:
2872/// - uniqueness constraints
2873/// - transactional reads and writes (including read-your-writes before commit)
2874///
2875/// `K` is the primary key type. Multiple entries with the same key are disallowed.
2876/// `V` is an arbitrary value type.
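///
/// A minimal usage sketch (`String` values implement [`UniqueName`] only under
/// `#[cfg(test)]`, so this is illustrative rather than a compiling doctest):
///
/// ```ignore
/// let mut table = TableTransaction::new_with_uniqueness_fn(
///     BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
///     |a: &String, b: &String| a == b,
/// )?;
/// // Read-your-writes: the insert is visible before commit.
/// table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0)?;
/// assert_eq!(table.len(), 2);
/// // Uniqueness constraint: a duplicate value is rejected.
/// assert!(table.insert(3i64.to_le_bytes().to_vec(), "a".to_string(), 0).is_err());
/// ```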
2877#[derive(Debug, PartialEq, Eq)]
2878struct TableTransaction<K, V> {
2879    initial: BTreeMap<K, V>,
2880    // The desired updates to keys after commit.
2881    // Invariant: each key's `Vec` of updates is sorted by `ts`.
2882    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
2883    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
2884}
2885
2886impl<K, V> TableTransaction<K, V>
2887where
2888    K: Ord + Eq + Clone + Debug,
2889    V: Ord + Clone + Debug + UniqueName,
2890{
2891    /// Create a new TableTransaction with initial data.
2892    ///
2893    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
2894    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
2895    /// the protobuf types that deserialize into the `K` and `V` that a [`TableTransaction`] is generic
2896    /// over.
2897    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2898    where
2899        K: RustType<KP>,
2900        V: RustType<VP>,
2901    {
2902        let initial = initial
2903            .into_iter()
2904            .map(RustType::from_proto)
2905            .collect::<Result<_, _>>()?;
2906
2907        Ok(Self {
2908            initial,
2909            pending: BTreeMap::new(),
2910            uniqueness_violation: None,
2911        })
2912    }
2913
2914    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
2915    /// that determines whether there is a uniqueness violation between two values.
2916    fn new_with_uniqueness_fn<KP, VP>(
2917        initial: BTreeMap<KP, VP>,
2918        uniqueness_violation: fn(a: &V, b: &V) -> bool,
2919    ) -> Result<Self, TryFromProtoError>
2920    where
2921        K: RustType<KP>,
2922        V: RustType<VP>,
2923    {
2924        let initial = initial
2925            .into_iter()
2926            .map(RustType::from_proto)
2927            .collect::<Result<_, _>>()?;
2928
2929        Ok(Self {
2930            initial,
2931            pending: BTreeMap::new(),
2932            uniqueness_violation: Some(uniqueness_violation),
2933        })
2934    }
2935
2936    /// Consumes and returns the pending changes and their diffs. `Diff` is
2937    /// guaranteed to be 1 or -1.
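    ///
    /// Offsetting updates consolidate away entirely. A sketch (given an existing
    /// `table` whose key `k` currently maps to `v`; `String` values implement
    /// [`UniqueName`] only under `#[cfg(test)]`, so this is illustrative rather
    /// than a compiling doctest):
    ///
    /// ```ignore
    /// // Re-writing a key with the value it already holds stages a -1 and a +1
    /// // for the same value, which consolidate to nothing.
    /// table.set(k.clone(), Some(v.clone()), 0)?;
    /// assert!(table.pending::<Vec<u8>, String>().is_empty());
    /// ```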
2938    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2939    where
2940        K: RustType<KP>,
2941        V: RustType<VP>,
2942    {
2943        soft_assert_no_log!(self.verify().is_ok());
2944        // Pending describes the desired final state for some keys. K,V pairs should be
2945        // retracted if they already exist and were deleted or are being updated.
2946        self.pending
2947            .into_iter()
2948            .flat_map(|(k, v)| {
2949                let mut v: Vec<_> = v
2950                    .into_iter()
2951                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
2952                    .collect();
2953                differential_dataflow::consolidation::consolidate(&mut v);
2954                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
2955            })
2956            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
2957            .collect()
2958    }
2959
2960    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
2961    ///
2962    /// Runtime is O(n^2), where n is the number of items in `self`, if
2963    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
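    ///
    /// When `V` has a unique name, values are first indexed by name, so each
    /// value is compared against at most one candidate instead of every other
    /// value.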
2964    fn verify(&self) -> Result<(), DurableCatalogError> {
2965        if let Some(uniqueness_violation) = self.uniqueness_violation {
2966            // Compare each value to each other value and ensure they are unique.
2967            let items = self.values();
2968            if V::HAS_UNIQUE_NAME {
2969                let by_name: BTreeMap<_, _> = items
2970                    .iter()
2971                    .enumerate()
2972                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
2973                    .collect();
2974                for (i, vi) in items.iter().enumerate() {
2975                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
2976                        if i != *j && uniqueness_violation(vi, *vj) {
2977                            return Err(DurableCatalogError::UniquenessViolation);
2978                        }
2979                    }
2980                }
2981            } else {
2982                for (i, vi) in items.iter().enumerate() {
2983                    for (j, vj) in items.iter().enumerate() {
2984                        if i != j && uniqueness_violation(vi, vj) {
2985                            return Err(DurableCatalogError::UniquenessViolation);
2986                        }
2987                    }
2988                }
2989            }
2990        }
2991        soft_assert_no_log!(
2992            self.pending
2993                .values()
2994                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
2995            "pending should be sorted by timestamp: {:?}",
2996            self.pending
2997        );
2998        Ok(())
2999    }
3000
3001    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
3002    ///
3003    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3004    /// items in `keys`.
3005    fn verify_keys<'a>(
3006        &self,
3007        keys: impl IntoIterator<Item = &'a K>,
3008    ) -> Result<(), DurableCatalogError>
3009    where
3010        K: 'a,
3011    {
3012        if let Some(uniqueness_violation) = self.uniqueness_violation {
3013            let entries: Vec<_> = keys
3014                .into_iter()
3015                .filter_map(|key| self.get(key).map(|value| (key, value)))
3016                .collect();
3017            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3018            for (ki, vi) in self.items() {
3019                for (kj, vj) in &entries {
3020                    if ki != *kj && uniqueness_violation(vi, vj) {
3021                        return Err(DurableCatalogError::UniquenessViolation);
3022                    }
3023                }
3024            }
3025        }
3026        soft_assert_no_log!(self.verify().is_ok());
3027        Ok(())
3028    }
3029
3030    /// Iterates over the items viewable in the current transaction in arbitrary
3031    /// order and applies `f` on all key, value pairs.
3032    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3033        let mut seen = BTreeSet::new();
3034        for k in self.pending.keys() {
3035            seen.insert(k);
3036            let v = self.get(k);
3037            // Deleted items don't exist, so they shouldn't be visited; marking the
3038            // key as seen still suppresses visiting its initial value below.
3039            if let Some(v) = v {
3040                f(k, v);
3041            }
3042        }
3043        for (k, v) in self.initial.iter() {
3044            // Add on initial items that don't have updates.
3045            if !seen.contains(k) {
3046                f(k, v);
3047            }
3048        }
3049    }
3050
3051    /// Returns the current value of `k`.
3052    fn get(&self, k: &K) -> Option<&V> {
3053        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3054        let mut updates = Vec::with_capacity(pending.len() + 1);
3055        if let Some(initial) = self.initial.get(k) {
3056            updates.push((initial, Diff::ONE));
3057        }
3058        updates.extend(
3059            pending
3060                .iter()
3061                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3062        );
3063
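        // Consolidation sums the diffs for each distinct value; because pending
        // updates always retract the old value before adding a new one, at most
        // one value can survive with a positive diff.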
3064        differential_dataflow::consolidation::consolidate(&mut updates);
3065        assert!(updates.len() <= 1);
3066        updates.into_iter().next().map(|(v, _)| v)
3067    }
3068
3069    /// Returns the items viewable in the current transaction. The items are
3070    /// cloned, so this is an expensive operation. Prefer using [`Self::items`] or
3071    /// [`Self::for_values`].
3072    // Used by tests.
3073    #[cfg(test)]
3074    fn items_cloned(&self) -> BTreeMap<K, V> {
3075        let mut items = BTreeMap::new();
3076        self.for_values(|k, v| {
3077            items.insert(k.clone(), v.clone());
3078        });
3079        items
3080    }
3081
3082    /// Returns the items viewable in the current transaction, as a map of key
3083    /// and value references.
3084    fn items(&self) -> BTreeMap<&K, &V> {
3085        let mut items = BTreeMap::new();
3086        self.for_values(|k, v| {
3087            items.insert(k, v);
3088        });
3089        items
3090    }
3091
3092    /// Returns the values viewable in the current transaction as references.
3093    fn values(&self) -> BTreeSet<&V> {
3094        let mut items = BTreeSet::new();
3095        self.for_values(|_, v| {
3096            items.insert(v);
3097        });
3098        items
3099    }
3100
3101    /// Returns the number of items viewable in the current transaction.
3102    fn len(&self) -> usize {
3103        let mut count = 0;
3104        self.for_values(|_, _| {
3105            count += 1;
3106        });
3107        count
3108    }
3109
3110    /// Iterates over the items viewable in the current transaction, and provides a
3111    /// map where additional pending items can be inserted, which will be appended
3112    /// to the current pending items. Does not verify uniqueness.
3113    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3114        &mut self,
3115        mut f: F,
3116    ) {
3117        let mut pending = BTreeMap::new();
3118        self.for_values(|k, v| f(&mut pending, k, v));
3119        for (k, updates) in pending {
3120            self.pending.entry(k).or_default().extend(updates);
3121        }
3122    }
3123
3124    /// Inserts a new k,v pair.
3125    ///
3126    /// Returns an error if the uniqueness check failed or the key already exists.
3127    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3128        let mut violation = None;
3129        self.for_values(|for_k, for_v| {
3130            if &k == for_k {
3131                violation = Some(DurableCatalogError::DuplicateKey);
3132            }
3133            if let Some(uniqueness_violation) = self.uniqueness_violation {
3134                if uniqueness_violation(for_v, &v) {
3135                    violation = Some(DurableCatalogError::UniquenessViolation);
3136                }
3137            }
3138        });
3139        if let Some(violation) = violation {
3140            return Err(violation);
3141        }
3142        self.pending.entry(k).or_default().push(TransactionUpdate {
3143            value: v,
3144            ts,
3145            diff: Diff::ONE,
3146        });
3147        soft_assert_no_log!(self.verify().is_ok());
3148        Ok(())
3149    }
3150
3151    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
3152    /// value should be updated, otherwise `None`. Returns the number of changed
3153    /// entries.
3154    ///
3155    /// Returns an error if the uniqueness check failed.
3156    ///
3157    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
3158    /// better performance.
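    ///
    /// A sketch (test-only `String` values; illustrative rather than a
    /// compiling doctest):
    ///
    /// ```ignore
    /// // Replace every "v1" with "v2". Each match stages a retraction of the
    /// // old value and an addition of the new one; the whole call rolls back
    /// // on a uniqueness violation.
    /// let changed = table.update(|_k, v| (v == "v1").then(|| "v2".to_string()), 0)?;
    /// ```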
3159    fn update<F: Fn(&K, &V) -> Option<V>>(
3160        &mut self,
3161        f: F,
3162        ts: Timestamp,
3163    ) -> Result<Diff, DurableCatalogError> {
3164        let mut changed = Diff::ZERO;
3165        let mut keys = BTreeSet::new();
3166        // Keep a copy of pending in case of uniqueness violation.
3167        let pending = self.pending.clone();
3168        self.for_values_mut(|p, k, v| {
3169            if let Some(next) = f(k, v) {
3170                changed += Diff::ONE;
3171                keys.insert(k.clone());
3172                let updates = p.entry(k.clone()).or_default();
3173                updates.push(TransactionUpdate {
3174                    value: v.clone(),
3175                    ts,
3176                    diff: Diff::MINUS_ONE,
3177                });
3178                updates.push(TransactionUpdate {
3179                    value: next,
3180                    ts,
3181                    diff: Diff::ONE,
3182                });
3183            }
3184        });
3185        // Check for uniqueness violation.
3186        if let Err(err) = self.verify_keys(&keys) {
3187            self.pending = pending;
3188            Err(err)
3189        } else {
3190            Ok(changed)
3191        }
3192    }
3193
3194    /// Updates `k`, `v` pair if `k` already exists in `self`.
3195    ///
3196    /// Returns `true` if `k` was updated, `false` otherwise.
3197    /// Returns an error if the uniqueness check failed.
3198    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3199        if let Some(cur_v) = self.get(&k) {
3200            if v != *cur_v {
3201                self.set(k, Some(v), ts)?;
3202            }
3203            Ok(true)
3204        } else {
3205            Ok(false)
3206        }
3207    }
3208
3209    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
3210    ///
3211    /// Returns the number of changed entries.
3212    /// Returns an error if the uniqueness check failed.
3213    fn update_by_keys(
3214        &mut self,
3215        kvs: impl IntoIterator<Item = (K, V)>,
3216        ts: Timestamp,
3217    ) -> Result<Diff, DurableCatalogError> {
3218        let kvs: Vec<_> = kvs
3219            .into_iter()
3220            .filter_map(|(k, v)| match self.get(&k) {
3221                // Record if updating this entry would be a no-op.
3222                Some(cur_v) => Some((*cur_v == v, k, v)),
3223                None => None,
3224            })
3225            .collect();
3226        let changed = kvs.len();
3227        let changed =
3228            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3229        let kvs = kvs
3230            .into_iter()
3231            // Filter out no-ops to save some work.
3232            .filter(|(no_op, _, _)| !no_op)
3233            .map(|(_, k, v)| (k, Some(v)))
3234            .collect();
3235        self.set_many(kvs, ts)?;
3236        Ok(changed)
3237    }
3238
3239    /// Set the value for a key. Returns the previous entry if the key existed,
3240    /// otherwise None.
3241    ///
3242    /// Returns an error if the uniqueness check failed.
3243    ///
3244    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
3245    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3246        let prev = self.get(&k).cloned();
3247        let entry = self.pending.entry(k.clone()).or_default();
3248        let restore_len = entry.len();
3249
3250        match (v, prev.clone()) {
3251            (Some(v), Some(prev)) => {
3252                entry.push(TransactionUpdate {
3253                    value: prev,
3254                    ts,
3255                    diff: Diff::MINUS_ONE,
3256                });
3257                entry.push(TransactionUpdate {
3258                    value: v,
3259                    ts,
3260                    diff: Diff::ONE,
3261                });
3262            }
3263            (Some(v), None) => {
3264                entry.push(TransactionUpdate {
3265                    value: v,
3266                    ts,
3267                    diff: Diff::ONE,
3268                });
3269            }
3270            (None, Some(prev)) => {
3271                entry.push(TransactionUpdate {
3272                    value: prev,
3273                    ts,
3274                    diff: Diff::MINUS_ONE,
3275                });
3276            }
3277            (None, None) => {}
3278        }
3279
3280        // Check for uniqueness violation.
3281        if let Err(err) = self.verify_keys([&k]) {
3282            // Revert self.pending to the state it was in before calling this
3283            // function.
3284            let pending = self.pending.get_mut(&k).expect("inserted above");
3285            pending.truncate(restore_len);
3286            Err(err)
3287        } else {
3288            Ok(prev)
3289        }
3290    }
3291
3292    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
3293    /// otherwise None.
3294    ///
3295    /// Returns an error if any uniqueness check failed.
3296    fn set_many(
3297        &mut self,
3298        kvs: BTreeMap<K, Option<V>>,
3299        ts: Timestamp,
3300    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3301        if kvs.is_empty() {
3302            return Ok(BTreeMap::new());
3303        }
3304
3305        let mut prevs = BTreeMap::new();
3306        let mut restores = BTreeMap::new();
3307
3308        for (k, v) in kvs {
3309            let prev = self.get(&k).cloned();
3310            let entry = self.pending.entry(k.clone()).or_default();
3311            restores.insert(k.clone(), entry.len());
3312
3313            match (v, prev.clone()) {
3314                (Some(v), Some(prev)) => {
3315                    entry.push(TransactionUpdate {
3316                        value: prev,
3317                        ts,
3318                        diff: Diff::MINUS_ONE,
3319                    });
3320                    entry.push(TransactionUpdate {
3321                        value: v,
3322                        ts,
3323                        diff: Diff::ONE,
3324                    });
3325                }
3326                (Some(v), None) => {
3327                    entry.push(TransactionUpdate {
3328                        value: v,
3329                        ts,
3330                        diff: Diff::ONE,
3331                    });
3332                }
3333                (None, Some(prev)) => {
3334                    entry.push(TransactionUpdate {
3335                        value: prev,
3336                        ts,
3337                        diff: Diff::MINUS_ONE,
3338                    });
3339                }
3340                (None, None) => {}
3341            }
3342
3343            prevs.insert(k, prev);
3344        }
3345
3346        // Check for uniqueness violation.
3347        if let Err(err) = self.verify_keys(prevs.keys()) {
3348            for (k, restore_len) in restores {
3349                // Revert self.pending to the state it was in before calling this
3350                // function.
3351                let pending = self.pending.get_mut(&k).expect("inserted above");
3352                pending.truncate(restore_len);
3353            }
3354            Err(err)
3355        } else {
3356            Ok(prevs)
3357        }
3358    }
3359
3360    /// Deletes items for which `f` returns true. Returns the keys and values of
3361    /// the deleted entries.
3362    ///
3363    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3364    /// better performance.
3365    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3366        let mut deleted = Vec::new();
3367        self.for_values_mut(|p, k, v| {
3368            if f(k, v) {
3369                deleted.push((k.clone(), v.clone()));
3370                p.entry(k.clone()).or_default().push(TransactionUpdate {
3371                    value: v.clone(),
3372                    ts,
3373                    diff: Diff::MINUS_ONE,
3374                });
3375            }
3376        });
3377        soft_assert_no_log!(self.verify().is_ok());
3378        deleted
3379    }
3380
3381    /// Deletes item with key `k`.
3382    ///
3383    /// Returns the value of the deleted entry, if it existed.
3384    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3385        self.set(k, None, ts)
3386            .expect("deleting an entry cannot violate uniqueness")
3387    }
3388
3389    /// Deletes items with key in `ks`.
3390    ///
3391    /// Returns the keys and values of the deleted entries.
3392    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3393        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3394        let prevs = self
3395            .set_many(kvs, ts)
3396            .expect("deleting entries cannot violate uniqueness");
3397        prevs
3398            .into_iter()
3399            .filter_map(|(k, v)| v.map(|v| (k, v)))
3400            .collect()
3401    }
3402}
3403
3404#[cfg(test)]
3405#[allow(clippy::unwrap_used)]
3406mod tests {
3407    use super::*;
3408    use mz_ore::{assert_none, assert_ok};
3409
3410    use mz_ore::now::SYSTEM_TIME;
3411    use mz_persist_client::PersistClient;
3412
3413    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3414    use crate::memory;
3415
3416    #[mz_ore::test]
3417    fn test_table_transaction_simple() {
3418        fn uniqueness_violation(a: &String, b: &String) -> bool {
3419            a == b
3420        }
3421        let mut table = TableTransaction::new_with_uniqueness_fn(
3422            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3423            uniqueness_violation,
3424        )
3425        .unwrap();
3426
3427        // Ideally we would compare against the specific errors here, but it's
3428        // hard/impossible to implement PartialEq for DurableCatalogError.
3429        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3430        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3431        assert!(
3432            table
3433                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3434                .is_err()
3435        );
3436        assert!(
3437            table
3438                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3439                .is_err()
3440        );
3441    }
3442
3443    #[mz_ore::test]
3444    fn test_table_transaction() {
3445        fn uniqueness_violation(a: &String, b: &String) -> bool {
3446            a == b
3447        }
3448        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3449
3450        fn commit(
3451            table: &mut BTreeMap<Vec<u8>, String>,
3452            mut pending: Vec<(Vec<u8>, String, Diff)>,
3453        ) {
3454            // Sort by diff so that we process retractions first.
3455            pending.sort_by(|a, b| a.2.cmp(&b.2));
3456            for (k, v, diff) in pending {
3457                if diff == Diff::MINUS_ONE {
3458                    let prev = table.remove(&k);
3459                    assert_eq!(prev, Some(v));
3460                } else if diff == Diff::ONE {
3461                    let prev = table.insert(k, v);
3462                    assert_eq!(prev, None);
3463                } else {
3464                    panic!("unexpected diff: {diff}");
3465                }
3466            }
3467        }
3468
3469        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3470        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3471        let mut table_txn =
3472            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3473        assert_eq!(table_txn.items_cloned(), table);
3474        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3475        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3476        assert_eq!(
3477            table_txn.items_cloned(),
3478            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3479        );
3480        assert_eq!(
3481            table_txn
3482                .update(|_k, _v| Some("v3".to_string()), 2)
3483                .unwrap(),
3484            Diff::ONE
3485        );
3486
3487        // Uniqueness violation.
3488        table_txn
3489            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3490            .unwrap_err();
3491
3492        table_txn
3493            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3494            .unwrap();
3495        assert_eq!(
3496            table_txn.items_cloned(),
3497            BTreeMap::from([
3498                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3499                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3500            ])
3501        );
3502        let err = table_txn
3503            .update(|_k, _v| Some("v1".to_string()), 5)
3504            .unwrap_err();
3505        assert!(
3506            matches!(err, DurableCatalogError::UniquenessViolation),
3507            "unexpected err: {err:?}"
3508        );
3509        let pending = table_txn.pending();
3510        assert_eq!(
3511            pending,
3512            vec![
3513                (
3514                    1i64.to_le_bytes().to_vec(),
3515                    "v1".to_string(),
3516                    Diff::MINUS_ONE
3517                ),
3518                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3519                (
3520                    2i64.to_le_bytes().to_vec(),
3521                    "v2".to_string(),
3522                    Diff::MINUS_ONE
3523                ),
3524                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3525            ]
3526        );
3527        commit(&mut table, pending);
3528        assert_eq!(
3529            table,
3530            BTreeMap::from([
3531                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3532                (3i64.to_le_bytes().to_vec(), "v4".to_string())
3533            ])
3534        );
3535
3536        let mut table_txn =
3537            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3538        // Deleting then creating an item that has a uniqueness violation should work.
3539        assert_eq!(
3540            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3541            1
3542        );
3543        table_txn
3544            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3545            .unwrap();
3546        // Uniqueness violation in value.
3547        table_txn
3548            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3549            .unwrap_err();
3550        // Key already exists, expect error.
3551        table_txn
3552            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3553            .unwrap_err();
3554        assert_eq!(
3555            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3556            1
3557        );
3558        // Both inserts work now because the duplicate key and the uniqueness violation are gone.
3559        table_txn
3560            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3561            .unwrap();
3562        table_txn
3563            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3564            .unwrap();
3565        let pending = table_txn.pending();
3566        assert_eq!(
3567            pending,
3568            vec![
3569                (
3570                    1i64.to_le_bytes().to_vec(),
3571                    "v3".to_string(),
3572                    Diff::MINUS_ONE
3573                ),
3574                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3575                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3576            ]
3577        );
3578        commit(&mut table, pending);
3579        assert_eq!(
3580            table,
3581            BTreeMap::from([
3582                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3583                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3584                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3585            ])
3586        );
3587
3588        let mut table_txn =
3589            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3590        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3591        table_txn
3592            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3593            .unwrap();
3594
3595        commit(&mut table, table_txn.pending());
3596        assert_eq!(
3597            table,
3598            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3599        );
3600
3601        let mut table_txn =
3602            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3603        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3604        table_txn
3605            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3606            .unwrap();
3607        commit(&mut table, table_txn.pending());
3608        assert_eq!(
3609            table,
3610            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3611        );
3612
3613        // Verify we don't try to delete v3 or v4 during commit.
3614        let mut table_txn =
3615            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3616        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3617        table_txn
3618            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3619            .unwrap();
3620        table_txn
3621            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3622            .unwrap_err();
3623        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3624        table_txn
3625            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3626            .unwrap();
3627        commit(&mut table, table_txn.pending());
3628        assert_eq!(
3629            table.clone().into_iter().collect::<Vec<_>>(),
3630            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3631        );
3632
3633        // Test `set`.
3634        let mut table_txn =
3635            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3636        // Uniqueness violation.
3637        table_txn
3638            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3639            .unwrap_err();
3640        table_txn
3641            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3642            .unwrap();
3643        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3644        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3645        let pending = table_txn.pending();
3646        assert_eq!(
3647            pending,
3648            vec![
3649                (
3650                    1i64.to_le_bytes().to_vec(),
3651                    "v5".to_string(),
3652                    Diff::MINUS_ONE
3653                ),
3654                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3655            ]
3656        );
3657        commit(&mut table, pending);
3658        assert_eq!(
3659            table,
3660            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3661        );
3662
3663        // Duplicate `set`.
3664        let mut table_txn =
3665            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3666        table_txn
3667            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3668            .unwrap();
3669        let pending = table_txn.pending::<Vec<u8>, String>();
3670        assert!(pending.is_empty());
3671
3672        // Test `set_many`.
3673        let mut table_txn =
3674            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3675        // Uniqueness violation.
3676        table_txn
3677            .set_many(
3678                BTreeMap::from([
3679                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3680                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3681                ]),
3682                0,
3683            )
3684            .unwrap_err();
3685        table_txn
3686            .set_many(
3687                BTreeMap::from([
3688                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3689                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3690                ]),
3691                1,
3692            )
3693            .unwrap();
3694        table_txn
3695            .set_many(
3696                BTreeMap::from([
3697                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3698                    (3i64.to_le_bytes().to_vec(), None),
3699                ]),
3700                2,
3701            )
3702            .unwrap();
3703        let pending = table_txn.pending();
3704        assert_eq!(
3705            pending,
3706            vec![
3707                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3708                (
3709                    3i64.to_le_bytes().to_vec(),
3710                    "v6".to_string(),
3711                    Diff::MINUS_ONE
3712                ),
3713                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3714            ]
3715        );
3716        commit(&mut table, pending);
3717        assert_eq!(
3718            table,
3719            BTreeMap::from([
3720                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3721                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3722            ])
3723        );
3724
3725        // Duplicate `set_many`.
3726        let mut table_txn =
3727            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3728        table_txn
3729            .set_many(
3730                BTreeMap::from([
3731                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3732                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3733                ]),
3734                0,
3735            )
3736            .unwrap();
3737        let pending = table_txn.pending::<Vec<u8>, String>();
3738        assert!(pending.is_empty());
3739        commit(&mut table, pending);
3740        assert_eq!(
3741            table,
3742            BTreeMap::from([
3743                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3744                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3745            ])
3746        );
3747
3748        // Test `update_by_key`
3749        let mut table_txn =
3750            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3751        // Uniqueness violation.
3752        table_txn
3753            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3754            .unwrap_err();
3755        assert!(
3756            table_txn
3757                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3758                .unwrap()
3759        );
3760        assert!(
3761            !table_txn
3762                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3763                .unwrap()
3764        );
3765        let pending = table_txn.pending();
3766        assert_eq!(
3767            pending,
3768            vec![
3769                (
3770                    1i64.to_le_bytes().to_vec(),
3771                    "v6".to_string(),
3772                    Diff::MINUS_ONE
3773                ),
3774                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3775            ]
3776        );
3777        commit(&mut table, pending);
3778        assert_eq!(
3779            table,
3780            BTreeMap::from([
3781                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3782                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3783            ])
3784        );
3785
3786        // Duplicate `update_by_key`.
3787        let mut table_txn =
3788            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3789        assert!(
3790            table_txn
3791                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3792                .unwrap()
3793        );
3794        let pending = table_txn.pending::<Vec<u8>, String>();
3795        assert!(pending.is_empty());
3796        commit(&mut table, pending);
3797        assert_eq!(
3798            table,
3799            BTreeMap::from([
3800                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3801                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3802            ])
3803        );
3804
3805        // Test `update_by_keys`
3806        let mut table_txn =
3807            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3808        // Uniqueness violation.
3809        table_txn
3810            .update_by_keys(
3811                [
3812                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3813                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3814                ],
3815                0,
3816            )
3817            .unwrap_err();
3818        let n = table_txn
3819            .update_by_keys(
3820                [
3821                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3822                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3823                ],
3824                1,
3825            )
3826            .unwrap();
3827        assert_eq!(n, Diff::ONE);
3828        let n = table_txn
3829            .update_by_keys(
3830                [
3831                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3832                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3833                ],
3834                2,
3835            )
3836            .unwrap();
3837        assert_eq!(n, Diff::ZERO);
3838        let pending = table_txn.pending();
3839        assert_eq!(
3840            pending,
3841            vec![
3842                (
3843                    1i64.to_le_bytes().to_vec(),
3844                    "v8".to_string(),
3845                    Diff::MINUS_ONE
3846                ),
3847                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3848            ]
3849        );
3850        commit(&mut table, pending);
3851        assert_eq!(
3852            table,
3853            BTreeMap::from([
3854                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3855                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3856            ])
3857        );
3858
3859        // Duplicate `update_by_keys`.
3860        let mut table_txn =
3861            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3862        let n = table_txn
3863            .update_by_keys(
3864                [
3865                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3866                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3867                ],
3868                0,
3869            )
3870            .unwrap();
3871        assert_eq!(n, Diff::from(2));
3872        let pending = table_txn.pending::<Vec<u8>, String>();
3873        assert!(pending.is_empty());
3874        commit(&mut table, pending);
3875        assert_eq!(
3876            table,
3877            BTreeMap::from([
3878                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3879                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3880            ])
3881        );
3882
3883        // Test `delete_by_key`
3884        let mut table_txn =
3885            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3886        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3887        assert_eq!(prev, Some("v9".to_string()));
3888        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3889        assert_none!(prev);
3890        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3891        assert_none!(prev);
3892        let pending = table_txn.pending();
3893        assert_eq!(
3894            pending,
3895            vec![(
3896                1i64.to_le_bytes().to_vec(),
3897                "v9".to_string(),
3898                Diff::MINUS_ONE
3899            ),]
3900        );
3901        commit(&mut table, pending);
3902        assert_eq!(
3903            table,
3904            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3905        );
3906
3907        // Test `delete_by_keys`
3908        let mut table_txn =
3909            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3910        let prevs = table_txn.delete_by_keys(
3911            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3912            0,
3913        );
3914        assert_eq!(
3915            prevs,
3916            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3917        );
3918        let prevs = table_txn.delete_by_keys(
3919            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3920            1,
3921        );
3922        assert_eq!(prevs, vec![]);
3923        let prevs = table_txn.delete_by_keys(
3924            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3925            2,
3926        );
3927        assert_eq!(prevs, vec![]);
3928        let pending = table_txn.pending();
3929        assert_eq!(
3930            pending,
3931            vec![(
3932                42i64.to_le_bytes().to_vec(),
3933                "v7".to_string(),
3934                Diff::MINUS_ONE
3935            ),]
3936        );
3937        commit(&mut table, pending);
3938        assert_eq!(table, BTreeMap::new());
3939    }
3940
3941    #[mz_ore::test(tokio::test)]
3942    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3943    async fn test_savepoint() {
3944        let persist_client = PersistClient::new_for_tests().await;
3945        let state_builder =
3946            TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();
3947
3948        // Initialize catalog.
3949        let _ = state_builder
3950            .clone()
3951            .unwrap_build()
3952            .await
3953            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
3954            .await
3955            .unwrap()
3956            .0;
3957        let mut savepoint_state = state_builder
3958            .unwrap_build()
3959            .await
3960            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
3961            .await
3962            .unwrap()
3963            .0;
3964
3965        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
3966        assert!(!initial_snapshot.is_empty());
3967
3968        let db_name = "db";
3969        let db_owner = RoleId::User(42);
3970        let db_privileges = Vec::new();
3971        let mut txn = savepoint_state.transaction().await.unwrap();
3972        let (db_id, db_oid) = txn
3973            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
3974            .unwrap();
3975        let commit_ts = txn.upper();
3976        txn.commit_internal(commit_ts).await.unwrap();
3977        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
3978        let update = updates.into_element();
3979
3980        assert_eq!(update.diff, StateDiff::Addition);
3981
3982        let db = match update.kind {
3983            memory::objects::StateUpdateKind::Database(db) => db,
3984            update => panic!("unexpected update: {update:?}"),
3985        };
3986
3987        assert_eq!(db_id, db.id);
3988        assert_eq!(db_oid, db.oid);
3989        assert_eq!(db_name, db.name);
3990        assert_eq!(db_owner, db.owner_id);
3991        assert_eq!(db_privileges, db.privileges);
3992    }
3993
3994    #[mz_ore::test]
3995    fn test_allocate_introspection_source_index_id() {
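        // Sanity-check the bit layout: an introspection source index ID packs
        // the cluster variant into the top 8 bits, the cluster ID's inner
        // value into the middle 48 bits, and the log variant into the bottom
        // 8 bits of a u64.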
3996        let cluster_variant: u8 = 0b0000_0001;
3997        let cluster_id_inner: u64 =
3998            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
3999        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4000
4001        let cluster_id = ClusterId::System(cluster_id_inner);
4002        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4003
4004        let introspection_source_index_id: u64 =
4005            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4006
4007        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4008        {
4009            let mut cluster_variant_mask = 0xFF << 56;
4010            cluster_variant_mask &= introspection_source_index_id;
4011            cluster_variant_mask >>= 56;
4012            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4013        }
4014
4015        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4016        {
4017            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4018            cluster_id_inner_mask &= introspection_source_index_id;
4019            cluster_id_inner_mask >>= 8;
4020            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4021        }
4022
4023        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4024        {
4025            let mut log_variant_mask = 0xFF;
4026            log_variant_mask &= introspection_source_index_id;
4027            assert_eq!(
4028                log_variant_mask,
4029                u64::from(timely_messages_received_log_variant)
4030            );
4031        }
4032
4033        let (catalog_item_id, global_id) =
4034            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4035
4036        assert_eq!(
4037            catalog_item_id,
4038            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4039        );
4040        assert_eq!(
4041            global_id,
4042            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4043        );
4044    }
4045}