Skip to main content

mz_catalog/durable/
transaction.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35    RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53    ClusterReplicaValue, ClusterSystemConfiguration, ClusterSystemConfigurationKey,
54    ClusterSystemConfigurationValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey,
55    ConfigValue, Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey,
56    DefaultPrivilegesValue, DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
57    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
58    ReplicaConfig, ReplicaSystemConfiguration, ReplicaSystemConfigurationKey,
59    ReplicaSystemConfigurationValue, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
60    ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
61    SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
62    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
63    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
64};
65use crate::durable::{
66    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
67    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
68    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
69    SCHEMA_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
70    SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
71    USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
72};
73use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
74
75type Timestamp = u64;
76
77/// A [`Transaction`] batches multiple catalog operations together and commits them atomically.
78/// An operation also logically groups multiple catalog updates together.
79#[derive(Derivative)]
80#[derivative(Debug)]
81pub struct Transaction<'a> {
82    #[derivative(Debug = "ignore")]
83    #[derivative(PartialEq = "ignore")]
84    durable_catalog: &'a mut dyn DurableCatalogState,
85    databases: TableTransaction<DatabaseKey, DatabaseValue>,
86    schemas: TableTransaction<SchemaKey, SchemaValue>,
87    items: TableTransaction<ItemKey, ItemValue>,
88    comments: TableTransaction<CommentKey, CommentValue>,
89    roles: TableTransaction<RoleKey, RoleValue>,
90    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
91    clusters: TableTransaction<ClusterKey, ClusterValue>,
92    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
93    introspection_sources:
94        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
95    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
96    configs: TableTransaction<ConfigKey, ConfigValue>,
97    settings: TableTransaction<SettingKey, SettingValue>,
98    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
99    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
100    cluster_system_configurations:
101        TableTransaction<ClusterSystemConfigurationKey, ClusterSystemConfigurationValue>,
102    replica_system_configurations:
103        TableTransaction<ReplicaSystemConfigurationKey, ReplicaSystemConfigurationValue>,
104    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
105    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
106    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
107    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
108    storage_collection_metadata:
109        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
110    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
111    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
112    // Don't make this a table transaction so that it's not read into the
113    // in-memory cache.
114    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
115    /// The upper of `durable_catalog` at the start of the transaction.
116    upper: mz_repr::Timestamp,
117    /// The ID of the current operation of this transaction.
118    op_id: Timestamp,
119}
120
121impl<'a> Transaction<'a> {
122    pub fn new(
123        durable_catalog: &'a mut dyn DurableCatalogState,
124        Snapshot {
125            databases,
126            schemas,
127            roles,
128            role_auth,
129            items,
130            comments,
131            clusters,
132            network_policies,
133            cluster_replicas,
134            introspection_sources,
135            id_allocator,
136            configs,
137            settings,
138            source_references,
139            system_object_mappings,
140            system_configurations,
141            cluster_system_configurations,
142            replica_system_configurations,
143            default_privileges,
144            system_privileges,
145            storage_collection_metadata,
146            unfinalized_shards,
147            txn_wal_shard,
148        }: Snapshot,
149        upper: mz_repr::Timestamp,
150    ) -> Result<Transaction<'a>, CatalogError> {
151        Ok(Transaction {
152            durable_catalog,
153            databases: TableTransaction::new_with_uniqueness_fn(
154                databases,
155                |a: &DatabaseValue, b| a.name == b.name,
156            )?,
157            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
158                a.database_id == b.database_id && a.name == b.name
159            })?,
160            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
161                a.schema_id == b.schema_id && a.name == b.name && {
162                    // `item_type` is slow, only compute if needed.
163                    let a_type = a.item_type();
164                    let b_type = b.item_type();
165                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
166                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
167                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
168                }
169            })?,
170            comments: TableTransaction::new(comments)?,
171            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
172                a.name == b.name
173            })?,
174            role_auth: TableTransaction::new(role_auth)?,
175            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
176                a.name == b.name
177            })?,
178            network_policies: TableTransaction::new_with_uniqueness_fn(
179                network_policies,
180                |a: &NetworkPolicyValue, b| a.name == b.name,
181            )?,
182            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
183                cluster_replicas,
184                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
185            )?,
186            introspection_sources: TableTransaction::new(introspection_sources)?,
187            id_allocator: TableTransaction::new(id_allocator)?,
188            configs: TableTransaction::new(configs)?,
189            settings: TableTransaction::new(settings)?,
190            source_references: TableTransaction::new(source_references)?,
191            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
192            system_configurations: TableTransaction::new(system_configurations)?,
193            cluster_system_configurations: TableTransaction::new(cluster_system_configurations)?,
194            replica_system_configurations: TableTransaction::new(replica_system_configurations)?,
195            default_privileges: TableTransaction::new(default_privileges)?,
196            system_privileges: TableTransaction::new(system_privileges)?,
197            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
198            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
199            // Uniqueness violations for this value occur at the key rather than
200            // the value (the key is the unit struct `()` so this is a singleton
201            // value).
202            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
203            audit_log_updates: Vec::new(),
204            upper,
205            op_id: 0,
206        })
207    }
208
209    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
210        let key = ItemKey { id: *id };
211        self.items
212            .get(&key)
213            .map(|v| DurableType::from_key_value(key, v.clone()))
214    }
215
216    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
217        self.items
218            .items()
219            .into_iter()
220            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
221            .sorted_by_key(|Item { id, .. }| *id)
222    }
223
224    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
225        self.insert_audit_log_events([event]);
226    }
227
228    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
229        let events = events
230            .into_iter()
231            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
232        self.audit_log_updates.extend(events);
233    }
234
235    pub fn insert_user_database(
236        &mut self,
237        database_name: &str,
238        owner_id: RoleId,
239        privileges: Vec<MzAclItem>,
240        temporary_oids: &HashSet<u32>,
241    ) -> Result<(DatabaseId, u32), CatalogError> {
242        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
243        let id = DatabaseId::User(id);
244        let oid = self.allocate_oid(temporary_oids)?;
245        self.insert_database(id, database_name, owner_id, privileges, oid)?;
246        Ok((id, oid))
247    }
248
249    pub(crate) fn insert_database(
250        &mut self,
251        id: DatabaseId,
252        database_name: &str,
253        owner_id: RoleId,
254        privileges: Vec<MzAclItem>,
255        oid: u32,
256    ) -> Result<u32, CatalogError> {
257        match self.databases.insert(
258            DatabaseKey { id },
259            DatabaseValue {
260                name: database_name.to_string(),
261                owner_id,
262                privileges,
263                oid,
264            },
265            self.op_id,
266        ) {
267            Ok(_) => Ok(oid),
268            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
269        }
270    }
271
272    pub fn insert_user_schema(
273        &mut self,
274        database_id: DatabaseId,
275        schema_name: &str,
276        owner_id: RoleId,
277        privileges: Vec<MzAclItem>,
278        temporary_oids: &HashSet<u32>,
279    ) -> Result<(SchemaId, u32), CatalogError> {
280        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
281        let id = SchemaId::User(id);
282        let oid = self.allocate_oid(temporary_oids)?;
283        self.insert_schema(
284            id,
285            Some(database_id),
286            schema_name.to_string(),
287            owner_id,
288            privileges,
289            oid,
290        )?;
291        Ok((id, oid))
292    }
293
294    pub fn insert_system_schema(
295        &mut self,
296        schema_id: u64,
297        schema_name: &str,
298        owner_id: RoleId,
299        privileges: Vec<MzAclItem>,
300        oid: u32,
301    ) -> Result<(), CatalogError> {
302        let id = SchemaId::System(schema_id);
303        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
304    }
305
306    pub(crate) fn insert_schema(
307        &mut self,
308        schema_id: SchemaId,
309        database_id: Option<DatabaseId>,
310        schema_name: String,
311        owner_id: RoleId,
312        privileges: Vec<MzAclItem>,
313        oid: u32,
314    ) -> Result<(), CatalogError> {
315        match self.schemas.insert(
316            SchemaKey { id: schema_id },
317            SchemaValue {
318                database_id,
319                name: schema_name.clone(),
320                owner_id,
321                privileges,
322                oid,
323            },
324            self.op_id,
325        ) {
326            Ok(_) => Ok(()),
327            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
328        }
329    }
330
331    pub fn insert_builtin_role(
332        &mut self,
333        id: RoleId,
334        name: String,
335        attributes: RoleAttributesRaw,
336        membership: RoleMembership,
337        vars: RoleVars,
338        oid: u32,
339    ) -> Result<RoleId, CatalogError> {
340        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
341        self.insert_role(id, name, attributes, membership, vars, oid)?;
342        Ok(id)
343    }
344
345    pub fn insert_user_role(
346        &mut self,
347        name: String,
348        attributes: RoleAttributesRaw,
349        membership: RoleMembership,
350        vars: RoleVars,
351        temporary_oids: &HashSet<u32>,
352    ) -> Result<(RoleId, u32), CatalogError> {
353        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
354        let id = RoleId::User(id);
355        let oid = self.allocate_oid(temporary_oids)?;
356        self.insert_role(id, name, attributes, membership, vars, oid)?;
357        Ok((id, oid))
358    }
359
360    fn insert_role(
361        &mut self,
362        id: RoleId,
363        name: String,
364        attributes: RoleAttributesRaw,
365        membership: RoleMembership,
366        vars: RoleVars,
367        oid: u32,
368    ) -> Result<(), CatalogError> {
369        if let Some(ref password) = attributes.password {
370            let hash = mz_auth::hash::scram256_hash(
371                password,
372                &attributes
373                    .scram_iterations
374                    .or_else(|| {
375                        soft_panic_or_log!(
376                            "Hash iterations must be set if a password is provided."
377                        );
378                        None
379                    })
380                    // This should never happen, but rather than panicking we'll
381                    // set a known secure value as a fallback.
382                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
383            )
384            .expect("password hash should be valid");
385            match self.role_auth.insert(
386                RoleAuthKey { role_id: id },
387                RoleAuthValue {
388                    password_hash: Some(hash),
389                    updated_at: SYSTEM_TIME(),
390                },
391                self.op_id,
392            ) {
393                Ok(_) => {}
394                Err(_) => {
395                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
396                }
397            }
398        }
399
400        match self.roles.insert(
401            RoleKey { id },
402            RoleValue {
403                name: name.clone(),
404                attributes: attributes.into(),
405                membership,
406                vars,
407                oid,
408            },
409            self.op_id,
410        ) {
411            Ok(_) => Ok(()),
412            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
413        }
414    }
415
416    /// Panics if any introspection source id is not a system id
417    pub fn insert_user_cluster(
418        &mut self,
419        cluster_id: ClusterId,
420        cluster_name: &str,
421        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
422        owner_id: RoleId,
423        privileges: Vec<MzAclItem>,
424        config: ClusterConfig,
425        temporary_oids: &HashSet<u32>,
426    ) -> Result<(), CatalogError> {
427        self.insert_cluster(
428            cluster_id,
429            cluster_name,
430            introspection_source_indexes,
431            owner_id,
432            privileges,
433            config,
434            temporary_oids,
435        )
436    }
437
438    /// Panics if any introspection source id is not a system id
439    pub fn insert_system_cluster(
440        &mut self,
441        cluster_name: &str,
442        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
443        privileges: Vec<MzAclItem>,
444        owner_id: RoleId,
445        config: ClusterConfig,
446        temporary_oids: &HashSet<u32>,
447    ) -> Result<ClusterId, CatalogError> {
448        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
449        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
450        self.insert_cluster(
451            cluster_id,
452            cluster_name,
453            introspection_source_indexes,
454            owner_id,
455            privileges,
456            config,
457            temporary_oids,
458        )?;
459        Ok(cluster_id)
460    }
461
462    fn insert_cluster(
463        &mut self,
464        cluster_id: ClusterId,
465        cluster_name: &str,
466        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
467        owner_id: RoleId,
468        privileges: Vec<MzAclItem>,
469        config: ClusterConfig,
470        temporary_oids: &HashSet<u32>,
471    ) -> Result<(), CatalogError> {
472        if let Err(_) = self.clusters.insert(
473            ClusterKey { id: cluster_id },
474            ClusterValue {
475                name: cluster_name.to_string(),
476                owner_id,
477                privileges,
478                config,
479            },
480            self.op_id,
481        ) {
482            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
483        };
484
485        let amount = usize_to_u64(introspection_source_indexes.len());
486        let oids = self.allocate_oids(amount, temporary_oids)?;
487        let introspection_source_indexes: Vec<_> = introspection_source_indexes
488            .into_iter()
489            .zip_eq(oids)
490            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
491            .collect();
492        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
493            let introspection_source_index = IntrospectionSourceIndex {
494                cluster_id,
495                name: builtin.name.to_string(),
496                item_id,
497                index_id,
498                oid,
499            };
500            let (key, value) = introspection_source_index.into_key_value();
501            self.introspection_sources
502                .insert(key, value, self.op_id)
503                .expect("no uniqueness violation");
504        }
505
506        Ok(())
507    }
508
509    pub fn rename_cluster(
510        &mut self,
511        cluster_id: ClusterId,
512        cluster_name: &str,
513        cluster_to_name: &str,
514    ) -> Result<(), CatalogError> {
515        let key = ClusterKey { id: cluster_id };
516
517        match self.clusters.update(
518            |k, v| {
519                if *k == key {
520                    let mut value = v.clone();
521                    value.name = cluster_to_name.to_string();
522                    Some(value)
523                } else {
524                    None
525                }
526            },
527            self.op_id,
528        )? {
529            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
530            Diff::ONE => Ok(()),
531            n => panic!(
532                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
533            ),
534        }
535    }
536
537    pub fn rename_cluster_replica(
538        &mut self,
539        replica_id: ReplicaId,
540        replica_name: &QualifiedReplica,
541        replica_to_name: &str,
542    ) -> Result<(), CatalogError> {
543        let key = ClusterReplicaKey { id: replica_id };
544
545        match self.cluster_replicas.update(
546            |k, v| {
547                if *k == key {
548                    let mut value = v.clone();
549                    value.name = replica_to_name.to_string();
550                    Some(value)
551                } else {
552                    None
553                }
554            },
555            self.op_id,
556        )? {
557            Diff::ZERO => {
558                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
559            }
560            Diff::ONE => Ok(()),
561            n => panic!(
562                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
563            ),
564        }
565    }
566
567    pub fn insert_cluster_replica(
568        &mut self,
569        cluster_id: ClusterId,
570        replica_name: &str,
571        config: ReplicaConfig,
572        owner_id: RoleId,
573    ) -> Result<ReplicaId, CatalogError> {
574        let replica_id = match cluster_id {
575            ClusterId::System(_) => self.allocate_system_replica_id()?,
576            ClusterId::User(_) => self.allocate_user_replica_id()?,
577        };
578        self.insert_cluster_replica_with_id(
579            cluster_id,
580            replica_id,
581            replica_name,
582            config,
583            owner_id,
584        )?;
585        Ok(replica_id)
586    }
587
588    pub(crate) fn insert_cluster_replica_with_id(
589        &mut self,
590        cluster_id: ClusterId,
591        replica_id: ReplicaId,
592        replica_name: &str,
593        config: ReplicaConfig,
594        owner_id: RoleId,
595    ) -> Result<(), CatalogError> {
596        if let Err(_) = self.cluster_replicas.insert(
597            ClusterReplicaKey { id: replica_id },
598            ClusterReplicaValue {
599                cluster_id,
600                name: replica_name.into(),
601                config,
602                owner_id,
603            },
604            self.op_id,
605        ) {
606            let cluster = self
607                .clusters
608                .get(&ClusterKey { id: cluster_id })
609                .expect("cluster exists");
610            return Err(SqlCatalogError::DuplicateReplica(
611                replica_name.to_string(),
612                cluster.name.to_string(),
613            )
614            .into());
615        };
616        Ok(())
617    }
618
619    pub fn insert_user_network_policy(
620        &mut self,
621        name: String,
622        rules: Vec<NetworkPolicyRule>,
623        privileges: Vec<MzAclItem>,
624        owner_id: RoleId,
625        temporary_oids: &HashSet<u32>,
626    ) -> Result<NetworkPolicyId, CatalogError> {
627        let oid = self.allocate_oid(temporary_oids)?;
628        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
629        let id = NetworkPolicyId::User(id);
630        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
631    }
632
633    pub fn insert_network_policy(
634        &mut self,
635        id: NetworkPolicyId,
636        name: String,
637        rules: Vec<NetworkPolicyRule>,
638        privileges: Vec<MzAclItem>,
639        owner_id: RoleId,
640        oid: u32,
641    ) -> Result<NetworkPolicyId, CatalogError> {
642        match self.network_policies.insert(
643            NetworkPolicyKey { id },
644            NetworkPolicyValue {
645                name: name.clone(),
646                rules,
647                privileges,
648                owner_id,
649                oid,
650            },
651            self.op_id,
652        ) {
653            Ok(_) => Ok(id),
654            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
655        }
656    }
657
658    /// Updates persisted information about persisted introspection source
659    /// indexes.
660    ///
661    /// Panics if provided id is not a system id.
662    pub fn update_introspection_source_index_gids(
663        &mut self,
664        mappings: impl Iterator<
665            Item = (
666                ClusterId,
667                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
668            ),
669        >,
670    ) -> Result<(), CatalogError> {
671        for (cluster_id, updates) in mappings {
672            for (name, item_id, index_id, oid) in updates {
673                let introspection_source_index = IntrospectionSourceIndex {
674                    cluster_id,
675                    name,
676                    item_id,
677                    index_id,
678                    oid,
679                };
680                let (key, value) = introspection_source_index.into_key_value();
681
682                let prev = self
683                    .introspection_sources
684                    .set(key, Some(value), self.op_id)?;
685                if prev.is_none() {
686                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
687                        "{index_id}"
688                    ))
689                    .into());
690                }
691            }
692        }
693        Ok(())
694    }
695
696    pub fn insert_user_item(
697        &mut self,
698        id: CatalogItemId,
699        global_id: GlobalId,
700        schema_id: SchemaId,
701        item_name: &str,
702        create_sql: String,
703        owner_id: RoleId,
704        privileges: Vec<MzAclItem>,
705        temporary_oids: &HashSet<u32>,
706        versions: BTreeMap<RelationVersion, GlobalId>,
707    ) -> Result<u32, CatalogError> {
708        let oid = self.allocate_oid(temporary_oids)?;
709        self.insert_item(
710            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
711        )?;
712        Ok(oid)
713    }
714
715    pub fn insert_item(
716        &mut self,
717        id: CatalogItemId,
718        oid: u32,
719        global_id: GlobalId,
720        schema_id: SchemaId,
721        item_name: &str,
722        create_sql: String,
723        owner_id: RoleId,
724        privileges: Vec<MzAclItem>,
725        extra_versions: BTreeMap<RelationVersion, GlobalId>,
726    ) -> Result<(), CatalogError> {
727        match self.items.insert(
728            ItemKey { id },
729            ItemValue {
730                schema_id,
731                name: item_name.to_string(),
732                create_sql,
733                owner_id,
734                privileges,
735                oid,
736                global_id,
737                extra_versions,
738            },
739            self.op_id,
740        ) {
741            Ok(_) => Ok(()),
742            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
743        }
744    }
745
746    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
747        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
748    }
749
750    pub fn get_and_increment_id_by(
751        &mut self,
752        key: String,
753        amount: u64,
754    ) -> Result<Vec<u64>, CatalogError> {
755        assert!(
756            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
757            "system item IDs cannot be allocated outside of bootstrap"
758        );
759
760        let current_id = self
761            .id_allocator
762            .items()
763            .get(&IdAllocKey { name: key.clone() })
764            .unwrap_or_else(|| panic!("{key} id allocator missing"))
765            .next_id;
766        let next_id = current_id
767            .checked_add(amount)
768            .ok_or(SqlCatalogError::IdExhaustion)?;
769        let prev = self.id_allocator.set(
770            IdAllocKey { name: key },
771            Some(IdAllocValue { next_id }),
772            self.op_id,
773        )?;
774        assert_eq!(
775            prev,
776            Some(IdAllocValue {
777                next_id: current_id
778            })
779        );
780        Ok((current_id..next_id).collect())
781    }
782
783    pub fn allocate_system_item_ids(
784        &mut self,
785        amount: u64,
786    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
787        assert!(
788            !self.durable_catalog.is_bootstrap_complete(),
789            "we can only allocate system item IDs during bootstrap"
790        );
791        Ok(self
792            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
793            .into_iter()
794            // TODO(alter_table): Use separate ID allocators.
795            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
796            .collect())
797    }
798
    /// Allocates an ID for an introspection source index. These IDs are deterministically derived
    /// from the `cluster_id` and `log_variant`.
    ///
    /// Introspection source indexes are a special edge case of items. They are considered system
    /// items, but they are the only system item that can be created by the user at any time. All
    /// other system items can only be created by the system during the startup of an upgrade.
    ///
    /// Furthermore, all other system item IDs are allocated deterministically in the same order
    /// during startup. Therefore, all read-only `environmentd` processes during an upgrade will
    /// allocate the same system IDs to the same items, and due to the way catalog fencing works,
    /// only one of them can successfully write the IDs down to the catalog. This removes the need
    /// for `environmentd` processes to coordinate system IDs allocated during read-only mode.
    ///
    /// Since introspection IDs can be allocated at any time, read-only instances would either need
    /// to coordinate across processes when allocating a new ID or allocate them deterministically.
    /// We opted to allocate the IDs deterministically to avoid the overhead of coordination.
    ///
    /// Introspection source index IDs are 64 bit integers, with the following format (not to
    /// scale):
    ///
    /// -------------------------------------------------------------
    /// | Cluster ID Variant | Cluster ID Inner Value | Log Variant |
    /// |--------------------|------------------------|-------------|
    /// |       8-bits       |         48-bits        |   8-bits    |
    /// -------------------------------------------------------------
    ///
    /// Cluster ID Variant:      A unique number indicating the variant of cluster the index belongs
    ///                          to.
    /// Cluster ID Inner Value:  A per variant unique number indicating the cluster the index
    ///                          belongs to.
    /// Log Variant:             A unique number indicating the log variant this index is on.
    ///
    /// The same `(cluster_id, log_variant)` pair always yields the same ID, and that single value
    /// is used for both the returned [`CatalogItemId`] and [`GlobalId`].
    ///
    /// # Panics
    ///
    /// Panics if the inner value of `cluster_id` does not fit in 48 bits.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        // Tag stored in the top 8 bits so system and user clusters with the same inner value
        // still produce distinct IDs.
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The inner cluster ID is shifted left by 8 below, so any bit at position 48 or higher
        // would spill into (or past) the variant tag. Reject such IDs outright.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // These numbers are baked into the derived IDs: changing an existing assignment changes
        // the ID derived for that log variant. Note that 27 is not assigned to any variant here.
        // NOTE(review): presumably it belonged to a retired variant and is skipped to keep the
        // remaining IDs stable; confirm before reusing it.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
            LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
        };

        // Assemble the layout documented above: [variant:8][cluster:48][log:8].
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
889
890    pub fn allocate_user_item_ids(
891        &mut self,
892        amount: u64,
893    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
894        Ok(self
895            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
896            .into_iter()
897            // TODO(alter_table): Use separate ID allocators.
898            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
899            .collect())
900    }
901
902    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
903        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
904        Ok(ReplicaId::User(id))
905    }
906
907    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
908        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
909        Ok(ReplicaId::System(id))
910    }
911
912    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
913        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
914    }
915
    /// Allocates `amount` OIDs. OIDs can be recycled if they aren't currently assigned to any
    /// object.
    ///
    /// An OID counts as assigned if any database, schema, role, item, or introspection source in
    /// this transaction uses it, or if it appears in `temporary_oids`. The search starts at the
    /// persisted `OID_ALLOC_KEY` cursor, and the cursor is advanced past the last OID examined.
    ///
    /// # Errors
    ///
    /// Returns [`SqlCatalogError::OidExhaustion`] if `amount` exceeds [`u32::MAX`], or if a full
    /// lap of the user OID range finds fewer than `amount` unassigned OIDs.
    ///
    /// # Panics
    ///
    /// Panics if the OID allocator entry is missing or holds a value outside the user OID range.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Struct representing an OID for a user object. Allocated OIDs can be recycled, so when we've
        /// allocated [`u32::MAX`] we'll wrap back around to [`FIRST_USER_OID`].
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                // On overflow, continue from FIRST_USER_OID instead of 0 so the value stays in
                // the user range. The loop below only ever adds 1, for which this is exact.
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // This is potentially slow to do every time we allocate an OID. A faster approach might be
        // to have an ID allocator that is updated every time an OID is allocated or de-allocated.
        // However, benchmarking shows that this doesn't make a noticeable difference and the other
        // approach requires making sure that allocator always stays in-sync which can be
        // error-prone. If DDL starts slowing down, this is a good place to try and optimize.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        // Walk forward from the persisted cursor, collecting free OIDs, until we have `amount`
        // of them or we have lapped the whole user range back to `start_oid`.
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've exhausted all possible OIDs and still don't have `amount`.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        // Persist the cursor just past the last OID examined so the next allocation resumes
        // from there instead of rescanning from the same starting point.
        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1024
1025    /// Allocates a single OID. OIDs can be recycled if they aren't currently assigned to any
1026    /// object.
1027    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1028        self.allocate_oids(1, temporary_oids)
1029            .map(|oids| oids.into_element())
1030    }
1031
    /// Exports the current state of this transaction as a [`Snapshot`].
    ///
    /// This merges each `TableTransaction`'s initial data with its pending
    /// changes to produce the current view, then converts back to proto types.
    /// Used to persist transaction state between incremental DDL dry runs so
    /// the next dry run's fresh `Transaction` starts in sync with the
    /// accumulated `CatalogState`.
    pub fn current_snapshot(&self) -> Snapshot {
        // One `Snapshot` field per table. Most field names match the table names, but note that
        // `system_object_mappings` is fed from this transaction's `system_gid_mapping` table.
        Snapshot {
            databases: self.databases.current_items_proto(),
            schemas: self.schemas.current_items_proto(),
            roles: self.roles.current_items_proto(),
            role_auth: self.role_auth.current_items_proto(),
            items: self.items.current_items_proto(),
            comments: self.comments.current_items_proto(),
            clusters: self.clusters.current_items_proto(),
            network_policies: self.network_policies.current_items_proto(),
            cluster_replicas: self.cluster_replicas.current_items_proto(),
            introspection_sources: self.introspection_sources.current_items_proto(),
            id_allocator: self.id_allocator.current_items_proto(),
            configs: self.configs.current_items_proto(),
            settings: self.settings.current_items_proto(),
            system_object_mappings: self.system_gid_mapping.current_items_proto(),
            system_configurations: self.system_configurations.current_items_proto(),
            cluster_system_configurations: self.cluster_system_configurations.current_items_proto(),
            replica_system_configurations: self.replica_system_configurations.current_items_proto(),
            default_privileges: self.default_privileges.current_items_proto(),
            source_references: self.source_references.current_items_proto(),
            system_privileges: self.system_privileges.current_items_proto(),
            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
        }
    }
1066
1067    pub(crate) fn insert_id_allocator(
1068        &mut self,
1069        name: String,
1070        next_id: u64,
1071    ) -> Result<(), CatalogError> {
1072        match self.id_allocator.insert(
1073            IdAllocKey { name: name.clone() },
1074            IdAllocValue { next_id },
1075            self.op_id,
1076        ) {
1077            Ok(_) => Ok(()),
1078            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1079        }
1080    }
1081
1082    /// Removes the database `id` from the transaction.
1083    ///
1084    /// Returns an error if `id` is not found.
1085    ///
1086    /// Runtime is linear with respect to the total number of databases in the catalog.
1087    /// DO NOT call this function in a loop, use [`Self::remove_databases`] instead.
1088    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1089        let prev = self
1090            .databases
1091            .set(DatabaseKey { id: *id }, None, self.op_id)?;
1092        if prev.is_some() {
1093            Ok(())
1094        } else {
1095            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1096        }
1097    }
1098
1099    /// Removes all databases in `databases` from the transaction.
1100    ///
1101    /// Returns an error if any id in `databases` is not found.
1102    ///
1103    /// NOTE: On error, there still may be some databases removed from the transaction. It
1104    /// is up to the caller to either abort the transaction or commit.
1105    pub fn remove_databases(
1106        &mut self,
1107        databases: &BTreeSet<DatabaseId>,
1108    ) -> Result<(), CatalogError> {
1109        if databases.is_empty() {
1110            return Ok(());
1111        }
1112
1113        let to_remove = databases
1114            .iter()
1115            .map(|id| (DatabaseKey { id: *id }, None))
1116            .collect();
1117        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1118        prev.retain(|_k, val| val.is_none());
1119
1120        if !prev.is_empty() {
1121            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1122            return Err(SqlCatalogError::UnknownDatabase(err).into());
1123        }
1124
1125        Ok(())
1126    }
1127
1128    /// Removes the schema identified by `database_id` and `schema_id` from the transaction.
1129    ///
1130    /// Returns an error if `(database_id, schema_id)` is not found.
1131    ///
1132    /// Runtime is linear with respect to the total number of schemas in the catalog.
1133    /// DO NOT call this function in a loop, use [`Self::remove_schemas`] instead.
1134    pub fn remove_schema(
1135        &mut self,
1136        database_id: &Option<DatabaseId>,
1137        schema_id: &SchemaId,
1138    ) -> Result<(), CatalogError> {
1139        let prev = self
1140            .schemas
1141            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1142        if prev.is_some() {
1143            Ok(())
1144        } else {
1145            let database_name = match database_id {
1146                Some(id) => format!("{id}."),
1147                None => "".to_string(),
1148            };
1149            Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1150        }
1151    }
1152
1153    /// Removes all schemas in `schemas` from the transaction.
1154    ///
1155    /// Returns an error if any id in `schemas` is not found.
1156    ///
1157    /// NOTE: On error, there still may be some schemas removed from the transaction. It
1158    /// is up to the caller to either abort the transaction or commit.
1159    pub fn remove_schemas(
1160        &mut self,
1161        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1162    ) -> Result<(), CatalogError> {
1163        if schemas.is_empty() {
1164            return Ok(());
1165        }
1166
1167        let to_remove = schemas
1168            .iter()
1169            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1170            .collect();
1171        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1172        prev.retain(|_k, v| v.is_none());
1173
1174        if !prev.is_empty() {
1175            let err = prev
1176                .keys()
1177                .map(|k| {
1178                    let db_spec = schemas.get(&k.id).expect("should_exist");
1179                    let db_name = match db_spec {
1180                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1181                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1182                    };
1183                    format!("{}.{}", db_name, k.id)
1184                })
1185                .join(", ");
1186
1187            return Err(SqlCatalogError::UnknownSchema(err).into());
1188        }
1189
1190        Ok(())
1191    }
1192
1193    pub fn remove_source_references(
1194        &mut self,
1195        source_id: CatalogItemId,
1196    ) -> Result<(), CatalogError> {
1197        let deleted = self
1198            .source_references
1199            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1200            .is_some();
1201        if deleted {
1202            Ok(())
1203        } else {
1204            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1205        }
1206    }
1207
1208    /// Removes all user roles in `roles` from the transaction.
1209    ///
1210    /// Returns an error if any id in `roles` is not found.
1211    ///
1212    /// NOTE: On error, there still may be some roles removed from the transaction. It
1213    /// is up to the caller to either abort the transaction or commit.
1214    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1215        assert!(
1216            roles.iter().all(|id| id.is_user()),
1217            "cannot delete non-user roles"
1218        );
1219        self.remove_roles(roles)
1220    }
1221
1222    /// Removes all roles in `roles` from the transaction.
1223    ///
1224    /// Returns an error if any id in `roles` is not found.
1225    ///
1226    /// NOTE: On error, there still may be some roles removed from the transaction. It
1227    /// is up to the caller to either abort the transaction or commit.
1228    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1229        if roles.is_empty() {
1230            return Ok(());
1231        }
1232
1233        let to_remove_keys = roles
1234            .iter()
1235            .map(|role_id| RoleKey { id: *role_id })
1236            .collect::<Vec<_>>();
1237
1238        let to_remove_roles = to_remove_keys
1239            .iter()
1240            .map(|role_key| (role_key.clone(), None))
1241            .collect();
1242
1243        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1244
1245        let to_remove_role_auth = to_remove_keys
1246            .iter()
1247            .map(|role_key| {
1248                (
1249                    RoleAuthKey {
1250                        role_id: role_key.id,
1251                    },
1252                    None,
1253                )
1254            })
1255            .collect();
1256
1257        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1258
1259        prev.retain(|_k, v| v.is_none());
1260        if !prev.is_empty() {
1261            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1262            return Err(SqlCatalogError::UnknownRole(err).into());
1263        }
1264
1265        role_auth_prev.retain(|_k, v| v.is_none());
1266        // The reason we don't to the same check as above is that the role auth table
1267        // is not required to have all roles in the role table.
1268
1269        Ok(())
1270    }
1271
1272    /// Removes all cluster in `clusters` from the transaction.
1273    ///
1274    /// Returns an error if any id in `clusters` is not found.
1275    ///
1276    /// NOTE: On error, there still may be some clusters removed from the transaction. It is up to
1277    /// the caller to either abort the transaction or commit.
1278    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1279        if clusters.is_empty() {
1280            return Ok(());
1281        }
1282
1283        let to_remove = clusters
1284            .iter()
1285            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1286            .collect();
1287        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1288
1289        prev.retain(|_k, v| v.is_none());
1290        if !prev.is_empty() {
1291            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1292            return Err(SqlCatalogError::UnknownCluster(err).into());
1293        }
1294
1295        // Cascade delete introspection sources and cluster replicas.
1296        //
1297        // TODO(benesch): this doesn't seem right. Cascade deletions should
1298        // be entirely the domain of the higher catalog layer, not the
1299        // storage layer.
1300        self.cluster_replicas
1301            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1302        self.introspection_sources
1303            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1304
1305        Ok(())
1306    }
1307
1308    /// Removes the cluster replica `id` from the transaction.
1309    ///
1310    /// Returns an error if `id` is not found.
1311    ///
1312    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1313    /// DO NOT call this function in a loop, use [`Self::remove_cluster_replicas`] instead.
1314    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1315        let deleted = self
1316            .cluster_replicas
1317            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1318            .is_some();
1319        if deleted {
1320            Ok(())
1321        } else {
1322            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1323        }
1324    }
1325
1326    /// Removes all cluster replicas in `replicas` from the transaction.
1327    ///
1328    /// Returns an error if any id in `replicas` is not found.
1329    ///
1330    /// NOTE: On error, there still may be some cluster replicas removed from the transaction. It
1331    /// is up to the caller to either abort the transaction or commit.
1332    pub fn remove_cluster_replicas(
1333        &mut self,
1334        replicas: &BTreeSet<ReplicaId>,
1335    ) -> Result<(), CatalogError> {
1336        if replicas.is_empty() {
1337            return Ok(());
1338        }
1339
1340        let to_remove = replicas
1341            .iter()
1342            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1343            .collect();
1344        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1345
1346        prev.retain(|_k, v| v.is_none());
1347        if !prev.is_empty() {
1348            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1349            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1350        }
1351
1352        Ok(())
1353    }
1354
1355    /// Removes item `id` from the transaction.
1356    ///
1357    /// Returns an error if `id` is not found.
1358    ///
1359    /// Runtime is linear with respect to the total number of items in the catalog.
1360    /// DO NOT call this function in a loop, use [`Self::remove_items`] instead.
1361    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1362        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1363        if prev.is_some() {
1364            Ok(())
1365        } else {
1366            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1367        }
1368    }
1369
1370    /// Removes all items in `ids` from the transaction.
1371    ///
1372    /// Returns an error if any id in `ids` is not found.
1373    ///
1374    /// NOTE: On error, there still may be some items removed from the transaction. It is
1375    /// up to the caller to either abort the transaction or commit.
1376    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1377        if ids.is_empty() {
1378            return Ok(());
1379        }
1380
1381        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1382        let n = self.items.delete_by_keys(ks, self.op_id).len();
1383        if n == ids.len() {
1384            Ok(())
1385        } else {
1386            let item_ids = self.items.items().keys().map(|k| k.id).collect();
1387            let mut unknown = ids.difference(&item_ids);
1388            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1389        }
1390    }
1391
1392    /// Removes all system object mappings in `descriptions` from the transaction.
1393    ///
1394    /// Returns an error if any description in `descriptions` is not found.
1395    ///
1396    /// NOTE: On error, there still may be some items removed from the transaction. It is
1397    /// up to the caller to either abort the transaction or commit.
1398    pub fn remove_system_object_mappings(
1399        &mut self,
1400        descriptions: BTreeSet<SystemObjectDescription>,
1401    ) -> Result<(), CatalogError> {
1402        if descriptions.is_empty() {
1403            return Ok(());
1404        }
1405
1406        let ks: Vec<_> = descriptions
1407            .clone()
1408            .into_iter()
1409            .map(|desc| GidMappingKey {
1410                schema_name: desc.schema_name,
1411                object_type: desc.object_type,
1412                object_name: desc.object_name,
1413            })
1414            .collect();
1415        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1416
1417        if n == descriptions.len() {
1418            Ok(())
1419        } else {
1420            let item_descriptions = self
1421                .system_gid_mapping
1422                .items()
1423                .keys()
1424                .map(|k| SystemObjectDescription {
1425                    schema_name: k.schema_name.clone(),
1426                    object_type: k.object_type.clone(),
1427                    object_name: k.object_name.clone(),
1428                })
1429                .collect();
1430            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1431                format!(
1432                    "{} {}.{}",
1433                    desc.object_type, desc.schema_name, desc.object_name
1434                )
1435            });
1436            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1437        }
1438    }
1439
1440    /// Removes all introspection source indexes in `indexes` from the transaction.
1441    ///
1442    /// Returns an error if any index in `indexes` is not found.
1443    ///
1444    /// NOTE: On error, there still may be some indexes removed from the transaction. It is
1445    /// up to the caller to either abort the transaction or commit.
1446    pub fn remove_introspection_source_indexes(
1447        &mut self,
1448        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1449    ) -> Result<(), CatalogError> {
1450        if introspection_source_indexes.is_empty() {
1451            return Ok(());
1452        }
1453
1454        let ks: Vec<_> = introspection_source_indexes
1455            .clone()
1456            .into_iter()
1457            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1458            .collect();
1459        let n = self
1460            .introspection_sources
1461            .delete_by_keys(ks, self.op_id)
1462            .len();
1463        if n == introspection_source_indexes.len() {
1464            Ok(())
1465        } else {
1466            let txn_indexes = self
1467                .introspection_sources
1468                .items()
1469                .keys()
1470                .map(|k| (k.cluster_id, k.name.clone()))
1471                .collect();
1472            let mut unknown = introspection_source_indexes
1473                .difference(&txn_indexes)
1474                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1475            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1476        }
1477    }
1478
1479    /// Updates item `id` in the transaction to `item_name` and `item`.
1480    ///
1481    /// Returns an error if `id` is not found.
1482    ///
1483    /// Runtime is linear with respect to the total number of items in the catalog.
1484    /// DO NOT call this function in a loop, use [`Self::update_items`] instead.
1485    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1486        let updated =
1487            self.items
1488                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1489        if updated {
1490            Ok(())
1491        } else {
1492            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1493        }
1494    }
1495
1496    /// Updates all items with ids matching the keys of `items` in the transaction, to the
1497    /// corresponding value in `items`.
1498    ///
1499    /// Returns an error if any id in `items` is not found.
1500    ///
1501    /// NOTE: On error, there still may be some items updated in the transaction. It is
1502    /// up to the caller to either abort the transaction or commit.
1503    pub fn update_items(
1504        &mut self,
1505        items: BTreeMap<CatalogItemId, Item>,
1506    ) -> Result<(), CatalogError> {
1507        if items.is_empty() {
1508            return Ok(());
1509        }
1510
1511        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1512        let kvs: Vec<_> = items
1513            .clone()
1514            .into_iter()
1515            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1516            .collect();
1517        let n = self.items.update_by_keys(kvs, self.op_id)?;
1518        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1519        if n == update_ids.len() {
1520            Ok(())
1521        } else {
1522            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1523            let mut unknown = update_ids.difference(&item_ids);
1524            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1525        }
1526    }
1527
1528    /// Updates role `id` in the transaction to `role`.
1529    ///
1530    /// Returns an error if `id` is not found.
1531    ///
1532    /// Runtime is linear with respect to the total number of items in the catalog.
1533    /// DO NOT call this function in a loop, implement and use some `Self::update_roles` instead.
1534    /// You should model it after [`Self::update_items`].
1535    pub fn update_role(
1536        &mut self,
1537        id: RoleId,
1538        role: Role,
1539        password: PasswordAction,
1540    ) -> Result<(), CatalogError> {
1541        let key = RoleKey { id };
1542        if self.roles.get(&key).is_some() {
1543            let auth_key = RoleAuthKey { role_id: id };
1544
1545            match password {
1546                PasswordAction::Set(new_password) => {
1547                    let hash = mz_auth::hash::scram256_hash(
1548                        &new_password.password,
1549                        &new_password.scram_iterations,
1550                    )
1551                    .expect("password hash should be valid");
1552                    let value = RoleAuthValue {
1553                        password_hash: Some(hash),
1554                        updated_at: SYSTEM_TIME(),
1555                    };
1556
1557                    if self.role_auth.get(&auth_key).is_some() {
1558                        self.role_auth
1559                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1560                    } else {
1561                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1562                    }
1563                }
1564                PasswordAction::Clear => {
1565                    let value = RoleAuthValue {
1566                        password_hash: None,
1567                        updated_at: SYSTEM_TIME(),
1568                    };
1569                    if self.role_auth.get(&auth_key).is_some() {
1570                        self.role_auth
1571                            .update_by_key(auth_key.clone(), value, self.op_id)?;
1572                    }
1573                }
1574                PasswordAction::NoChange => {}
1575            }
1576
1577            self.roles
1578                .update_by_key(key, role.into_key_value().1, self.op_id)?;
1579
1580            Ok(())
1581        } else {
1582            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1583        }
1584    }
1585
1586    /// Updates all [`Role`]s with ids matching the keys of `roles` in the transaction, to the
1587    /// corresponding value in `roles`.
1588    ///
1589    /// This function does *not* write role_authentication information to the catalog.
1590    /// It is purely for updating the role itself.
1591    ///
1592    /// Returns an error if any id in `roles` is not found.
1593    ///
1594    /// NOTE: On error, there still may be some roles updated in the transaction. It is
1595    /// up to the caller to either abort the transaction or commit.
1596    pub fn update_roles_without_auth(
1597        &mut self,
1598        roles: BTreeMap<RoleId, Role>,
1599    ) -> Result<(), CatalogError> {
1600        if roles.is_empty() {
1601            return Ok(());
1602        }
1603
1604        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1605        let kvs: Vec<_> = roles
1606            .into_iter()
1607            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1608            .collect();
1609        let n = self.roles.update_by_keys(kvs, self.op_id)?;
1610        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1611
1612        if n == update_role_ids.len() {
1613            Ok(())
1614        } else {
1615            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1616            let mut unknown = update_role_ids.difference(&role_ids);
1617            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1618        }
1619    }
1620
1621    /// Updates persisted mapping from system objects to global IDs and fingerprints. Each element
1622    /// of `mappings` should be (old-global-id, new-system-object-mapping).
1623    ///
1624    /// Panics if provided id is not a system id.
1625    pub fn update_system_object_mappings(
1626        &mut self,
1627        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1628    ) -> Result<(), CatalogError> {
1629        if mappings.is_empty() {
1630            return Ok(());
1631        }
1632
1633        let n = self.system_gid_mapping.update(
1634            |_k, v| {
1635                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1636                    let (_, new_value) = mapping.clone().into_key_value();
1637                    Some(new_value)
1638                } else {
1639                    None
1640                }
1641            },
1642            self.op_id,
1643        )?;
1644
1645        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1646            != mappings.len()
1647        {
1648            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1649            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1650        }
1651
1652        Ok(())
1653    }
1654
1655    /// Updates cluster `id` in the transaction to `cluster`.
1656    ///
1657    /// Returns an error if `id` is not found.
1658    ///
1659    /// Runtime is linear with respect to the total number of clusters in the catalog.
1660    /// DO NOT call this function in a loop.
1661    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1662        let updated = self.clusters.update_by_key(
1663            ClusterKey { id },
1664            cluster.into_key_value().1,
1665            self.op_id,
1666        )?;
1667        if updated {
1668            Ok(())
1669        } else {
1670            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1671        }
1672    }
1673
1674    /// Updates cluster replica `replica_id` in the transaction to `replica`.
1675    ///
1676    /// Returns an error if `replica_id` is not found.
1677    ///
1678    /// Runtime is linear with respect to the total number of cluster replicas in the catalog.
1679    /// DO NOT call this function in a loop.
1680    pub fn update_cluster_replica(
1681        &mut self,
1682        replica_id: ReplicaId,
1683        replica: ClusterReplica,
1684    ) -> Result<(), CatalogError> {
1685        let updated = self.cluster_replicas.update_by_key(
1686            ClusterReplicaKey { id: replica_id },
1687            replica.into_key_value().1,
1688            self.op_id,
1689        )?;
1690        if updated {
1691            Ok(())
1692        } else {
1693            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1694        }
1695    }
1696
1697    /// Updates database `id` in the transaction to `database`.
1698    ///
1699    /// Returns an error if `id` is not found.
1700    ///
1701    /// Runtime is linear with respect to the total number of databases in the catalog.
1702    /// DO NOT call this function in a loop.
1703    pub fn update_database(
1704        &mut self,
1705        id: DatabaseId,
1706        database: Database,
1707    ) -> Result<(), CatalogError> {
1708        let updated = self.databases.update_by_key(
1709            DatabaseKey { id },
1710            database.into_key_value().1,
1711            self.op_id,
1712        )?;
1713        if updated {
1714            Ok(())
1715        } else {
1716            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1717        }
1718    }
1719
1720    /// Updates schema `schema_id` in the transaction to `schema`.
1721    ///
1722    /// Returns an error if `schema_id` is not found.
1723    ///
1724    /// Runtime is linear with respect to the total number of schemas in the catalog.
1725    /// DO NOT call this function in a loop.
1726    pub fn update_schema(
1727        &mut self,
1728        schema_id: SchemaId,
1729        schema: Schema,
1730    ) -> Result<(), CatalogError> {
1731        let updated = self.schemas.update_by_key(
1732            SchemaKey { id: schema_id },
1733            schema.into_key_value().1,
1734            self.op_id,
1735        )?;
1736        if updated {
1737            Ok(())
1738        } else {
1739            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1740        }
1741    }
1742
1743    /// Updates `network_policy_id` in the transaction to `network policy`.
1744    ///
1745    /// Returns an error if `id` is not found.
1746    ///
1747    /// Runtime is linear with respect to the total number of databases in the catalog.
1748    /// DO NOT call this function in a loop.
1749    pub fn update_network_policy(
1750        &mut self,
1751        id: NetworkPolicyId,
1752        network_policy: NetworkPolicy,
1753    ) -> Result<(), CatalogError> {
1754        let updated = self.network_policies.update_by_key(
1755            NetworkPolicyKey { id },
1756            network_policy.into_key_value().1,
1757            self.op_id,
1758        )?;
1759        if updated {
1760            Ok(())
1761        } else {
1762            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1763        }
1764    }
1765    /// Removes all network policies in `network policies` from the transaction.
1766    ///
1767    /// Returns an error if any id in `network policy` is not found.
1768    ///
1769    /// NOTE: On error, there still may be some roles removed from the transaction. It
1770    /// is up to the caller to either abort the transaction or commit.
1771    pub fn remove_network_policies(
1772        &mut self,
1773        network_policies: &BTreeSet<NetworkPolicyId>,
1774    ) -> Result<(), CatalogError> {
1775        if network_policies.is_empty() {
1776            return Ok(());
1777        }
1778
1779        let to_remove = network_policies
1780            .iter()
1781            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1782            .collect();
1783        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1784        assert!(
1785            prev.iter().all(|(k, _)| k.id.is_user()),
1786            "cannot delete non-user network policy"
1787        );
1788
1789        prev.retain(|_k, v| v.is_none());
1790        if !prev.is_empty() {
1791            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1792            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1793        }
1794
1795        Ok(())
1796    }
1797    /// Set persisted default privilege.
1798    ///
1799    /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead.
1800    pub fn set_default_privilege(
1801        &mut self,
1802        role_id: RoleId,
1803        database_id: Option<DatabaseId>,
1804        schema_id: Option<SchemaId>,
1805        object_type: ObjectType,
1806        grantee: RoleId,
1807        privileges: Option<AclMode>,
1808    ) -> Result<(), CatalogError> {
1809        self.default_privileges.set(
1810            DefaultPrivilegesKey {
1811                role_id,
1812                database_id,
1813                schema_id,
1814                object_type,
1815                grantee,
1816            },
1817            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1818            self.op_id,
1819        )?;
1820        Ok(())
1821    }
1822
1823    /// Set persisted default privileges.
1824    pub fn set_default_privileges(
1825        &mut self,
1826        default_privileges: Vec<DefaultPrivilege>,
1827    ) -> Result<(), CatalogError> {
1828        if default_privileges.is_empty() {
1829            return Ok(());
1830        }
1831
1832        let default_privileges = default_privileges
1833            .into_iter()
1834            .map(DurableType::into_key_value)
1835            .map(|(k, v)| (k, Some(v)))
1836            .collect();
1837        self.default_privileges
1838            .set_many(default_privileges, self.op_id)?;
1839        Ok(())
1840    }
1841
1842    /// Set persisted system privilege.
1843    ///
1844    /// DO NOT call this function in a loop, use [`Self::set_system_privileges`] instead.
1845    pub fn set_system_privilege(
1846        &mut self,
1847        grantee: RoleId,
1848        grantor: RoleId,
1849        acl_mode: Option<AclMode>,
1850    ) -> Result<(), CatalogError> {
1851        self.system_privileges.set(
1852            SystemPrivilegesKey { grantee, grantor },
1853            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1854            self.op_id,
1855        )?;
1856        Ok(())
1857    }
1858
1859    /// Set persisted system privileges.
1860    pub fn set_system_privileges(
1861        &mut self,
1862        system_privileges: Vec<MzAclItem>,
1863    ) -> Result<(), CatalogError> {
1864        if system_privileges.is_empty() {
1865            return Ok(());
1866        }
1867
1868        let system_privileges = system_privileges
1869            .into_iter()
1870            .map(DurableType::into_key_value)
1871            .map(|(k, v)| (k, Some(v)))
1872            .collect();
1873        self.system_privileges
1874            .set_many(system_privileges, self.op_id)?;
1875        Ok(())
1876    }
1877
1878    /// Set persisted setting.
1879    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1880        self.settings.set(
1881            SettingKey { name },
1882            value.map(|value| SettingValue { value }),
1883            self.op_id,
1884        )?;
1885        Ok(())
1886    }
1887
1888    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1889        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1890    }
1891
1892    /// Insert persisted introspection source index.
1893    pub fn insert_introspection_source_indexes(
1894        &mut self,
1895        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1896        temporary_oids: &HashSet<u32>,
1897    ) -> Result<(), CatalogError> {
1898        if introspection_source_indexes.is_empty() {
1899            return Ok(());
1900        }
1901
1902        let amount = usize_to_u64(introspection_source_indexes.len());
1903        let oids = self.allocate_oids(amount, temporary_oids)?;
1904        let introspection_source_indexes: Vec<_> = introspection_source_indexes
1905            .into_iter()
1906            .zip_eq(oids)
1907            .map(
1908                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1909                    cluster_id,
1910                    name,
1911                    item_id,
1912                    index_id,
1913                    oid,
1914                },
1915            )
1916            .collect();
1917
1918        for introspection_source_index in introspection_source_indexes {
1919            let (key, value) = introspection_source_index.into_key_value();
1920            self.introspection_sources.insert(key, value, self.op_id)?;
1921        }
1922
1923        Ok(())
1924    }
1925
1926    /// Set persisted system object mappings.
1927    pub fn set_system_object_mappings(
1928        &mut self,
1929        mappings: Vec<SystemObjectMapping>,
1930    ) -> Result<(), CatalogError> {
1931        if mappings.is_empty() {
1932            return Ok(());
1933        }
1934
1935        let mappings = mappings
1936            .into_iter()
1937            .map(DurableType::into_key_value)
1938            .map(|(k, v)| (k, Some(v)))
1939            .collect();
1940        self.system_gid_mapping.set_many(mappings, self.op_id)?;
1941        Ok(())
1942    }
1943
1944    /// Set persisted replica.
1945    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1946        if replicas.is_empty() {
1947            return Ok(());
1948        }
1949
1950        let replicas = replicas
1951            .into_iter()
1952            .map(DurableType::into_key_value)
1953            .map(|(k, v)| (k, Some(v)))
1954            .collect();
1955        self.cluster_replicas.set_many(replicas, self.op_id)?;
1956        Ok(())
1957    }
1958
1959    /// Set persisted configuration.
1960    pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1961        match value {
1962            Some(value) => {
1963                let config = Config { key, value };
1964                let (key, value) = config.into_key_value();
1965                self.configs.set(key, Some(value), self.op_id)?;
1966            }
1967            None => {
1968                self.configs.set(ConfigKey { key }, None, self.op_id)?;
1969            }
1970        }
1971        Ok(())
1972    }
1973
1974    /// Get the value of a persisted config.
1975    pub fn get_config(&self, key: String) -> Option<u64> {
1976        self.configs
1977            .get(&ConfigKey { key })
1978            .map(|entry| entry.value)
1979    }
1980
1981    /// Get the value of a persisted setting.
1982    pub fn get_setting(&self, name: String) -> Option<&str> {
1983        self.settings
1984            .get(&SettingKey { name })
1985            .map(|entry| &*entry.value)
1986    }
1987
1988    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1989        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1990            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1991    }
1992
1993    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1994        self.set_setting(
1995            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1996            Some(shard_id.to_string()),
1997        )
1998    }
1999
2000    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
2001        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
2002            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
2003    }
2004
2005    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
2006        self.set_setting(
2007            EXPRESSION_CACHE_SHARD_KEY.to_string(),
2008            Some(shard_id.to_string()),
2009        )
2010    }
2011
2012    /// Updates the catalog `with_0dt_deployment_max_wait` "config" value to
2013    /// match the `with_0dt_deployment_max_wait` "system var" value.
2014    ///
2015    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2016    /// but use it in boot before Launch Darkly is available.
2017    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
2018        self.set_config(
2019            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
2020            Some(
2021                value
2022                    .as_millis()
2023                    .try_into()
2024                    .expect("max wait fits into u64"),
2025            ),
2026        )
2027    }
2028
2029    /// Updates the catalog `with_0dt_deployment_ddl_check_interval` "config"
2030    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2031    /// value.
2032    ///
2033    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2034    /// but use it in boot before Launch Darkly is available.
2035    pub fn set_0dt_deployment_ddl_check_interval(
2036        &mut self,
2037        value: Duration,
2038    ) -> Result<(), CatalogError> {
2039        self.set_config(
2040            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2041            Some(
2042                value
2043                    .as_millis()
2044                    .try_into()
2045                    .expect("ddl check interval fits into u64"),
2046            ),
2047        )
2048    }
2049
2050    /// Updates the catalog `0dt_deployment_panic_after_timeout` "config" value to
2051    /// match the `0dt_deployment_panic_after_timeout` "system var" value.
2052    ///
2053    /// These are mirrored so that we can toggle the flag with Launch Darkly,
2054    /// but use it in boot before Launch Darkly is available.
2055    pub fn set_enable_0dt_deployment_panic_after_timeout(
2056        &mut self,
2057        value: bool,
2058    ) -> Result<(), CatalogError> {
2059        self.set_config(
2060            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2061            Some(u64::from(value)),
2062        )
2063    }
2064
2065    /// Removes the catalog `with_0dt_deployment_max_wait` "config" value to
2066    /// match the `with_0dt_deployment_max_wait` "system var" value.
2067    ///
2068    /// These are mirrored so that we can toggle the flag with LaunchDarkly,
2069    /// but use it in boot before LaunchDarkly is available.
2070    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2071        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2072    }
2073
2074    /// Removes the catalog `with_0dt_deployment_ddl_check_interval` "config"
2075    /// value to match the `with_0dt_deployment_ddl_check_interval` "system var"
2076    /// value.
2077    ///
2078    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2079    /// use it in boot before LaunchDarkly is available.
2080    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2081        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2082    }
2083
2084    /// Removes the catalog `enable_0dt_deployment_panic_after_timeout` "config"
2085    /// value to match the `enable_0dt_deployment_panic_after_timeout` "system
2086    /// var" value.
2087    ///
2088    /// These are mirrored so that we can toggle the flag with LaunchDarkly, but
2089    /// use it in boot before LaunchDarkly is available.
2090    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2091        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2092    }
2093
2094    /// Updates the catalog `system_config_synced` "config" value to true.
2095    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2096        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2097    }
2098
2099    pub fn update_comment(
2100        &mut self,
2101        object_id: CommentObjectId,
2102        sub_component: Option<usize>,
2103        comment: Option<String>,
2104    ) -> Result<(), CatalogError> {
2105        let key = CommentKey {
2106            object_id,
2107            sub_component,
2108        };
2109        let value = comment.map(|c| CommentValue { comment: c });
2110        self.comments.set(key, value, self.op_id)?;
2111
2112        Ok(())
2113    }
2114
2115    pub fn drop_comments(
2116        &mut self,
2117        object_ids: &BTreeSet<CommentObjectId>,
2118    ) -> Result<(), CatalogError> {
2119        if object_ids.is_empty() {
2120            return Ok(());
2121        }
2122
2123        self.comments
2124            .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2125        Ok(())
2126    }
2127
2128    pub fn update_source_references(
2129        &mut self,
2130        source_id: CatalogItemId,
2131        references: Vec<SourceReference>,
2132        updated_at: u64,
2133    ) -> Result<(), CatalogError> {
2134        let key = SourceReferencesKey { source_id };
2135        let value = SourceReferencesValue {
2136            references,
2137            updated_at,
2138        };
2139        self.source_references.set(key, Some(value), self.op_id)?;
2140        Ok(())
2141    }
2142
2143    /// Upserts persisted system configuration `name` to `value`.
2144    pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2145        let key = ServerConfigurationKey {
2146            name: name.to_string(),
2147        };
2148        let value = ServerConfigurationValue { value };
2149        self.system_configurations
2150            .set(key, Some(value), self.op_id)?;
2151        Ok(())
2152    }
2153
2154    /// Removes persisted system configuration `name`.
2155    pub fn remove_system_config(&mut self, name: &str) {
2156        let key = ServerConfigurationKey {
2157            name: name.to_string(),
2158        };
2159        self.system_configurations
2160            .set(key, None, self.op_id)
2161            .expect("cannot have uniqueness violation");
2162    }
2163
2164    /// Removes all persisted system configurations.
2165    pub fn clear_system_configs(&mut self) {
2166        self.system_configurations.delete(|_k, _v| true, self.op_id);
2167    }
2168
2169    /// Returns the persisted cluster-coherent scoped system configurations.
2170    pub fn get_cluster_system_configurations(
2171        &self,
2172    ) -> impl Iterator<Item = ClusterSystemConfiguration> + use<'_> {
2173        self.cluster_system_configurations
2174            .items()
2175            .into_iter()
2176            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2177    }
2178
2179    /// Upserts the persisted cluster-coherent scoped configuration `name` = `value`
2180    /// for `cluster_id`.
2181    pub fn upsert_cluster_system_config(
2182        &mut self,
2183        cluster_id: ClusterId,
2184        name: &str,
2185        value: String,
2186    ) -> Result<(), CatalogError> {
2187        let key = ClusterSystemConfigurationKey {
2188            cluster_id,
2189            name: name.to_string(),
2190        };
2191        let value = ClusterSystemConfigurationValue { value };
2192        self.cluster_system_configurations
2193            .set(key, Some(value), self.op_id)?;
2194        Ok(())
2195    }
2196
2197    /// Removes the persisted cluster-coherent scoped configuration `name` for
2198    /// `cluster_id`.
2199    pub fn remove_cluster_system_config(&mut self, cluster_id: ClusterId, name: &str) {
2200        let key = ClusterSystemConfigurationKey {
2201            cluster_id,
2202            name: name.to_string(),
2203        };
2204        self.cluster_system_configurations
2205            .set(key, None, self.op_id)
2206            .expect("cannot have uniqueness violation");
2207    }
2208
2209    /// Returns the persisted replica-local scoped system configurations.
2210    pub fn get_replica_system_configurations(
2211        &self,
2212    ) -> impl Iterator<Item = ReplicaSystemConfiguration> + use<'_> {
2213        self.replica_system_configurations
2214            .items()
2215            .into_iter()
2216            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2217    }
2218
2219    /// Upserts the persisted replica-local scoped configuration `name` = `value`
2220    /// for `replica_id`.
2221    pub fn upsert_replica_system_config(
2222        &mut self,
2223        replica_id: ReplicaId,
2224        name: &str,
2225        value: String,
2226    ) -> Result<(), CatalogError> {
2227        let key = ReplicaSystemConfigurationKey {
2228            replica_id,
2229            name: name.to_string(),
2230        };
2231        let value = ReplicaSystemConfigurationValue { value };
2232        self.replica_system_configurations
2233            .set(key, Some(value), self.op_id)?;
2234        Ok(())
2235    }
2236
2237    /// Removes the persisted replica-local scoped configuration `name` for
2238    /// `replica_id`.
2239    pub fn remove_replica_system_config(&mut self, replica_id: ReplicaId, name: &str) {
2240        let key = ReplicaSystemConfigurationKey {
2241            replica_id,
2242            name: name.to_string(),
2243        };
2244        self.replica_system_configurations
2245            .set(key, None, self.op_id)
2246            .expect("cannot have uniqueness violation");
2247    }
2248
2249    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2250        match self.configs.insert(
2251            ConfigKey { key: key.clone() },
2252            ConfigValue { value },
2253            self.op_id,
2254        ) {
2255            Ok(_) => Ok(()),
2256            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2257        }
2258    }
2259
2260    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2261        self.clusters
2262            .items()
2263            .into_iter()
2264            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2265    }
2266
2267    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2268        self.cluster_replicas
2269            .items()
2270            .into_iter()
2271            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2272    }
2273
2274    pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2275        self.databases
2276            .items()
2277            .into_iter()
2278            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2279    }
2280
2281    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2282        self.roles
2283            .items()
2284            .into_iter()
2285            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2286    }
2287
2288    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2289        self.network_policies
2290            .items()
2291            .into_iter()
2292            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2293    }
2294
2295    pub fn get_system_object_mappings(
2296        &self,
2297    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2298        self.system_gid_mapping
2299            .items()
2300            .into_iter()
2301            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2302    }
2303
2304    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2305        self.schemas
2306            .items()
2307            .into_iter()
2308            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2309    }
2310
2311    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2312        self.system_configurations
2313            .items()
2314            .into_iter()
2315            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2316    }
2317
2318    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2319        let key = SchemaKey { id: *id };
2320        self.schemas
2321            .get(&key)
2322            .map(|v| DurableType::from_key_value(key, v.clone()))
2323    }
2324
2325    pub fn get_introspection_source_indexes(
2326        &self,
2327        cluster_id: ClusterId,
2328    ) -> BTreeMap<&str, (GlobalId, u32)> {
2329        self.introspection_sources
2330            .items()
2331            .into_iter()
2332            .filter(|(k, _v)| k.cluster_id == cluster_id)
2333            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2334            .collect()
2335    }
2336
2337    pub fn get_catalog_content_version(&self) -> Option<&str> {
2338        self.settings
2339            .get(&SettingKey {
2340                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2341            })
2342            .map(|value| &*value.value)
2343    }
2344
2345    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2346        self.settings
2347            .get(&SettingKey {
2348                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2349            })
2350            .map(|value| value.value.clone())
2351    }
2352
2353    /// Commit the current operation within the transaction. This does not cause anything to be
2354    /// written durably, but signals to the current transaction that we are moving on to the next
2355    /// operation.
2356    ///
2357    /// Returns the updates of the committed operation.
2358    #[must_use]
2359    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2360        let updates = self.get_op_updates();
2361        self.commit_op();
2362        updates
2363    }
2364
2365    fn get_op_updates(&self) -> Vec<StateUpdate> {
2366        fn get_collection_op_updates<'a, T>(
2367            table_txn: &'a TableTransaction<T::Key, T::Value>,
2368            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2369            op: Timestamp,
2370        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2371        where
2372            T::Key: Ord + Eq + Clone + Debug,
2373            T::Value: Ord + Clone + Debug,
2374            T: DurableType,
2375        {
2376            table_txn
2377                .pending
2378                .iter()
2379                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2380                .filter_map(move |(k, v)| {
2381                    if v.ts == op {
2382                        let key = k.clone();
2383                        let value = v.value.clone();
2384                        let diff = v.diff.clone().try_into().expect("invalid diff");
2385                        let update = DurableType::from_key_value(key, value);
2386                        let kind = kind_fn(update);
2387                        Some((kind, diff))
2388                    } else {
2389                        None
2390                    }
2391                })
2392        }
2393
2394        fn get_large_collection_op_updates<'a, T>(
2395            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2396            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2397            op: Timestamp,
2398        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2399        where
2400            T::Key: Ord + Eq + Clone + Debug,
2401            T: DurableType<Value = ()>,
2402        {
2403            collection.iter().filter_map(move |(k, diff, ts)| {
2404                if *ts == op {
2405                    let key = k.clone();
2406                    let diff = diff.clone().try_into().expect("invalid diff");
2407                    let update = DurableType::from_key_value(key, ());
2408                    let kind = kind_fn(update);
2409                    Some((kind, diff))
2410                } else {
2411                    None
2412                }
2413            })
2414        }
2415
2416        let Transaction {
2417            durable_catalog: _,
2418            databases,
2419            schemas,
2420            items,
2421            comments,
2422            roles,
2423            role_auth,
2424            clusters,
2425            network_policies,
2426            cluster_replicas,
2427            introspection_sources,
2428            system_gid_mapping,
2429            system_configurations,
2430            cluster_system_configurations,
2431            replica_system_configurations,
2432            default_privileges,
2433            source_references,
2434            system_privileges,
2435            audit_log_updates,
2436            storage_collection_metadata,
2437            unfinalized_shards,
2438            // Not representable as a `StateUpdate`.
2439            id_allocator: _,
2440            configs: _,
2441            settings: _,
2442            txn_wal_shard: _,
2443            upper,
2444            op_id: _,
2445        } = &self;
2446
2447        let updates = std::iter::empty()
2448            .chain(get_collection_op_updates(
2449                roles,
2450                StateUpdateKind::Role,
2451                self.op_id,
2452            ))
2453            .chain(get_collection_op_updates(
2454                role_auth,
2455                StateUpdateKind::RoleAuth,
2456                self.op_id,
2457            ))
2458            .chain(get_collection_op_updates(
2459                databases,
2460                StateUpdateKind::Database,
2461                self.op_id,
2462            ))
2463            .chain(get_collection_op_updates(
2464                schemas,
2465                StateUpdateKind::Schema,
2466                self.op_id,
2467            ))
2468            .chain(get_collection_op_updates(
2469                default_privileges,
2470                StateUpdateKind::DefaultPrivilege,
2471                self.op_id,
2472            ))
2473            .chain(get_collection_op_updates(
2474                system_privileges,
2475                StateUpdateKind::SystemPrivilege,
2476                self.op_id,
2477            ))
2478            .chain(get_collection_op_updates(
2479                system_configurations,
2480                StateUpdateKind::SystemConfiguration,
2481                self.op_id,
2482            ))
2483            .chain(get_collection_op_updates(
2484                cluster_system_configurations,
2485                StateUpdateKind::ClusterSystemConfiguration,
2486                self.op_id,
2487            ))
2488            .chain(get_collection_op_updates(
2489                replica_system_configurations,
2490                StateUpdateKind::ReplicaSystemConfiguration,
2491                self.op_id,
2492            ))
2493            .chain(get_collection_op_updates(
2494                clusters,
2495                StateUpdateKind::Cluster,
2496                self.op_id,
2497            ))
2498            .chain(get_collection_op_updates(
2499                network_policies,
2500                StateUpdateKind::NetworkPolicy,
2501                self.op_id,
2502            ))
2503            .chain(get_collection_op_updates(
2504                introspection_sources,
2505                StateUpdateKind::IntrospectionSourceIndex,
2506                self.op_id,
2507            ))
2508            .chain(get_collection_op_updates(
2509                cluster_replicas,
2510                StateUpdateKind::ClusterReplica,
2511                self.op_id,
2512            ))
2513            .chain(get_collection_op_updates(
2514                system_gid_mapping,
2515                StateUpdateKind::SystemObjectMapping,
2516                self.op_id,
2517            ))
2518            .chain(get_collection_op_updates(
2519                items,
2520                StateUpdateKind::Item,
2521                self.op_id,
2522            ))
2523            .chain(get_collection_op_updates(
2524                comments,
2525                StateUpdateKind::Comment,
2526                self.op_id,
2527            ))
2528            .chain(get_collection_op_updates(
2529                source_references,
2530                StateUpdateKind::SourceReferences,
2531                self.op_id,
2532            ))
2533            .chain(get_collection_op_updates(
2534                storage_collection_metadata,
2535                StateUpdateKind::StorageCollectionMetadata,
2536                self.op_id,
2537            ))
2538            .chain(get_collection_op_updates(
2539                unfinalized_shards,
2540                StateUpdateKind::UnfinalizedShard,
2541                self.op_id,
2542            ))
2543            .chain(get_large_collection_op_updates(
2544                audit_log_updates,
2545                StateUpdateKind::AuditLog,
2546                self.op_id,
2547            ))
2548            .map(|(kind, diff)| StateUpdate {
2549                kind,
2550                ts: upper.clone(),
2551                diff,
2552            })
2553            .collect();
2554
2555        updates
2556    }
2557
2558    pub fn is_savepoint(&self) -> bool {
2559        self.durable_catalog.is_savepoint()
2560    }
2561
2562    fn commit_op(&mut self) {
2563        self.op_id += 1;
2564    }
2565
2566    pub fn op_id(&self) -> Timestamp {
2567        self.op_id
2568    }
2569
2570    pub fn upper(&self) -> mz_repr::Timestamp {
2571        self.upper
2572    }
2573
2574    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2575        let audit_log_updates = self
2576            .audit_log_updates
2577            .into_iter()
2578            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2579            .collect();
2580
2581        let txn_batch = TransactionBatch {
2582            databases: self.databases.pending(),
2583            schemas: self.schemas.pending(),
2584            items: self.items.pending(),
2585            comments: self.comments.pending(),
2586            roles: self.roles.pending(),
2587            role_auth: self.role_auth.pending(),
2588            clusters: self.clusters.pending(),
2589            cluster_replicas: self.cluster_replicas.pending(),
2590            network_policies: self.network_policies.pending(),
2591            introspection_sources: self.introspection_sources.pending(),
2592            id_allocator: self.id_allocator.pending(),
2593            configs: self.configs.pending(),
2594            source_references: self.source_references.pending(),
2595            settings: self.settings.pending(),
2596            system_gid_mapping: self.system_gid_mapping.pending(),
2597            system_configurations: self.system_configurations.pending(),
2598            cluster_system_configurations: self.cluster_system_configurations.pending(),
2599            replica_system_configurations: self.replica_system_configurations.pending(),
2600            default_privileges: self.default_privileges.pending(),
2601            system_privileges: self.system_privileges.pending(),
2602            storage_collection_metadata: self.storage_collection_metadata.pending(),
2603            unfinalized_shards: self.unfinalized_shards.pending(),
2604            txn_wal_shard: self.txn_wal_shard.pending(),
2605            audit_log_updates,
2606            upper: self.upper,
2607        };
2608        (txn_batch, self.durable_catalog)
2609    }
2610
2611    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2612    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2613    /// before proceeding. In general, this must be fatal to the calling process. We do not
2614    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2615    ///
2616    /// The transaction is committed at `commit_ts`.
2617    ///
2618    /// Returns what the upper was directly after the transaction committed.
2619    ///
2620    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2621    /// catalog is not writeable.
2622    #[mz_ore::instrument(level = "debug")]
2623    pub(crate) async fn commit_internal(
2624        self,
2625        commit_ts: mz_repr::Timestamp,
2626    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2627        let (mut txn_batch, durable_catalog) = self.into_parts();
2628        let TransactionBatch {
2629            databases,
2630            schemas,
2631            items,
2632            comments,
2633            roles,
2634            role_auth,
2635            clusters,
2636            cluster_replicas,
2637            network_policies,
2638            introspection_sources,
2639            id_allocator,
2640            configs,
2641            source_references,
2642            settings,
2643            system_gid_mapping,
2644            system_configurations,
2645            cluster_system_configurations,
2646            replica_system_configurations,
2647            default_privileges,
2648            system_privileges,
2649            storage_collection_metadata,
2650            unfinalized_shards,
2651            txn_wal_shard,
2652            audit_log_updates,
2653            upper: _,
2654        } = &mut txn_batch;
2655        // Consolidate in memory because it will likely be faster than consolidating after the
2656        // transaction has been made durable.
2657        differential_dataflow::consolidation::consolidate_updates(databases);
2658        differential_dataflow::consolidation::consolidate_updates(schemas);
2659        differential_dataflow::consolidation::consolidate_updates(items);
2660        differential_dataflow::consolidation::consolidate_updates(comments);
2661        differential_dataflow::consolidation::consolidate_updates(roles);
2662        differential_dataflow::consolidation::consolidate_updates(role_auth);
2663        differential_dataflow::consolidation::consolidate_updates(clusters);
2664        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2665        differential_dataflow::consolidation::consolidate_updates(network_policies);
2666        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2667        differential_dataflow::consolidation::consolidate_updates(id_allocator);
2668        differential_dataflow::consolidation::consolidate_updates(configs);
2669        differential_dataflow::consolidation::consolidate_updates(settings);
2670        differential_dataflow::consolidation::consolidate_updates(source_references);
2671        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2672        differential_dataflow::consolidation::consolidate_updates(system_configurations);
2673        differential_dataflow::consolidation::consolidate_updates(cluster_system_configurations);
2674        differential_dataflow::consolidation::consolidate_updates(replica_system_configurations);
2675        differential_dataflow::consolidation::consolidate_updates(default_privileges);
2676        differential_dataflow::consolidation::consolidate_updates(system_privileges);
2677        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2678        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2679        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2680        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2681
2682        let upper = durable_catalog
2683            .commit_transaction(txn_batch, commit_ts)
2684            .await?;
2685        Ok((durable_catalog, upper))
2686    }
2687
2688    /// Commits the storage transaction to durable storage. Any error returned outside read-only
2689    /// mode indicates the catalog may be in an indeterminate state and needs to be fully re-read
2690    /// before proceeding. In general, this must be fatal to the calling process. We do not
2691    /// panic/halt inside this function itself so that errors can bubble up during initialization.
2692    ///
2693    /// In read-only mode, this will return an error for non-empty transactions indicating that the
2694    /// catalog is not writeable.
2695    ///
2696    /// IMPORTANT: It is assumed that the committer of this transaction has already applied all
2697    /// updates from this transaction. Therefore, updates from this transaction will not be returned
2698    /// when calling [`crate::durable::ReadOnlyDurableCatalogState::sync_to_current_updates`] or
2699    /// [`crate::durable::ReadOnlyDurableCatalogState::sync_updates`].
2700    ///
2701    /// An alternative implementation would be for the caller to explicitly consume their updates
2702    /// after committing and only then apply the updates in-memory. While this removes assumptions
2703    /// about the caller in this method, in practice it results in duplicate work on every commit.
2704    #[mz_ore::instrument(level = "debug")]
2705    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2706        let op_updates = self.get_op_updates();
2707        assert!(
2708            op_updates.is_empty(),
2709            "unconsumed transaction updates: {op_updates:?}"
2710        );
2711
2712        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2713        // Drain all the updates from the commit since it is assumed that they were already applied.
2714        let updates = durable_storage.sync_updates(upper).await?;
2715        // Writable and savepoint catalogs should have consumed all updates before committing a
2716        // transaction, otherwise the commit was performed with an out of date state.
2717        // Read-only catalogs can only commit empty transactions, so they don't need to consume all
2718        // updates before committing.
2719        soft_assert_no_log!(
2720            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2721            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2722        );
2723        Ok(())
2724    }
2725}
2726
2727use crate::durable::async_trait;
2728
2729use super::objects::{RoleAuthKey, RoleAuthValue};
2730
2731#[async_trait]
2732impl StorageTxn for Transaction<'_> {
2733    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2734        self.storage_collection_metadata
2735            .items()
2736            .into_iter()
2737            .map(
2738                |(
2739                    StorageCollectionMetadataKey { id },
2740                    StorageCollectionMetadataValue { shard },
2741                )| { (*id, shard.clone()) },
2742            )
2743            .collect()
2744    }
2745
2746    fn insert_collection_metadata(
2747        &mut self,
2748        metadata: BTreeMap<GlobalId, ShardId>,
2749    ) -> Result<(), StorageError> {
2750        for (id, shard) in metadata {
2751            self.storage_collection_metadata
2752                .insert(
2753                    StorageCollectionMetadataKey { id },
2754                    StorageCollectionMetadataValue {
2755                        shard: shard.clone(),
2756                    },
2757                    self.op_id,
2758                )
2759                .map_err(|err| match err {
2760                    DurableCatalogError::DuplicateKey => {
2761                        StorageError::CollectionMetadataAlreadyExists(id)
2762                    }
2763                    DurableCatalogError::UniquenessViolation => {
2764                        StorageError::PersistShardAlreadyInUse(shard)
2765                    }
2766                    err => StorageError::Generic(anyhow::anyhow!(err)),
2767                })?;
2768        }
2769        Ok(())
2770    }
2771
2772    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2773        let ks: Vec<_> = ids
2774            .into_iter()
2775            .map(|id| StorageCollectionMetadataKey { id })
2776            .collect();
2777        self.storage_collection_metadata
2778            .delete_by_keys(ks, self.op_id)
2779            .into_iter()
2780            .map(
2781                |(
2782                    StorageCollectionMetadataKey { id },
2783                    StorageCollectionMetadataValue { shard },
2784                )| (id, shard),
2785            )
2786            .collect()
2787    }
2788
2789    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2790        self.unfinalized_shards
2791            .items()
2792            .into_iter()
2793            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2794            .collect()
2795    }
2796
2797    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError> {
2798        for shard in s {
2799            match self
2800                .unfinalized_shards
2801                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2802            {
2803                // Inserting duplicate keys has no effect.
2804                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2805                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2806            };
2807        }
2808        Ok(())
2809    }
2810
2811    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2812        let ks: Vec<_> = shards
2813            .into_iter()
2814            .map(|shard| UnfinalizedShardKey { shard })
2815            .collect();
2816        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2817    }
2818
2819    fn get_txn_wal_shard(&self) -> Option<ShardId> {
2820        self.txn_wal_shard
2821            .values()
2822            .iter()
2823            .next()
2824            .map(|TxnWalShardValue { shard }| *shard)
2825    }
2826
2827    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError> {
2828        self.txn_wal_shard
2829            .insert((), TxnWalShardValue { shard }, self.op_id)
2830            .map_err(|err| match err {
2831                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2832                err => StorageError::Generic(anyhow::anyhow!(err)),
2833            })
2834    }
2835}
2836
/// Describes a set of changes to apply as the result of a catalog transaction.
///
/// Every field except `upper` holds the changes to one durable catalog collection, as
/// protobuf-encoded `(key, value, diff)` updates. `Transaction::into_parts` fills these from each
/// collection's pending changes, and `Transaction::commit_internal` consolidates them before
/// handing the batch to the durable catalog.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) cluster_system_configurations: Vec<(
        proto::ClusterSystemConfigurationKey,
        proto::ClusterSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) replica_system_configurations: Vec<(
        proto::ReplicaSystemConfigurationKey,
        proto::ReplicaSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    /// Changes to the set of unfinalized shards; this collection stores keys only, so the value
    /// is `()`.
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    /// Changes to the txn-wal shard, a collection whose key is `()`.
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    /// Audit log events (key-only). The per-update operation id tracked by the transaction is
    /// dropped by `Transaction::into_parts`.
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper of the catalog when the transaction started.
    pub(crate) upper: mz_repr::Timestamp,
}
2899
2900impl TransactionBatch {
2901    pub fn is_empty(&self) -> bool {
2902        let TransactionBatch {
2903            databases,
2904            schemas,
2905            items,
2906            comments,
2907            roles,
2908            role_auth,
2909            clusters,
2910            cluster_replicas,
2911            network_policies,
2912            introspection_sources,
2913            id_allocator,
2914            configs,
2915            settings,
2916            source_references,
2917            system_gid_mapping,
2918            system_configurations,
2919            cluster_system_configurations,
2920            replica_system_configurations,
2921            default_privileges,
2922            system_privileges,
2923            storage_collection_metadata,
2924            unfinalized_shards,
2925            txn_wal_shard,
2926            audit_log_updates,
2927            upper: _,
2928        } = self;
2929        databases.is_empty()
2930            && schemas.is_empty()
2931            && items.is_empty()
2932            && comments.is_empty()
2933            && roles.is_empty()
2934            && role_auth.is_empty()
2935            && clusters.is_empty()
2936            && cluster_replicas.is_empty()
2937            && network_policies.is_empty()
2938            && introspection_sources.is_empty()
2939            && id_allocator.is_empty()
2940            && configs.is_empty()
2941            && settings.is_empty()
2942            && source_references.is_empty()
2943            && system_gid_mapping.is_empty()
2944            && system_configurations.is_empty()
2945            && cluster_system_configurations.is_empty()
2946            && replica_system_configurations.is_empty()
2947            && default_privileges.is_empty()
2948            && system_privileges.is_empty()
2949            && storage_collection_metadata.is_empty()
2950            && unfinalized_shards.is_empty()
2951            && txn_wal_shard.is_empty()
2952            && audit_log_updates.is_empty()
2953    }
2954}
2955
/// A single pending change to one key's value within a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value being added or retracted.
    value: V,
    /// The transaction operation (compared against `Transaction::op_id`) during which this
    /// change was made.
    ts: Timestamp,
    /// The multiplicity of the change; after per-key consolidation `TableTransaction::pending`
    /// documents it as 1 or -1.
    diff: Diff,
}
2962
/// Utility trait to check for plan validity.
///
/// Lets uniqueness checks in [`TableTransaction::verify`] group values by name instead of
/// comparing every pair of values.
trait UniqueName {
    /// Does the item have a unique name? If yes, we can check for name equality in validity
    /// checking: values whose [`Self::unique_name`]s differ are assumed never to violate
    /// uniqueness.
    const HAS_UNIQUE_NAME: bool;
    /// The unique name, only returns a meaningful name if [`Self::HAS_UNIQUE_NAME`] is `true`.
    fn unique_name(&self) -> &str;
}
2971
2972mod unique_name {
2973    use crate::durable::objects::*;
2974
2975    macro_rules! impl_unique_name {
2976        ($($t:ty),* $(,)?) => {
2977            $(
2978                impl crate::durable::transaction::UniqueName for $t {
2979                    const HAS_UNIQUE_NAME: bool = true;
2980                    fn unique_name(&self) -> &str {
2981                        &self.name
2982                    }
2983                }
2984            )*
2985        };
2986    }
2987
2988    macro_rules! impl_no_unique_name {
2989        ($($t:ty),* $(,)?) => {
2990            $(
2991                impl crate::durable::transaction::UniqueName for $t {
2992                    const HAS_UNIQUE_NAME: bool = false;
2993                    fn unique_name(&self) -> &str {
2994                       ""
2995                    }
2996                }
2997            )*
2998        };
2999    }
3000
3001    impl_unique_name! {
3002        ClusterReplicaValue,
3003        ClusterValue,
3004        DatabaseValue,
3005        ItemValue,
3006        NetworkPolicyValue,
3007        RoleValue,
3008        SchemaValue,
3009    }
3010
3011    impl_no_unique_name!(
3012        (),
3013        ClusterIntrospectionSourceIndexValue,
3014        ClusterSystemConfigurationValue,
3015        CommentValue,
3016        ConfigValue,
3017        DefaultPrivilegesValue,
3018        GidMappingValue,
3019        IdAllocValue,
3020        ReplicaSystemConfigurationValue,
3021        ServerConfigurationValue,
3022        SettingValue,
3023        SourceReferencesValue,
3024        StorageCollectionMetadataValue,
3025        SystemPrivilegesValue,
3026        TxnWalShardValue,
3027        RoleAuthValue,
3028    );
3029
3030    #[cfg(test)]
3031    mod test {
3032        impl_no_unique_name!(String,);
3033    }
3034}
3035
/// TableTransaction emulates some features of a typical SQL transaction over a table for a
/// single catalog collection.
///
/// It supports:
/// - uniqueness constraints
/// - transactional reads and writes (including read-your-writes before commit)
///
/// `K` is the primary key type. Multiple entries with the same key are disallowed.
/// `V` is an arbitrary value type.
#[derive(Debug)]
struct TableTransaction<K, V> {
    /// The contents of the collection when the transaction was created, decoded from protobuf
    /// by [`Self::new`] or [`Self::new_with_uniqueness_fn`].
    initial: BTreeMap<K, V>,
    /// The desired updates to keys after commit, as a per-key list of value/op/diff changes.
    ///
    /// Invariant: each key's `Vec` is sorted by `ts`.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    /// Optional predicate reporting whether two values conflict. `None` disables the uniqueness
    /// checks in `verify` and `verify_keys`.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
3053
3054impl<K, V> TableTransaction<K, V>
3055where
3056    K: Ord + Eq + Clone + Debug,
3057    V: Ord + Clone + Debug + UniqueName,
3058{
3059    /// Create a new TableTransaction with initial data.
3060    ///
3061    /// Internally the catalog serializes data as protobuf. All fields in a proto message are
3062    /// optional, which makes using them in Rust cumbersome. Generic parameters `KP` and `VP` are
3063    /// protobuf types which deserialize to `K` and `V` that a [`TableTransaction`] is generic
3064    /// over.
3065    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
3066    where
3067        K: RustType<KP>,
3068        V: RustType<VP>,
3069    {
3070        let initial = initial
3071            .into_iter()
3072            .map(RustType::from_proto)
3073            .collect::<Result<_, _>>()?;
3074
3075        Ok(Self {
3076            initial,
3077            pending: BTreeMap::new(),
3078            uniqueness_violation: None,
3079        })
3080    }
3081
3082    /// Like [`Self::new`], but you can also provide `uniqueness_violation`, which is a function
3083    /// that determines whether there is a uniqueness violation among two values.
3084    fn new_with_uniqueness_fn<KP, VP>(
3085        initial: BTreeMap<KP, VP>,
3086        uniqueness_violation: fn(a: &V, b: &V) -> bool,
3087    ) -> Result<Self, TryFromProtoError>
3088    where
3089        K: RustType<KP>,
3090        V: RustType<VP>,
3091    {
3092        let initial = initial
3093            .into_iter()
3094            .map(RustType::from_proto)
3095            .collect::<Result<_, _>>()?;
3096
3097        Ok(Self {
3098            initial,
3099            pending: BTreeMap::new(),
3100            uniqueness_violation: Some(uniqueness_violation),
3101        })
3102    }
3103
3104    /// Consumes and returns the pending changes and their diffs. `Diff` is
3105    /// guaranteed to be 1 or -1.
3106    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
3107    where
3108        K: RustType<KP>,
3109        V: RustType<VP>,
3110    {
3111        soft_assert_no_log!(self.verify().is_ok());
3112        // Pending describes the desired final state for some keys. K,V pairs should be
3113        // retracted if they already exist and were deleted or are being updated.
3114        self.pending
3115            .into_iter()
3116            .flat_map(|(k, v)| {
3117                let mut v: Vec<_> = v
3118                    .into_iter()
3119                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
3120                    .collect();
3121                differential_dataflow::consolidation::consolidate(&mut v);
3122                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
3123            })
3124            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
3125            .collect()
3126    }
3127
3128    /// Verifies that no items in `self` violate `self.uniqueness_violation`.
3129    ///
3130    /// Runtime is O(n^2), where n is the number of items in `self`, if
3131    /// [`UniqueName::HAS_UNIQUE_NAME`] is false for `V`. Prefer using [`Self::verify_keys`].
3132    fn verify(&self) -> Result<(), DurableCatalogError> {
3133        if let Some(uniqueness_violation) = self.uniqueness_violation {
3134            // Compare each value to each other value and ensure they are unique.
3135            let items = self.values();
3136            if V::HAS_UNIQUE_NAME {
3137                let by_name: BTreeMap<_, _> = items
3138                    .iter()
3139                    .enumerate()
3140                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
3141                    .collect();
3142                for (i, vi) in items.iter().enumerate() {
3143                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
3144                        if i != *j && uniqueness_violation(vi, *vj) {
3145                            return Err(DurableCatalogError::UniquenessViolation);
3146                        }
3147                    }
3148                }
3149            } else {
3150                for (i, vi) in items.iter().enumerate() {
3151                    for (j, vj) in items.iter().enumerate() {
3152                        if i != j && uniqueness_violation(vi, vj) {
3153                            return Err(DurableCatalogError::UniquenessViolation);
3154                        }
3155                    }
3156                }
3157            }
3158        }
3159        soft_assert_no_log!(
3160            self.pending
3161                .values()
3162                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3163            "pending should be sorted by timestamp: {:?}",
3164            self.pending
3165        );
3166        Ok(())
3167    }
3168
3169    /// Verifies that no items in `self` violate `self.uniqueness_violation` with `keys`.
3170    ///
3171    /// Runtime is O(n * k), where n is the number of items in `self` and k is the number of
3172    /// items in `keys`.
3173    fn verify_keys<'a>(
3174        &self,
3175        keys: impl IntoIterator<Item = &'a K>,
3176    ) -> Result<(), DurableCatalogError>
3177    where
3178        K: 'a,
3179    {
3180        if let Some(uniqueness_violation) = self.uniqueness_violation {
3181            let entries: Vec<_> = keys
3182                .into_iter()
3183                .filter_map(|key| self.get(key).map(|value| (key, value)))
3184                .collect();
3185            // Compare each value in `entries` to each value in `self` and ensure they are unique.
3186            for (ki, vi) in self.items() {
3187                for (kj, vj) in &entries {
3188                    if ki != *kj && uniqueness_violation(vi, vj) {
3189                        return Err(DurableCatalogError::UniquenessViolation);
3190                    }
3191                }
3192            }
3193        }
3194        soft_assert_no_log!(self.verify().is_ok());
3195        Ok(())
3196    }
3197
3198    /// Iterates over the items viewable in the current transaction in arbitrary
3199    /// order and applies `f` on all key, value pairs.
3200    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3201        let mut seen = BTreeSet::new();
3202        for k in self.pending.keys() {
3203            seen.insert(k);
3204            let v = self.get(k);
3205            // Deleted items don't exist so shouldn't be visited, but still suppress
3206            // visiting the key later.
3207            if let Some(v) = v {
3208                f(k, v);
3209            }
3210        }
3211        for (k, v) in self.initial.iter() {
3212            // Add on initial items that don't have updates.
3213            if !seen.contains(k) {
3214                f(k, v);
3215            }
3216        }
3217    }
3218
3219    /// Returns the current value of `k`.
3220    fn get(&self, k: &K) -> Option<&V> {
3221        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3222        let mut updates = Vec::with_capacity(pending.len() + 1);
3223        if let Some(initial) = self.initial.get(k) {
3224            updates.push((initial, Diff::ONE));
3225        }
3226        updates.extend(
3227            pending
3228                .into_iter()
3229                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3230        );
3231
3232        differential_dataflow::consolidation::consolidate(&mut updates);
3233        assert!(updates.len() <= 1);
3234        updates.into_iter().next().map(|(v, _)| v)
3235    }
3236
3237    /// Returns the items viewable in the current transaction. The items are
3238    /// cloned, so this is an expensive operation. Prefer using [`Self::items`], or
3239    /// [`Self::for_values`].
3240    // Used by tests.
3241    #[cfg(test)]
3242    fn items_cloned(&self) -> BTreeMap<K, V> {
3243        let mut items = BTreeMap::new();
3244        self.for_values(|k, v| {
3245            items.insert(k.clone(), v.clone());
3246        });
3247        items
3248    }
3249
3250    /// Returns the current items as proto-typed key-value pairs, suitable for
3251    /// constructing a [`Snapshot`]. This merges `initial` and `pending` to
3252    /// produce the current view and converts back to proto types.
3253    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3254    where
3255        K: RustType<KP>,
3256        V: RustType<VP>,
3257        KP: Ord,
3258    {
3259        let mut items = BTreeMap::new();
3260        self.for_values(|k, v| {
3261            items.insert(k.into_proto(), v.into_proto());
3262        });
3263        items
3264    }
3265
3266    /// Returns the items viewable in the current transaction as references. Returns a map
3267    /// of references.
3268    fn items(&self) -> BTreeMap<&K, &V> {
3269        let mut items = BTreeMap::new();
3270        self.for_values(|k, v| {
3271            items.insert(k, v);
3272        });
3273        items
3274    }
3275
3276    /// Returns the values viewable in the current transaction as references.
3277    fn values(&self) -> BTreeSet<&V> {
3278        let mut items = BTreeSet::new();
3279        self.for_values(|_, v| {
3280            items.insert(v);
3281        });
3282        items
3283    }
3284
3285    /// Returns the number of items viewable in the current transaction.
3286    fn len(&self) -> usize {
3287        let mut count = 0;
3288        self.for_values(|_, _| {
3289            count += 1;
3290        });
3291        count
3292    }
3293
3294    /// Iterates over the items viewable in the current transaction, and provides a
3295    /// map where additional pending items can be inserted, which will be appended
3296    /// to current pending items. Does not verify uniqueness.
3297    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3298        &mut self,
3299        mut f: F,
3300    ) {
3301        let mut pending = BTreeMap::new();
3302        self.for_values(|k, v| f(&mut pending, k, v));
3303        for (k, updates) in pending {
3304            self.pending.entry(k).or_default().extend(updates);
3305        }
3306    }
3307
3308    /// Inserts a new k,v pair.
3309    ///
3310    /// Returns an error if the uniqueness check failed or the key already exists.
3311    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3312        let mut violation = None;
3313        self.for_values(|for_k, for_v| {
3314            if &k == for_k {
3315                violation = Some(DurableCatalogError::DuplicateKey);
3316            }
3317            if let Some(uniqueness_violation) = self.uniqueness_violation {
3318                if uniqueness_violation(for_v, &v) {
3319                    violation = Some(DurableCatalogError::UniquenessViolation);
3320                }
3321            }
3322        });
3323        if let Some(violation) = violation {
3324            return Err(violation);
3325        }
3326        self.pending.entry(k).or_default().push(TransactionUpdate {
3327            value: v,
3328            ts,
3329            diff: Diff::ONE,
3330        });
3331        soft_assert_no_log!(self.verify().is_ok());
3332        Ok(())
3333    }
3334
3335    /// Updates k, v pairs. `f` is a function that can return `Some(V)` if the
3336    /// value should be updated, otherwise `None`. Returns the number of changed
3337    /// entries.
3338    ///
3339    /// Returns an error if the uniqueness check failed.
3340    ///
3341    /// Prefer using [`Self::update_by_key`] or [`Self::update_by_keys`], which generally have
3342    /// better performance.
3343    fn update<F: Fn(&K, &V) -> Option<V>>(
3344        &mut self,
3345        f: F,
3346        ts: Timestamp,
3347    ) -> Result<Diff, DurableCatalogError> {
3348        let mut changed = Diff::ZERO;
3349        let mut keys = BTreeSet::new();
3350        // Keep a copy of pending in case of uniqueness violation.
3351        let pending = self.pending.clone();
3352        self.for_values_mut(|p, k, v| {
3353            if let Some(next) = f(k, v) {
3354                changed += Diff::ONE;
3355                keys.insert(k.clone());
3356                let updates = p.entry(k.clone()).or_default();
3357                updates.push(TransactionUpdate {
3358                    value: v.clone(),
3359                    ts,
3360                    diff: Diff::MINUS_ONE,
3361                });
3362                updates.push(TransactionUpdate {
3363                    value: next,
3364                    ts,
3365                    diff: Diff::ONE,
3366                });
3367            }
3368        });
3369        // Check for uniqueness violation.
3370        if let Err(err) = self.verify_keys(&keys) {
3371            self.pending = pending;
3372            Err(err)
3373        } else {
3374            Ok(changed)
3375        }
3376    }
3377
3378    /// Updates `k`, `v` pair if `k` already exists in `self`.
3379    ///
3380    /// Returns `true` if `k` was updated, `false` otherwise.
3381    /// Returns an error if the uniqueness check failed.
3382    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3383        if let Some(cur_v) = self.get(&k) {
3384            if v != *cur_v {
3385                self.set(k, Some(v), ts)?;
3386            }
3387            Ok(true)
3388        } else {
3389            Ok(false)
3390        }
3391    }
3392
3393    /// Updates k, v pairs. Keys that don't already exist in `self` are ignored.
3394    ///
3395    /// Returns the number of changed entries.
3396    /// Returns an error if the uniqueness check failed.
3397    fn update_by_keys(
3398        &mut self,
3399        kvs: impl IntoIterator<Item = (K, V)>,
3400        ts: Timestamp,
3401    ) -> Result<Diff, DurableCatalogError> {
3402        let kvs: Vec<_> = kvs
3403            .into_iter()
3404            .filter_map(|(k, v)| match self.get(&k) {
3405                // Record if updating this entry would be a no-op.
3406                Some(cur_v) => Some((*cur_v == v, k, v)),
3407                None => None,
3408            })
3409            .collect();
3410        let changed = kvs.len();
3411        let changed =
3412            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3413        let kvs = kvs
3414            .into_iter()
3415            // Filter out no-ops to save some work.
3416            .filter(|(no_op, _, _)| !no_op)
3417            .map(|(_, k, v)| (k, Some(v)))
3418            .collect();
3419        self.set_many(kvs, ts)?;
3420        Ok(changed)
3421    }
3422
3423    /// Set the value for a key. Returns the previous entry if the key existed,
3424    /// otherwise None.
3425    ///
3426    /// Returns an error if the uniqueness check failed.
3427    ///
3428    /// DO NOT call this function in a loop, use [`Self::set_many`] instead.
3429    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3430        let prev = self.get(&k).cloned();
3431        let entry = self.pending.entry(k.clone()).or_default();
3432        let restore_len = entry.len();
3433
3434        match (v, prev.clone()) {
3435            (Some(v), Some(prev)) => {
3436                entry.push(TransactionUpdate {
3437                    value: prev,
3438                    ts,
3439                    diff: Diff::MINUS_ONE,
3440                });
3441                entry.push(TransactionUpdate {
3442                    value: v,
3443                    ts,
3444                    diff: Diff::ONE,
3445                });
3446            }
3447            (Some(v), None) => {
3448                entry.push(TransactionUpdate {
3449                    value: v,
3450                    ts,
3451                    diff: Diff::ONE,
3452                });
3453            }
3454            (None, Some(prev)) => {
3455                entry.push(TransactionUpdate {
3456                    value: prev,
3457                    ts,
3458                    diff: Diff::MINUS_ONE,
3459                });
3460            }
3461            (None, None) => {}
3462        }
3463
3464        // Check for uniqueness violation.
3465        if let Err(err) = self.verify_keys([&k]) {
3466            // Revert self.pending to the state it was in before calling this
3467            // function.
3468            let pending = self.pending.get_mut(&k).expect("inserted above");
3469            pending.truncate(restore_len);
3470            Err(err)
3471        } else {
3472            Ok(prev)
3473        }
3474    }
3475
3476    /// Set the values for many keys. Returns the previous entry for each key if the key existed,
3477    /// otherwise None.
3478    ///
3479    /// Returns an error if any uniqueness check failed.
3480    fn set_many(
3481        &mut self,
3482        kvs: BTreeMap<K, Option<V>>,
3483        ts: Timestamp,
3484    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3485        if kvs.is_empty() {
3486            return Ok(BTreeMap::new());
3487        }
3488
3489        let mut prevs = BTreeMap::new();
3490        let mut restores = BTreeMap::new();
3491
3492        for (k, v) in kvs {
3493            let prev = self.get(&k).cloned();
3494            let entry = self.pending.entry(k.clone()).or_default();
3495            restores.insert(k.clone(), entry.len());
3496
3497            match (v, prev.clone()) {
3498                (Some(v), Some(prev)) => {
3499                    entry.push(TransactionUpdate {
3500                        value: prev,
3501                        ts,
3502                        diff: Diff::MINUS_ONE,
3503                    });
3504                    entry.push(TransactionUpdate {
3505                        value: v,
3506                        ts,
3507                        diff: Diff::ONE,
3508                    });
3509                }
3510                (Some(v), None) => {
3511                    entry.push(TransactionUpdate {
3512                        value: v,
3513                        ts,
3514                        diff: Diff::ONE,
3515                    });
3516                }
3517                (None, Some(prev)) => {
3518                    entry.push(TransactionUpdate {
3519                        value: prev,
3520                        ts,
3521                        diff: Diff::MINUS_ONE,
3522                    });
3523                }
3524                (None, None) => {}
3525            }
3526
3527            prevs.insert(k, prev);
3528        }
3529
3530        // Check for uniqueness violation.
3531        if let Err(err) = self.verify_keys(prevs.keys()) {
3532            for (k, restore_len) in restores {
3533                // Revert self.pending to the state it was in before calling this
3534                // function.
3535                let pending = self.pending.get_mut(&k).expect("inserted above");
3536                pending.truncate(restore_len);
3537            }
3538            Err(err)
3539        } else {
3540            Ok(prevs)
3541        }
3542    }
3543
3544    /// Deletes items for which `f` returns true. Returns the keys and values of
3545    /// the deleted entries.
3546    ///
3547    /// Prefer using [`Self::delete_by_key`] or [`Self::delete_by_keys`], which generally have
3548    /// better performance.
3549    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3550        let mut deleted = Vec::new();
3551        self.for_values_mut(|p, k, v| {
3552            if f(k, v) {
3553                deleted.push((k.clone(), v.clone()));
3554                p.entry(k.clone()).or_default().push(TransactionUpdate {
3555                    value: v.clone(),
3556                    ts,
3557                    diff: Diff::MINUS_ONE,
3558                });
3559            }
3560        });
3561        soft_assert_no_log!(self.verify().is_ok());
3562        deleted
3563    }
3564
3565    /// Deletes item with key `k`.
3566    ///
3567    /// Returns the value of the deleted entry, if it existed.
3568    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3569        self.set(k, None, ts)
3570            .expect("deleting an entry cannot violate uniqueness")
3571    }
3572
3573    /// Deletes items with key in `ks`.
3574    ///
3575    /// Returns the keys and values of the deleted entries.
3576    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3577        let kvs = ks.into_iter().map(|k| (k, None)).collect();
3578        let prevs = self
3579            .set_many(kvs, ts)
3580            .expect("deleting entries cannot violate uniqueness");
3581        prevs
3582            .into_iter()
3583            .filter_map(|(k, v)| v.map(|v| (k, v)))
3584            .collect()
3585    }
3586}
3587
3588#[cfg(test)]
3589#[allow(clippy::unwrap_used)]
3590mod tests {
3591    use super::*;
3592
3593    use mz_ore::now::SYSTEM_TIME;
3594    use mz_ore::{assert_none, assert_ok};
3595    use mz_persist_client::cache::PersistClientCache;
3596    use mz_persist_types::PersistLocation;
3597    use semver::Version;
3598
3599    use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3600    use crate::memory;
3601
3602    #[mz_ore::test]
3603    fn test_table_transaction_simple() {
3604        fn uniqueness_violation(a: &String, b: &String) -> bool {
3605            a == b
3606        }
3607        let mut table = TableTransaction::new_with_uniqueness_fn(
3608            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3609            uniqueness_violation,
3610        )
3611        .unwrap();
3612
3613        // Ideally, we compare for errors here, but it's hard/impossible to implement PartialEq
3614        // for DurableCatalogError.
3615        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3616        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3617        assert!(
3618            table
3619                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3620                .is_err()
3621        );
3622        assert!(
3623            table
3624                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3625                .is_err()
3626        );
3627    }
3628
3629    #[mz_ore::test]
3630    fn test_table_transaction() {
3631        fn uniqueness_violation(a: &String, b: &String) -> bool {
3632            a == b
3633        }
3634        let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3635
3636        fn commit(
3637            table: &mut BTreeMap<Vec<u8>, String>,
3638            mut pending: Vec<(Vec<u8>, String, Diff)>,
3639        ) {
3640            // Sort by diff so that we process retractions first.
3641            pending.sort_by(|a, b| a.2.cmp(&b.2));
3642            for (k, v, diff) in pending {
3643                if diff == Diff::MINUS_ONE {
3644                    let prev = table.remove(&k);
3645                    assert_eq!(prev, Some(v));
3646                } else if diff == Diff::ONE {
3647                    let prev = table.insert(k, v);
3648                    assert_eq!(prev, None);
3649                } else {
3650                    panic!("unexpected diff: {diff}");
3651                }
3652            }
3653        }
3654
3655        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3656        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3657        let mut table_txn =
3658            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3659        assert_eq!(table_txn.items_cloned(), table);
3660        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3661        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3662        assert_eq!(
3663            table_txn.items_cloned(),
3664            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3665        );
3666        assert_eq!(
3667            table_txn
3668                .update(|_k, _v| Some("v3".to_string()), 2)
3669                .unwrap(),
3670            Diff::ONE
3671        );
3672
3673        // Uniqueness violation.
3674        table_txn
3675            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3676            .unwrap_err();
3677
3678        table_txn
3679            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3680            .unwrap();
3681        assert_eq!(
3682            table_txn.items_cloned(),
3683            BTreeMap::from([
3684                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3685                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3686            ])
3687        );
3688        let err = table_txn
3689            .update(|_k, _v| Some("v1".to_string()), 5)
3690            .unwrap_err();
3691        assert!(
3692            matches!(err, DurableCatalogError::UniquenessViolation),
3693            "unexpected err: {err:?}"
3694        );
3695        let pending = table_txn.pending();
3696        assert_eq!(
3697            pending,
3698            vec![
3699                (
3700                    1i64.to_le_bytes().to_vec(),
3701                    "v1".to_string(),
3702                    Diff::MINUS_ONE
3703                ),
3704                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3705                (
3706                    2i64.to_le_bytes().to_vec(),
3707                    "v2".to_string(),
3708                    Diff::MINUS_ONE
3709                ),
3710                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3711            ]
3712        );
3713        commit(&mut table, pending);
3714        assert_eq!(
3715            table,
3716            BTreeMap::from([
3717                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3718                (3i64.to_le_bytes().to_vec(), "v4".to_string())
3719            ])
3720        );
3721
3722        let mut table_txn =
3723            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3724        // Deleting then creating an item that has a uniqueness violation should work.
3725        assert_eq!(
3726            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3727            1
3728        );
3729        table_txn
3730            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3731            .unwrap();
3732        // Uniqueness violation in value.
3733        table_txn
3734            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3735            .unwrap_err();
3736        // Key already exists, expect error.
3737        table_txn
3738            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3739            .unwrap_err();
3740        assert_eq!(
3741            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3742            1
3743        );
3744        // Both the inserts work now because the key and uniqueness violation are gone.
3745        table_txn
3746            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3747            .unwrap();
3748        table_txn
3749            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3750            .unwrap();
3751        let pending = table_txn.pending();
3752        assert_eq!(
3753            pending,
3754            vec![
3755                (
3756                    1i64.to_le_bytes().to_vec(),
3757                    "v3".to_string(),
3758                    Diff::MINUS_ONE
3759                ),
3760                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3761                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3762            ]
3763        );
3764        commit(&mut table, pending);
3765        assert_eq!(
3766            table,
3767            BTreeMap::from([
3768                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3769                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3770                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3771            ])
3772        );
3773
3774        let mut table_txn =
3775            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3776        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3777        table_txn
3778            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3779            .unwrap();
3780
3781        commit(&mut table, table_txn.pending());
3782        assert_eq!(
3783            table,
3784            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3785        );
3786
3787        let mut table_txn =
3788            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3789        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3790        table_txn
3791            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3792            .unwrap();
3793        commit(&mut table, table_txn.pending());
3794        assert_eq!(
3795            table,
3796            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3797        );
3798
3799        // Verify we don't try to delete v3 or v4 during commit.
3800        let mut table_txn =
3801            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3802        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3803        table_txn
3804            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3805            .unwrap();
3806        table_txn
3807            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3808            .unwrap_err();
3809        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3810        table_txn
3811            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3812            .unwrap();
3813        commit(&mut table, table_txn.pending());
3814        assert_eq!(
3815            table.clone().into_iter().collect::<Vec<_>>(),
3816            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3817        );
3818
3819        // Test `set`.
3820        let mut table_txn =
3821            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3822        // Uniqueness violation.
3823        table_txn
3824            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3825            .unwrap_err();
3826        table_txn
3827            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3828            .unwrap();
3829        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3830        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3831        let pending = table_txn.pending();
3832        assert_eq!(
3833            pending,
3834            vec![
3835                (
3836                    1i64.to_le_bytes().to_vec(),
3837                    "v5".to_string(),
3838                    Diff::MINUS_ONE
3839                ),
3840                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3841            ]
3842        );
3843        commit(&mut table, pending);
3844        assert_eq!(
3845            table,
3846            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3847        );
3848
3849        // Duplicate `set`.
3850        let mut table_txn =
3851            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3852        table_txn
3853            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3854            .unwrap();
3855        let pending = table_txn.pending::<Vec<u8>, String>();
3856        assert!(pending.is_empty());
3857
3858        // Test `set_many`.
3859        let mut table_txn =
3860            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3861        // Uniqueness violation.
3862        table_txn
3863            .set_many(
3864                BTreeMap::from([
3865                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3866                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3867                ]),
3868                0,
3869            )
3870            .unwrap_err();
3871        table_txn
3872            .set_many(
3873                BTreeMap::from([
3874                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3875                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3876                ]),
3877                1,
3878            )
3879            .unwrap();
3880        table_txn
3881            .set_many(
3882                BTreeMap::from([
3883                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3884                    (3i64.to_le_bytes().to_vec(), None),
3885                ]),
3886                2,
3887            )
3888            .unwrap();
3889        let pending = table_txn.pending();
3890        assert_eq!(
3891            pending,
3892            vec![
3893                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3894                (
3895                    3i64.to_le_bytes().to_vec(),
3896                    "v6".to_string(),
3897                    Diff::MINUS_ONE
3898                ),
3899                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3900            ]
3901        );
3902        commit(&mut table, pending);
3903        assert_eq!(
3904            table,
3905            BTreeMap::from([
3906                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3907                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3908            ])
3909        );
3910
3911        // Duplicate `set_many`.
3912        let mut table_txn =
3913            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3914        table_txn
3915            .set_many(
3916                BTreeMap::from([
3917                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3918                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3919                ]),
3920                0,
3921            )
3922            .unwrap();
3923        let pending = table_txn.pending::<Vec<u8>, String>();
3924        assert!(pending.is_empty());
3925        commit(&mut table, pending);
3926        assert_eq!(
3927            table,
3928            BTreeMap::from([
3929                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3930                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3931            ])
3932        );
3933
3934        // Test `update_by_key`
3935        let mut table_txn =
3936            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3937        // Uniqueness violation.
3938        table_txn
3939            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3940            .unwrap_err();
3941        assert!(
3942            table_txn
3943                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3944                .unwrap()
3945        );
3946        assert!(
3947            !table_txn
3948                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3949                .unwrap()
3950        );
3951        let pending = table_txn.pending();
3952        assert_eq!(
3953            pending,
3954            vec![
3955                (
3956                    1i64.to_le_bytes().to_vec(),
3957                    "v6".to_string(),
3958                    Diff::MINUS_ONE
3959                ),
3960                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3961            ]
3962        );
3963        commit(&mut table, pending);
3964        assert_eq!(
3965            table,
3966            BTreeMap::from([
3967                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3968                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3969            ])
3970        );
3971
3972        // Duplicate `update_by_key`.
3973        let mut table_txn =
3974            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3975        assert!(
3976            table_txn
3977                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3978                .unwrap()
3979        );
3980        let pending = table_txn.pending::<Vec<u8>, String>();
3981        assert!(pending.is_empty());
3982        commit(&mut table, pending);
3983        assert_eq!(
3984            table,
3985            BTreeMap::from([
3986                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3987                (42i64.to_le_bytes().to_vec(), "v7".to_string())
3988            ])
3989        );
3990
3991        // Test `update_by_keys`
3992        let mut table_txn =
3993            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3994        // Uniqueness violation.
3995        table_txn
3996            .update_by_keys(
3997                [
3998                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3999                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
4000                ],
4001                0,
4002            )
4003            .unwrap_err();
4004        let n = table_txn
4005            .update_by_keys(
4006                [
4007                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4008                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
4009                ],
4010                1,
4011            )
4012            .unwrap();
4013        assert_eq!(n, Diff::ONE);
4014        let n = table_txn
4015            .update_by_keys(
4016                [
4017                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
4018                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
4019                ],
4020                2,
4021            )
4022            .unwrap();
4023        assert_eq!(n, Diff::ZERO);
4024        let pending = table_txn.pending();
4025        assert_eq!(
4026            pending,
4027            vec![
4028                (
4029                    1i64.to_le_bytes().to_vec(),
4030                    "v8".to_string(),
4031                    Diff::MINUS_ONE
4032                ),
4033                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
4034            ]
4035        );
4036        commit(&mut table, pending);
4037        assert_eq!(
4038            table,
4039            BTreeMap::from([
4040                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4041                (42i64.to_le_bytes().to_vec(), "v7".to_string())
4042            ])
4043        );
4044
4045        // Duplicate `update_by_keys`.
4046        let mut table_txn =
4047            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4048        let n = table_txn
4049            .update_by_keys(
4050                [
4051                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4052                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
4053                ],
4054                0,
4055            )
4056            .unwrap();
4057        assert_eq!(n, Diff::from(2));
4058        let pending = table_txn.pending::<Vec<u8>, String>();
4059        assert!(pending.is_empty());
4060        commit(&mut table, pending);
4061        assert_eq!(
4062            table,
4063            BTreeMap::from([
4064                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4065                (42i64.to_le_bytes().to_vec(), "v7".to_string())
4066            ])
4067        );
4068
4069        // Test `delete_by_key`
4070        let mut table_txn =
4071            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4072        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
4073        assert_eq!(prev, Some("v9".to_string()));
4074        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
4075        assert_none!(prev);
4076        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
4077        assert_none!(prev);
4078        let pending = table_txn.pending();
4079        assert_eq!(
4080            pending,
4081            vec![(
4082                1i64.to_le_bytes().to_vec(),
4083                "v9".to_string(),
4084                Diff::MINUS_ONE
4085            ),]
4086        );
4087        commit(&mut table, pending);
4088        assert_eq!(
4089            table,
4090            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
4091        );
4092
4093        // Test `delete_by_keys`
4094        let mut table_txn =
4095            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4096        let prevs = table_txn.delete_by_keys(
4097            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4098            0,
4099        );
4100        assert_eq!(
4101            prevs,
4102            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
4103        );
4104        let prevs = table_txn.delete_by_keys(
4105            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4106            1,
4107        );
4108        assert_eq!(prevs, vec![]);
4109        let prevs = table_txn.delete_by_keys(
4110            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4111            2,
4112        );
4113        assert_eq!(prevs, vec![]);
4114        let pending = table_txn.pending();
4115        assert_eq!(
4116            pending,
4117            vec![(
4118                42i64.to_le_bytes().to_vec(),
4119                "v7".to_string(),
4120                Diff::MINUS_ONE
4121            ),]
4122        );
4123        commit(&mut table, pending);
4124        assert_eq!(table, BTreeMap::new());
4125    }
4126
4127    #[mz_ore::test(tokio::test)]
4128    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
4129    async fn test_savepoint() {
4130        const VERSION: Version = Version::new(26, 0, 0);
4131        let mut persist_cache = PersistClientCache::new_no_metrics();
4132        persist_cache.cfg.build_version = VERSION;
4133        let persist_client = persist_cache
4134            .open(PersistLocation::new_in_mem())
4135            .await
4136            .unwrap();
4137        let state_builder = TestCatalogStateBuilder::new(persist_client)
4138            .with_default_deploy_generation()
4139            .with_version(VERSION);
4140
4141        // Initialize catalog.
4142        let _ = state_builder
4143            .clone()
4144            .unwrap_build()
4145            .await
4146            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4147            .await
4148            .unwrap()
4149            .0;
4150        let mut savepoint_state = state_builder
4151            .unwrap_build()
4152            .await
4153            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4154            .await
4155            .unwrap()
4156            .0;
4157
4158        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4159        assert!(!initial_snapshot.is_empty());
4160
4161        let db_name = "db";
4162        let db_owner = RoleId::User(42);
4163        let db_privileges = Vec::new();
4164        let mut txn = savepoint_state.transaction().await.unwrap();
4165        let (db_id, db_oid) = txn
4166            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4167            .unwrap();
4168        let commit_ts = txn.upper();
4169        txn.commit_internal(commit_ts).await.unwrap();
4170        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4171        let update = updates.into_element();
4172
4173        assert_eq!(update.diff, StateDiff::Addition);
4174
4175        let db = match update.kind {
4176            memory::objects::StateUpdateKind::Database(db) => db,
4177            update => panic!("unexpected update: {update:?}"),
4178        };
4179
4180        assert_eq!(db_id, db.id);
4181        assert_eq!(db_oid, db.oid);
4182        assert_eq!(db_name, db.name);
4183        assert_eq!(db_owner, db.owner_id);
4184        assert_eq!(db_privileges, db.privileges);
4185    }
4186
4187    #[mz_ore::test]
4188    fn test_allocate_introspection_source_index_id() {
4189        let cluster_variant: u8 = 0b0000_0001;
4190        let cluster_id_inner: u64 =
4191            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4192        let timely_messages_received_log_variant: u8 = 0b0000_1000;
4193
4194        let cluster_id = ClusterId::System(cluster_id_inner);
4195        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4196
4197        let introspection_source_index_id: u64 =
4198            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4199
4200        // Sanity check that `introspection_source_index_id` contains `cluster_variant`.
4201        {
4202            let mut cluster_variant_mask = 0xFF << 56;
4203            cluster_variant_mask &= introspection_source_index_id;
4204            cluster_variant_mask >>= 56;
4205            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4206        }
4207
4208        // Sanity check that `introspection_source_index_id` contains `cluster_id_inner`.
4209        {
4210            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4211            cluster_id_inner_mask &= introspection_source_index_id;
4212            cluster_id_inner_mask >>= 8;
4213            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4214        }
4215
4216        // Sanity check that `introspection_source_index_id` contains `timely_messages_received_log_variant`.
4217        {
4218            let mut log_variant_mask = 0xFF;
4219            log_variant_mask &= introspection_source_index_id;
4220            assert_eq!(
4221                log_variant_mask,
4222                u64::from(timely_messages_received_log_variant)
4223            );
4224        }
4225
4226        let (catalog_item_id, global_id) =
4227            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4228
4229        assert_eq!(
4230            catalog_item_id,
4231            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4232        );
4233        assert_eq!(
4234            global_id,
4235            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4236        );
4237    }
4238}