1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34 CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35 RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51 AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52 ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53 ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54 Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55 DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56 IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57 ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58 ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59 SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60 StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61 SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66 EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67 SCHEMA_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
68 SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
69 USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
70};
71use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
72
/// Timestamp type used within this file for the per-operation ID (`Transaction::op_id`) and
/// for the time component of pending audit log updates.
type Timestamp = u64;
74
/// A set of pending changes to the durable catalog.
///
/// Every catalog collection is held in its own `TableTransaction`, seeded from a `Snapshot`
/// in [`Transaction::new`]. The mutating methods on this type record their changes against
/// those tables, tagged with the current `op_id`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    /// The durable catalog this transaction was opened against. Skipped in `Debug` output.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    /// Databases; `new` installs a uniqueness check on the name.
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    /// Schemas; `new` installs a uniqueness check on (database ID, name).
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    /// Items; `new` installs a uniqueness check on (schema ID, name) that also takes the
    /// item type into account.
    items: TableTransaction<ItemKey, ItemValue>,
    /// Comments.
    comments: TableTransaction<CommentKey, CommentValue>,
    /// Roles; `new` installs a uniqueness check on the name.
    roles: TableTransaction<RoleKey, RoleValue>,
    /// Per-role authentication data (password hash and update time), written by `insert_role`.
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    /// Clusters; `new` installs a uniqueness check on the name.
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    /// Cluster replicas; `new` installs a uniqueness check on (cluster ID, name).
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    /// Introspection source indexes, one entry per (cluster, builtin log).
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    /// Named ID allocators; each value stores the next ID to hand out.
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    /// Config entries.
    configs: TableTransaction<ConfigKey, ConfigValue>,
    /// Setting entries.
    settings: TableTransaction<SettingKey, SettingValue>,
    /// System object mappings (loaded from `Snapshot::system_object_mappings`).
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    /// System configuration entries.
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    /// Default privileges.
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    /// Source references, keyed by source item ID.
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    /// System privileges.
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    /// Network policies; `new` installs a uniqueness check on the name.
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    /// Storage collection metadata.
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    /// Unfinalized shards (key-only collection).
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    /// The txn-wal shard (value-only collection).
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    /// Audit log events added in this transaction, kept as a plain list of
    /// (key, diff, operation ID) rather than as a `TableTransaction`.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// Timestamp supplied by the caller of `new`.
    // NOTE(review): presumably the durable catalog's upper when the transaction was opened —
    // confirm against the call sites.
    upper: mz_repr::Timestamp,
    /// ID of the current operation; starts at 0 and is attached to every table update.
    op_id: Timestamp,
}
114
115impl<'a> Transaction<'a> {
116 pub fn new(
117 durable_catalog: &'a mut dyn DurableCatalogState,
118 Snapshot {
119 databases,
120 schemas,
121 roles,
122 role_auth,
123 items,
124 comments,
125 clusters,
126 network_policies,
127 cluster_replicas,
128 introspection_sources,
129 id_allocator,
130 configs,
131 settings,
132 source_references,
133 system_object_mappings,
134 system_configurations,
135 default_privileges,
136 system_privileges,
137 storage_collection_metadata,
138 unfinalized_shards,
139 txn_wal_shard,
140 }: Snapshot,
141 upper: mz_repr::Timestamp,
142 ) -> Result<Transaction<'a>, CatalogError> {
143 Ok(Transaction {
144 durable_catalog,
145 databases: TableTransaction::new_with_uniqueness_fn(
146 databases,
147 |a: &DatabaseValue, b| a.name == b.name,
148 )?,
149 schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
150 a.database_id == b.database_id && a.name == b.name
151 })?,
152 items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
153 a.schema_id == b.schema_id && a.name == b.name && {
154 let a_type = a.item_type();
156 let b_type = b.item_type();
157 (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
158 || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
159 || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
160 }
161 })?,
162 comments: TableTransaction::new(comments)?,
163 roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
164 a.name == b.name
165 })?,
166 role_auth: TableTransaction::new(role_auth)?,
167 clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
168 a.name == b.name
169 })?,
170 network_policies: TableTransaction::new_with_uniqueness_fn(
171 network_policies,
172 |a: &NetworkPolicyValue, b| a.name == b.name,
173 )?,
174 cluster_replicas: TableTransaction::new_with_uniqueness_fn(
175 cluster_replicas,
176 |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
177 )?,
178 introspection_sources: TableTransaction::new(introspection_sources)?,
179 id_allocator: TableTransaction::new(id_allocator)?,
180 configs: TableTransaction::new(configs)?,
181 settings: TableTransaction::new(settings)?,
182 source_references: TableTransaction::new(source_references)?,
183 system_gid_mapping: TableTransaction::new(system_object_mappings)?,
184 system_configurations: TableTransaction::new(system_configurations)?,
185 default_privileges: TableTransaction::new(default_privileges)?,
186 system_privileges: TableTransaction::new(system_privileges)?,
187 storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
188 unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
189 txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
193 audit_log_updates: Vec::new(),
194 upper,
195 op_id: 0,
196 })
197 }
198
199 pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
200 let key = ItemKey { id: *id };
201 self.items
202 .get(&key)
203 .map(|v| DurableType::from_key_value(key, v.clone()))
204 }
205
206 pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
207 self.items
208 .items()
209 .into_iter()
210 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
211 .sorted_by_key(|Item { id, .. }| *id)
212 }
213
214 pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
215 self.insert_audit_log_events([event]);
216 }
217
218 pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
219 let events = events
220 .into_iter()
221 .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
222 self.audit_log_updates.extend(events);
223 }
224
225 pub fn insert_user_database(
226 &mut self,
227 database_name: &str,
228 owner_id: RoleId,
229 privileges: Vec<MzAclItem>,
230 temporary_oids: &HashSet<u32>,
231 ) -> Result<(DatabaseId, u32), CatalogError> {
232 let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
233 let id = DatabaseId::User(id);
234 let oid = self.allocate_oid(temporary_oids)?;
235 self.insert_database(id, database_name, owner_id, privileges, oid)?;
236 Ok((id, oid))
237 }
238
239 pub(crate) fn insert_database(
240 &mut self,
241 id: DatabaseId,
242 database_name: &str,
243 owner_id: RoleId,
244 privileges: Vec<MzAclItem>,
245 oid: u32,
246 ) -> Result<u32, CatalogError> {
247 match self.databases.insert(
248 DatabaseKey { id },
249 DatabaseValue {
250 name: database_name.to_string(),
251 owner_id,
252 privileges,
253 oid,
254 },
255 self.op_id,
256 ) {
257 Ok(_) => Ok(oid),
258 Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
259 }
260 }
261
262 pub fn insert_user_schema(
263 &mut self,
264 database_id: DatabaseId,
265 schema_name: &str,
266 owner_id: RoleId,
267 privileges: Vec<MzAclItem>,
268 temporary_oids: &HashSet<u32>,
269 ) -> Result<(SchemaId, u32), CatalogError> {
270 let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
271 let id = SchemaId::User(id);
272 let oid = self.allocate_oid(temporary_oids)?;
273 self.insert_schema(
274 id,
275 Some(database_id),
276 schema_name.to_string(),
277 owner_id,
278 privileges,
279 oid,
280 )?;
281 Ok((id, oid))
282 }
283
284 pub fn insert_system_schema(
285 &mut self,
286 schema_id: u64,
287 schema_name: &str,
288 owner_id: RoleId,
289 privileges: Vec<MzAclItem>,
290 oid: u32,
291 ) -> Result<(), CatalogError> {
292 let id = SchemaId::System(schema_id);
293 self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
294 }
295
296 pub(crate) fn insert_schema(
297 &mut self,
298 schema_id: SchemaId,
299 database_id: Option<DatabaseId>,
300 schema_name: String,
301 owner_id: RoleId,
302 privileges: Vec<MzAclItem>,
303 oid: u32,
304 ) -> Result<(), CatalogError> {
305 match self.schemas.insert(
306 SchemaKey { id: schema_id },
307 SchemaValue {
308 database_id,
309 name: schema_name.clone(),
310 owner_id,
311 privileges,
312 oid,
313 },
314 self.op_id,
315 ) {
316 Ok(_) => Ok(()),
317 Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
318 }
319 }
320
321 pub fn insert_builtin_role(
322 &mut self,
323 id: RoleId,
324 name: String,
325 attributes: RoleAttributesRaw,
326 membership: RoleMembership,
327 vars: RoleVars,
328 oid: u32,
329 ) -> Result<RoleId, CatalogError> {
330 soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
331 self.insert_role(id, name, attributes, membership, vars, oid)?;
332 Ok(id)
333 }
334
335 pub fn insert_user_role(
336 &mut self,
337 name: String,
338 attributes: RoleAttributesRaw,
339 membership: RoleMembership,
340 vars: RoleVars,
341 temporary_oids: &HashSet<u32>,
342 ) -> Result<(RoleId, u32), CatalogError> {
343 let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
344 let id = RoleId::User(id);
345 let oid = self.allocate_oid(temporary_oids)?;
346 self.insert_role(id, name, attributes, membership, vars, oid)?;
347 Ok((id, oid))
348 }
349
350 fn insert_role(
351 &mut self,
352 id: RoleId,
353 name: String,
354 attributes: RoleAttributesRaw,
355 membership: RoleMembership,
356 vars: RoleVars,
357 oid: u32,
358 ) -> Result<(), CatalogError> {
359 if let Some(ref password) = attributes.password {
360 let hash = mz_auth::hash::scram256_hash(
361 password,
362 &attributes
363 .scram_iterations
364 .or_else(|| {
365 soft_panic_or_log!(
366 "Hash iterations must be set if a password is provided."
367 );
368 None
369 })
370 .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
373 )
374 .expect("password hash should be valid");
375 match self.role_auth.insert(
376 RoleAuthKey { role_id: id },
377 RoleAuthValue {
378 password_hash: Some(hash),
379 updated_at: SYSTEM_TIME(),
380 },
381 self.op_id,
382 ) {
383 Ok(_) => {}
384 Err(_) => {
385 return Err(SqlCatalogError::RoleAlreadyExists(name).into());
386 }
387 }
388 }
389
390 match self.roles.insert(
391 RoleKey { id },
392 RoleValue {
393 name: name.clone(),
394 attributes: attributes.into(),
395 membership,
396 vars,
397 oid,
398 },
399 self.op_id,
400 ) {
401 Ok(_) => Ok(()),
402 Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
403 }
404 }
405
406 pub fn insert_user_cluster(
408 &mut self,
409 cluster_id: ClusterId,
410 cluster_name: &str,
411 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
412 owner_id: RoleId,
413 privileges: Vec<MzAclItem>,
414 config: ClusterConfig,
415 temporary_oids: &HashSet<u32>,
416 ) -> Result<(), CatalogError> {
417 self.insert_cluster(
418 cluster_id,
419 cluster_name,
420 introspection_source_indexes,
421 owner_id,
422 privileges,
423 config,
424 temporary_oids,
425 )
426 }
427
428 pub fn insert_system_cluster(
430 &mut self,
431 cluster_name: &str,
432 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
433 privileges: Vec<MzAclItem>,
434 owner_id: RoleId,
435 config: ClusterConfig,
436 temporary_oids: &HashSet<u32>,
437 ) -> Result<ClusterId, CatalogError> {
438 let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
439 let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
440 self.insert_cluster(
441 cluster_id,
442 cluster_name,
443 introspection_source_indexes,
444 owner_id,
445 privileges,
446 config,
447 temporary_oids,
448 )?;
449 Ok(cluster_id)
450 }
451
452 fn insert_cluster(
453 &mut self,
454 cluster_id: ClusterId,
455 cluster_name: &str,
456 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
457 owner_id: RoleId,
458 privileges: Vec<MzAclItem>,
459 config: ClusterConfig,
460 temporary_oids: &HashSet<u32>,
461 ) -> Result<(), CatalogError> {
462 if let Err(_) = self.clusters.insert(
463 ClusterKey { id: cluster_id },
464 ClusterValue {
465 name: cluster_name.to_string(),
466 owner_id,
467 privileges,
468 config,
469 },
470 self.op_id,
471 ) {
472 return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
473 };
474
475 let amount = usize_to_u64(introspection_source_indexes.len());
476 let oids = self.allocate_oids(amount, temporary_oids)?;
477 let introspection_source_indexes: Vec<_> = introspection_source_indexes
478 .into_iter()
479 .zip_eq(oids)
480 .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
481 .collect();
482 for (builtin, item_id, index_id, oid) in introspection_source_indexes {
483 let introspection_source_index = IntrospectionSourceIndex {
484 cluster_id,
485 name: builtin.name.to_string(),
486 item_id,
487 index_id,
488 oid,
489 };
490 let (key, value) = introspection_source_index.into_key_value();
491 self.introspection_sources
492 .insert(key, value, self.op_id)
493 .expect("no uniqueness violation");
494 }
495
496 Ok(())
497 }
498
499 pub fn rename_cluster(
500 &mut self,
501 cluster_id: ClusterId,
502 cluster_name: &str,
503 cluster_to_name: &str,
504 ) -> Result<(), CatalogError> {
505 let key = ClusterKey { id: cluster_id };
506
507 match self.clusters.update(
508 |k, v| {
509 if *k == key {
510 let mut value = v.clone();
511 value.name = cluster_to_name.to_string();
512 Some(value)
513 } else {
514 None
515 }
516 },
517 self.op_id,
518 )? {
519 Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
520 Diff::ONE => Ok(()),
521 n => panic!(
522 "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
523 ),
524 }
525 }
526
527 pub fn rename_cluster_replica(
528 &mut self,
529 replica_id: ReplicaId,
530 replica_name: &QualifiedReplica,
531 replica_to_name: &str,
532 ) -> Result<(), CatalogError> {
533 let key = ClusterReplicaKey { id: replica_id };
534
535 match self.cluster_replicas.update(
536 |k, v| {
537 if *k == key {
538 let mut value = v.clone();
539 value.name = replica_to_name.to_string();
540 Some(value)
541 } else {
542 None
543 }
544 },
545 self.op_id,
546 )? {
547 Diff::ZERO => {
548 Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
549 }
550 Diff::ONE => Ok(()),
551 n => panic!(
552 "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
553 ),
554 }
555 }
556
557 pub fn insert_cluster_replica(
558 &mut self,
559 cluster_id: ClusterId,
560 replica_name: &str,
561 config: ReplicaConfig,
562 owner_id: RoleId,
563 ) -> Result<ReplicaId, CatalogError> {
564 let replica_id = match cluster_id {
565 ClusterId::System(_) => self.allocate_system_replica_id()?,
566 ClusterId::User(_) => self.allocate_user_replica_id()?,
567 };
568 self.insert_cluster_replica_with_id(
569 cluster_id,
570 replica_id,
571 replica_name,
572 config,
573 owner_id,
574 )?;
575 Ok(replica_id)
576 }
577
578 pub(crate) fn insert_cluster_replica_with_id(
579 &mut self,
580 cluster_id: ClusterId,
581 replica_id: ReplicaId,
582 replica_name: &str,
583 config: ReplicaConfig,
584 owner_id: RoleId,
585 ) -> Result<(), CatalogError> {
586 if let Err(_) = self.cluster_replicas.insert(
587 ClusterReplicaKey { id: replica_id },
588 ClusterReplicaValue {
589 cluster_id,
590 name: replica_name.into(),
591 config,
592 owner_id,
593 },
594 self.op_id,
595 ) {
596 let cluster = self
597 .clusters
598 .get(&ClusterKey { id: cluster_id })
599 .expect("cluster exists");
600 return Err(SqlCatalogError::DuplicateReplica(
601 replica_name.to_string(),
602 cluster.name.to_string(),
603 )
604 .into());
605 };
606 Ok(())
607 }
608
609 pub fn insert_user_network_policy(
610 &mut self,
611 name: String,
612 rules: Vec<NetworkPolicyRule>,
613 privileges: Vec<MzAclItem>,
614 owner_id: RoleId,
615 temporary_oids: &HashSet<u32>,
616 ) -> Result<NetworkPolicyId, CatalogError> {
617 let oid = self.allocate_oid(temporary_oids)?;
618 let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
619 let id = NetworkPolicyId::User(id);
620 self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621 }
622
623 pub fn insert_network_policy(
624 &mut self,
625 id: NetworkPolicyId,
626 name: String,
627 rules: Vec<NetworkPolicyRule>,
628 privileges: Vec<MzAclItem>,
629 owner_id: RoleId,
630 oid: u32,
631 ) -> Result<NetworkPolicyId, CatalogError> {
632 match self.network_policies.insert(
633 NetworkPolicyKey { id },
634 NetworkPolicyValue {
635 name: name.clone(),
636 rules,
637 privileges,
638 owner_id,
639 oid,
640 },
641 self.op_id,
642 ) {
643 Ok(_) => Ok(id),
644 Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
645 }
646 }
647
648 pub fn update_introspection_source_index_gids(
653 &mut self,
654 mappings: impl Iterator<
655 Item = (
656 ClusterId,
657 impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
658 ),
659 >,
660 ) -> Result<(), CatalogError> {
661 for (cluster_id, updates) in mappings {
662 for (name, item_id, index_id, oid) in updates {
663 let introspection_source_index = IntrospectionSourceIndex {
664 cluster_id,
665 name,
666 item_id,
667 index_id,
668 oid,
669 };
670 let (key, value) = introspection_source_index.into_key_value();
671
672 let prev = self
673 .introspection_sources
674 .set(key, Some(value), self.op_id)?;
675 if prev.is_none() {
676 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
677 "{index_id}"
678 ))
679 .into());
680 }
681 }
682 }
683 Ok(())
684 }
685
686 pub fn insert_user_item(
687 &mut self,
688 id: CatalogItemId,
689 global_id: GlobalId,
690 schema_id: SchemaId,
691 item_name: &str,
692 create_sql: String,
693 owner_id: RoleId,
694 privileges: Vec<MzAclItem>,
695 temporary_oids: &HashSet<u32>,
696 versions: BTreeMap<RelationVersion, GlobalId>,
697 ) -> Result<u32, CatalogError> {
698 let oid = self.allocate_oid(temporary_oids)?;
699 self.insert_item(
700 id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
701 )?;
702 Ok(oid)
703 }
704
705 pub fn insert_item(
706 &mut self,
707 id: CatalogItemId,
708 oid: u32,
709 global_id: GlobalId,
710 schema_id: SchemaId,
711 item_name: &str,
712 create_sql: String,
713 owner_id: RoleId,
714 privileges: Vec<MzAclItem>,
715 extra_versions: BTreeMap<RelationVersion, GlobalId>,
716 ) -> Result<(), CatalogError> {
717 match self.items.insert(
718 ItemKey { id },
719 ItemValue {
720 schema_id,
721 name: item_name.to_string(),
722 create_sql,
723 owner_id,
724 privileges,
725 oid,
726 global_id,
727 extra_versions,
728 },
729 self.op_id,
730 ) {
731 Ok(_) => Ok(()),
732 Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
733 }
734 }
735
736 pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737 Ok(self.get_and_increment_id_by(key, 1)?.into_element())
738 }
739
740 pub fn get_and_increment_id_by(
741 &mut self,
742 key: String,
743 amount: u64,
744 ) -> Result<Vec<u64>, CatalogError> {
745 assert!(
746 key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
747 "system item IDs cannot be allocated outside of bootstrap"
748 );
749
750 let current_id = self
751 .id_allocator
752 .items()
753 .get(&IdAllocKey { name: key.clone() })
754 .unwrap_or_else(|| panic!("{key} id allocator missing"))
755 .next_id;
756 let next_id = current_id
757 .checked_add(amount)
758 .ok_or(SqlCatalogError::IdExhaustion)?;
759 let prev = self.id_allocator.set(
760 IdAllocKey { name: key },
761 Some(IdAllocValue { next_id }),
762 self.op_id,
763 )?;
764 assert_eq!(
765 prev,
766 Some(IdAllocValue {
767 next_id: current_id
768 })
769 );
770 Ok((current_id..next_id).collect())
771 }
772
773 pub fn allocate_system_item_ids(
774 &mut self,
775 amount: u64,
776 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
777 assert!(
778 !self.durable_catalog.is_bootstrap_complete(),
779 "we can only allocate system item IDs during bootstrap"
780 );
781 Ok(self
782 .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
783 .into_iter()
784 .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
786 .collect())
787 }
788
789 pub fn allocate_introspection_source_index_id(
821 cluster_id: &ClusterId,
822 log_variant: LogVariant,
823 ) -> (CatalogItemId, GlobalId) {
824 let cluster_variant: u8 = match cluster_id {
825 ClusterId::System(_) => 1,
826 ClusterId::User(_) => 2,
827 };
828 let cluster_id: u64 = cluster_id.inner_id();
829 const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
830 assert_eq!(
831 CLUSTER_ID_MASK & cluster_id,
832 0,
833 "invalid cluster ID: {cluster_id}"
834 );
835 let log_variant: u8 = match log_variant {
836 LogVariant::Timely(TimelyLog::Operates) => 1,
837 LogVariant::Timely(TimelyLog::Channels) => 2,
838 LogVariant::Timely(TimelyLog::Elapsed) => 3,
839 LogVariant::Timely(TimelyLog::Histogram) => 4,
840 LogVariant::Timely(TimelyLog::Addresses) => 5,
841 LogVariant::Timely(TimelyLog::Parks) => 6,
842 LogVariant::Timely(TimelyLog::MessagesSent) => 7,
843 LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
844 LogVariant::Timely(TimelyLog::Reachability) => 9,
845 LogVariant::Timely(TimelyLog::BatchesSent) => 10,
846 LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
847 LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
848 LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
849 LogVariant::Differential(DifferentialLog::Sharing) => 14,
850 LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
851 LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
852 LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
853 LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
854 LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
855 LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
856 LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
857 LogVariant::Compute(ComputeLog::PeekDuration) => 22,
858 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
859 LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
860 LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
861 LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
862 LogVariant::Compute(ComputeLog::ErrorCount) => 28,
863 LogVariant::Compute(ComputeLog::HydrationTime) => 29,
864 LogVariant::Compute(ComputeLog::LirMapping) => 30,
865 LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
866 LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
867 LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
868 };
869
870 let mut id: u64 = u64::from(cluster_variant) << 56;
871 id |= cluster_id << 8;
872 id |= u64::from(log_variant);
873
874 (
875 CatalogItemId::IntrospectionSourceIndex(id),
876 GlobalId::IntrospectionSourceIndex(id),
877 )
878 }
879
880 pub fn allocate_user_item_ids(
881 &mut self,
882 amount: u64,
883 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
884 Ok(self
885 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
886 .into_iter()
887 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
889 .collect())
890 }
891
892 pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
893 let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
894 Ok(ReplicaId::User(id))
895 }
896
897 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
898 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
899 Ok(ReplicaId::System(id))
900 }
901
902 pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
903 self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
904 }
905
906 #[mz_ore::instrument]
909 fn allocate_oids(
910 &mut self,
911 amount: u64,
912 temporary_oids: &HashSet<u32>,
913 ) -> Result<Vec<u32>, CatalogError> {
914 struct UserOid(u32);
917
918 impl UserOid {
919 fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
920 if oid < FIRST_USER_OID {
921 Err(anyhow!("invalid user OID {oid}"))
922 } else {
923 Ok(UserOid(oid))
924 }
925 }
926 }
927
928 impl std::ops::AddAssign<u32> for UserOid {
929 fn add_assign(&mut self, rhs: u32) {
930 let (res, overflow) = self.0.overflowing_add(rhs);
931 self.0 = if overflow { FIRST_USER_OID + res } else { res };
932 }
933 }
934
935 if amount > u32::MAX.into() {
936 return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
937 }
938
939 let mut allocated_oids = HashSet::with_capacity(
945 self.databases.len()
946 + self.schemas.len()
947 + self.roles.len()
948 + self.items.len()
949 + self.introspection_sources.len()
950 + temporary_oids.len(),
951 );
952 self.databases.for_values(|_, value| {
953 allocated_oids.insert(value.oid);
954 });
955 self.schemas.for_values(|_, value| {
956 allocated_oids.insert(value.oid);
957 });
958 self.roles.for_values(|_, value| {
959 allocated_oids.insert(value.oid);
960 });
961 self.items.for_values(|_, value| {
962 allocated_oids.insert(value.oid);
963 });
964 self.introspection_sources.for_values(|_, value| {
965 allocated_oids.insert(value.oid);
966 });
967
968 let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);
969
970 let start_oid: u32 = self
971 .id_allocator
972 .items()
973 .get(&IdAllocKey {
974 name: OID_ALLOC_KEY.to_string(),
975 })
976 .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
977 .next_id
978 .try_into()
979 .expect("we should never persist an oid outside of the u32 range");
980 let mut current_oid = UserOid::new(start_oid)
981 .expect("we should never persist an oid outside of user OID range");
982 let mut oids = Vec::new();
983 while oids.len() < u64_to_usize(amount) {
984 if !is_allocated(current_oid.0) {
985 oids.push(current_oid.0);
986 }
987 current_oid += 1;
988
989 if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
990 return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
992 }
993 }
994
995 let next_id = current_oid.0;
996 let prev = self.id_allocator.set(
997 IdAllocKey {
998 name: OID_ALLOC_KEY.to_string(),
999 },
1000 Some(IdAllocValue {
1001 next_id: next_id.into(),
1002 }),
1003 self.op_id,
1004 )?;
1005 assert_eq!(
1006 prev,
1007 Some(IdAllocValue {
1008 next_id: start_oid.into(),
1009 })
1010 );
1011
1012 Ok(oids)
1013 }
1014
1015 pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1018 self.allocate_oids(1, temporary_oids)
1019 .map(|oids| oids.into_element())
1020 }
1021
1022 pub fn current_snapshot(&self) -> Snapshot {
1030 Snapshot {
1031 databases: self.databases.current_items_proto(),
1032 schemas: self.schemas.current_items_proto(),
1033 roles: self.roles.current_items_proto(),
1034 role_auth: self.role_auth.current_items_proto(),
1035 items: self.items.current_items_proto(),
1036 comments: self.comments.current_items_proto(),
1037 clusters: self.clusters.current_items_proto(),
1038 network_policies: self.network_policies.current_items_proto(),
1039 cluster_replicas: self.cluster_replicas.current_items_proto(),
1040 introspection_sources: self.introspection_sources.current_items_proto(),
1041 id_allocator: self.id_allocator.current_items_proto(),
1042 configs: self.configs.current_items_proto(),
1043 settings: self.settings.current_items_proto(),
1044 system_object_mappings: self.system_gid_mapping.current_items_proto(),
1045 system_configurations: self.system_configurations.current_items_proto(),
1046 default_privileges: self.default_privileges.current_items_proto(),
1047 source_references: self.source_references.current_items_proto(),
1048 system_privileges: self.system_privileges.current_items_proto(),
1049 storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
1050 unfinalized_shards: self.unfinalized_shards.current_items_proto(),
1051 txn_wal_shard: self.txn_wal_shard.current_items_proto(),
1052 }
1053 }
1054
1055 pub(crate) fn insert_id_allocator(
1056 &mut self,
1057 name: String,
1058 next_id: u64,
1059 ) -> Result<(), CatalogError> {
1060 match self.id_allocator.insert(
1061 IdAllocKey { name: name.clone() },
1062 IdAllocValue { next_id },
1063 self.op_id,
1064 ) {
1065 Ok(_) => Ok(()),
1066 Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1067 }
1068 }
1069
1070 pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1077 let prev = self
1078 .databases
1079 .set(DatabaseKey { id: *id }, None, self.op_id)?;
1080 if prev.is_some() {
1081 Ok(())
1082 } else {
1083 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1084 }
1085 }
1086
1087 pub fn remove_databases(
1094 &mut self,
1095 databases: &BTreeSet<DatabaseId>,
1096 ) -> Result<(), CatalogError> {
1097 if databases.is_empty() {
1098 return Ok(());
1099 }
1100
1101 let to_remove = databases
1102 .iter()
1103 .map(|id| (DatabaseKey { id: *id }, None))
1104 .collect();
1105 let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1106 prev.retain(|_k, val| val.is_none());
1107
1108 if !prev.is_empty() {
1109 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1110 return Err(SqlCatalogError::UnknownDatabase(err).into());
1111 }
1112
1113 Ok(())
1114 }
1115
1116 pub fn remove_schema(
1123 &mut self,
1124 database_id: &Option<DatabaseId>,
1125 schema_id: &SchemaId,
1126 ) -> Result<(), CatalogError> {
1127 let prev = self
1128 .schemas
1129 .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1130 if prev.is_some() {
1131 Ok(())
1132 } else {
1133 let database_name = match database_id {
1134 Some(id) => format!("{id}."),
1135 None => "".to_string(),
1136 };
1137 Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1138 }
1139 }
1140
1141 pub fn remove_schemas(
1148 &mut self,
1149 schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1150 ) -> Result<(), CatalogError> {
1151 if schemas.is_empty() {
1152 return Ok(());
1153 }
1154
1155 let to_remove = schemas
1156 .iter()
1157 .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1158 .collect();
1159 let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1160 prev.retain(|_k, v| v.is_none());
1161
1162 if !prev.is_empty() {
1163 let err = prev
1164 .keys()
1165 .map(|k| {
1166 let db_spec = schemas.get(&k.id).expect("should_exist");
1167 let db_name = match db_spec {
1168 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1169 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1170 };
1171 format!("{}.{}", db_name, k.id)
1172 })
1173 .join(", ");
1174
1175 return Err(SqlCatalogError::UnknownSchema(err).into());
1176 }
1177
1178 Ok(())
1179 }
1180
1181 pub fn remove_source_references(
1182 &mut self,
1183 source_id: CatalogItemId,
1184 ) -> Result<(), CatalogError> {
1185 let deleted = self
1186 .source_references
1187 .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1188 .is_some();
1189 if deleted {
1190 Ok(())
1191 } else {
1192 Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1193 }
1194 }
1195
1196 pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1203 assert!(
1204 roles.iter().all(|id| id.is_user()),
1205 "cannot delete non-user roles"
1206 );
1207 self.remove_roles(roles)
1208 }
1209
1210 pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1217 if roles.is_empty() {
1218 return Ok(());
1219 }
1220
1221 let to_remove_keys = roles
1222 .iter()
1223 .map(|role_id| RoleKey { id: *role_id })
1224 .collect::<Vec<_>>();
1225
1226 let to_remove_roles = to_remove_keys
1227 .iter()
1228 .map(|role_key| (role_key.clone(), None))
1229 .collect();
1230
1231 let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1232
1233 let to_remove_role_auth = to_remove_keys
1234 .iter()
1235 .map(|role_key| {
1236 (
1237 RoleAuthKey {
1238 role_id: role_key.id,
1239 },
1240 None,
1241 )
1242 })
1243 .collect();
1244
1245 let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1246
1247 prev.retain(|_k, v| v.is_none());
1248 if !prev.is_empty() {
1249 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1250 return Err(SqlCatalogError::UnknownRole(err).into());
1251 }
1252
1253 role_auth_prev.retain(|_k, v| v.is_none());
1254 Ok(())
1258 }
1259
1260 pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1267 if clusters.is_empty() {
1268 return Ok(());
1269 }
1270
1271 let to_remove = clusters
1272 .iter()
1273 .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1274 .collect();
1275 let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1276
1277 prev.retain(|_k, v| v.is_none());
1278 if !prev.is_empty() {
1279 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1280 return Err(SqlCatalogError::UnknownCluster(err).into());
1281 }
1282
1283 self.cluster_replicas
1289 .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1290 self.introspection_sources
1291 .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1292
1293 Ok(())
1294 }
1295
1296 pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1303 let deleted = self
1304 .cluster_replicas
1305 .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1306 .is_some();
1307 if deleted {
1308 Ok(())
1309 } else {
1310 Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1311 }
1312 }
1313
1314 pub fn remove_cluster_replicas(
1321 &mut self,
1322 replicas: &BTreeSet<ReplicaId>,
1323 ) -> Result<(), CatalogError> {
1324 if replicas.is_empty() {
1325 return Ok(());
1326 }
1327
1328 let to_remove = replicas
1329 .iter()
1330 .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1331 .collect();
1332 let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1333
1334 prev.retain(|_k, v| v.is_none());
1335 if !prev.is_empty() {
1336 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1337 return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1338 }
1339
1340 Ok(())
1341 }
1342
1343 pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1350 let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1351 if prev.is_some() {
1352 Ok(())
1353 } else {
1354 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1355 }
1356 }
1357
1358 pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1365 if ids.is_empty() {
1366 return Ok(());
1367 }
1368
1369 let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1370 let n = self.items.delete_by_keys(ks, self.op_id).len();
1371 if n == ids.len() {
1372 Ok(())
1373 } else {
1374 let item_ids = self.items.items().keys().map(|k| k.id).collect();
1375 let mut unknown = ids.difference(&item_ids);
1376 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1377 }
1378 }
1379
1380 pub fn remove_system_object_mappings(
1387 &mut self,
1388 descriptions: BTreeSet<SystemObjectDescription>,
1389 ) -> Result<(), CatalogError> {
1390 if descriptions.is_empty() {
1391 return Ok(());
1392 }
1393
1394 let ks: Vec<_> = descriptions
1395 .clone()
1396 .into_iter()
1397 .map(|desc| GidMappingKey {
1398 schema_name: desc.schema_name,
1399 object_type: desc.object_type,
1400 object_name: desc.object_name,
1401 })
1402 .collect();
1403 let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1404
1405 if n == descriptions.len() {
1406 Ok(())
1407 } else {
1408 let item_descriptions = self
1409 .system_gid_mapping
1410 .items()
1411 .keys()
1412 .map(|k| SystemObjectDescription {
1413 schema_name: k.schema_name.clone(),
1414 object_type: k.object_type.clone(),
1415 object_name: k.object_name.clone(),
1416 })
1417 .collect();
1418 let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1419 format!(
1420 "{} {}.{}",
1421 desc.object_type, desc.schema_name, desc.object_name
1422 )
1423 });
1424 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1425 }
1426 }
1427
1428 pub fn remove_introspection_source_indexes(
1435 &mut self,
1436 introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1437 ) -> Result<(), CatalogError> {
1438 if introspection_source_indexes.is_empty() {
1439 return Ok(());
1440 }
1441
1442 let ks: Vec<_> = introspection_source_indexes
1443 .clone()
1444 .into_iter()
1445 .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1446 .collect();
1447 let n = self
1448 .introspection_sources
1449 .delete_by_keys(ks, self.op_id)
1450 .len();
1451 if n == introspection_source_indexes.len() {
1452 Ok(())
1453 } else {
1454 let txn_indexes = self
1455 .introspection_sources
1456 .items()
1457 .keys()
1458 .map(|k| (k.cluster_id, k.name.clone()))
1459 .collect();
1460 let mut unknown = introspection_source_indexes
1461 .difference(&txn_indexes)
1462 .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1463 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1464 }
1465 }
1466
1467 pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1474 let updated =
1475 self.items
1476 .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1477 if updated {
1478 Ok(())
1479 } else {
1480 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1481 }
1482 }
1483
1484 pub fn update_items(
1492 &mut self,
1493 items: BTreeMap<CatalogItemId, Item>,
1494 ) -> Result<(), CatalogError> {
1495 if items.is_empty() {
1496 return Ok(());
1497 }
1498
1499 let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1500 let kvs: Vec<_> = items
1501 .clone()
1502 .into_iter()
1503 .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1504 .collect();
1505 let n = self.items.update_by_keys(kvs, self.op_id)?;
1506 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1507 if n == update_ids.len() {
1508 Ok(())
1509 } else {
1510 let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1511 let mut unknown = update_ids.difference(&item_ids);
1512 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1513 }
1514 }
1515
1516 pub fn update_role(
1524 &mut self,
1525 id: RoleId,
1526 role: Role,
1527 password: PasswordAction,
1528 ) -> Result<(), CatalogError> {
1529 let key = RoleKey { id };
1530 if self.roles.get(&key).is_some() {
1531 let auth_key = RoleAuthKey { role_id: id };
1532
1533 match password {
1534 PasswordAction::Set(new_password) => {
1535 let hash = mz_auth::hash::scram256_hash(
1536 &new_password.password,
1537 &new_password.scram_iterations,
1538 )
1539 .expect("password hash should be valid");
1540 let value = RoleAuthValue {
1541 password_hash: Some(hash),
1542 updated_at: SYSTEM_TIME(),
1543 };
1544
1545 if self.role_auth.get(&auth_key).is_some() {
1546 self.role_auth
1547 .update_by_key(auth_key.clone(), value, self.op_id)?;
1548 } else {
1549 self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1550 }
1551 }
1552 PasswordAction::Clear => {
1553 let value = RoleAuthValue {
1554 password_hash: None,
1555 updated_at: SYSTEM_TIME(),
1556 };
1557 if self.role_auth.get(&auth_key).is_some() {
1558 self.role_auth
1559 .update_by_key(auth_key.clone(), value, self.op_id)?;
1560 }
1561 }
1562 PasswordAction::NoChange => {}
1563 }
1564
1565 self.roles
1566 .update_by_key(key, role.into_key_value().1, self.op_id)?;
1567
1568 Ok(())
1569 } else {
1570 Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1571 }
1572 }
1573
1574 pub fn update_roles_without_auth(
1585 &mut self,
1586 roles: BTreeMap<RoleId, Role>,
1587 ) -> Result<(), CatalogError> {
1588 if roles.is_empty() {
1589 return Ok(());
1590 }
1591
1592 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1593 let kvs: Vec<_> = roles
1594 .into_iter()
1595 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1596 .collect();
1597 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1598 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1599
1600 if n == update_role_ids.len() {
1601 Ok(())
1602 } else {
1603 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1604 let mut unknown = update_role_ids.difference(&role_ids);
1605 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1606 }
1607 }
1608
1609 pub fn update_system_object_mappings(
1614 &mut self,
1615 mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1616 ) -> Result<(), CatalogError> {
1617 if mappings.is_empty() {
1618 return Ok(());
1619 }
1620
1621 let n = self.system_gid_mapping.update(
1622 |_k, v| {
1623 if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1624 let (_, new_value) = mapping.clone().into_key_value();
1625 Some(new_value)
1626 } else {
1627 None
1628 }
1629 },
1630 self.op_id,
1631 )?;
1632
1633 if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1634 != mappings.len()
1635 {
1636 let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1637 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1638 }
1639
1640 Ok(())
1641 }
1642
1643 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1650 let updated = self.clusters.update_by_key(
1651 ClusterKey { id },
1652 cluster.into_key_value().1,
1653 self.op_id,
1654 )?;
1655 if updated {
1656 Ok(())
1657 } else {
1658 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1659 }
1660 }
1661
1662 pub fn update_cluster_replica(
1669 &mut self,
1670 replica_id: ReplicaId,
1671 replica: ClusterReplica,
1672 ) -> Result<(), CatalogError> {
1673 let updated = self.cluster_replicas.update_by_key(
1674 ClusterReplicaKey { id: replica_id },
1675 replica.into_key_value().1,
1676 self.op_id,
1677 )?;
1678 if updated {
1679 Ok(())
1680 } else {
1681 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1682 }
1683 }
1684
1685 pub fn update_database(
1692 &mut self,
1693 id: DatabaseId,
1694 database: Database,
1695 ) -> Result<(), CatalogError> {
1696 let updated = self.databases.update_by_key(
1697 DatabaseKey { id },
1698 database.into_key_value().1,
1699 self.op_id,
1700 )?;
1701 if updated {
1702 Ok(())
1703 } else {
1704 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1705 }
1706 }
1707
1708 pub fn update_schema(
1715 &mut self,
1716 schema_id: SchemaId,
1717 schema: Schema,
1718 ) -> Result<(), CatalogError> {
1719 let updated = self.schemas.update_by_key(
1720 SchemaKey { id: schema_id },
1721 schema.into_key_value().1,
1722 self.op_id,
1723 )?;
1724 if updated {
1725 Ok(())
1726 } else {
1727 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1728 }
1729 }
1730
1731 pub fn update_network_policy(
1738 &mut self,
1739 id: NetworkPolicyId,
1740 network_policy: NetworkPolicy,
1741 ) -> Result<(), CatalogError> {
1742 let updated = self.network_policies.update_by_key(
1743 NetworkPolicyKey { id },
1744 network_policy.into_key_value().1,
1745 self.op_id,
1746 )?;
1747 if updated {
1748 Ok(())
1749 } else {
1750 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1751 }
1752 }
1753 pub fn remove_network_policies(
1760 &mut self,
1761 network_policies: &BTreeSet<NetworkPolicyId>,
1762 ) -> Result<(), CatalogError> {
1763 if network_policies.is_empty() {
1764 return Ok(());
1765 }
1766
1767 let to_remove = network_policies
1768 .iter()
1769 .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1770 .collect();
1771 let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1772 assert!(
1773 prev.iter().all(|(k, _)| k.id.is_user()),
1774 "cannot delete non-user network policy"
1775 );
1776
1777 prev.retain(|_k, v| v.is_none());
1778 if !prev.is_empty() {
1779 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1780 return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1781 }
1782
1783 Ok(())
1784 }
1785 pub fn set_default_privilege(
1789 &mut self,
1790 role_id: RoleId,
1791 database_id: Option<DatabaseId>,
1792 schema_id: Option<SchemaId>,
1793 object_type: ObjectType,
1794 grantee: RoleId,
1795 privileges: Option<AclMode>,
1796 ) -> Result<(), CatalogError> {
1797 self.default_privileges.set(
1798 DefaultPrivilegesKey {
1799 role_id,
1800 database_id,
1801 schema_id,
1802 object_type,
1803 grantee,
1804 },
1805 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1806 self.op_id,
1807 )?;
1808 Ok(())
1809 }
1810
1811 pub fn set_default_privileges(
1813 &mut self,
1814 default_privileges: Vec<DefaultPrivilege>,
1815 ) -> Result<(), CatalogError> {
1816 if default_privileges.is_empty() {
1817 return Ok(());
1818 }
1819
1820 let default_privileges = default_privileges
1821 .into_iter()
1822 .map(DurableType::into_key_value)
1823 .map(|(k, v)| (k, Some(v)))
1824 .collect();
1825 self.default_privileges
1826 .set_many(default_privileges, self.op_id)?;
1827 Ok(())
1828 }
1829
1830 pub fn set_system_privilege(
1834 &mut self,
1835 grantee: RoleId,
1836 grantor: RoleId,
1837 acl_mode: Option<AclMode>,
1838 ) -> Result<(), CatalogError> {
1839 self.system_privileges.set(
1840 SystemPrivilegesKey { grantee, grantor },
1841 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1842 self.op_id,
1843 )?;
1844 Ok(())
1845 }
1846
1847 pub fn set_system_privileges(
1849 &mut self,
1850 system_privileges: Vec<MzAclItem>,
1851 ) -> Result<(), CatalogError> {
1852 if system_privileges.is_empty() {
1853 return Ok(());
1854 }
1855
1856 let system_privileges = system_privileges
1857 .into_iter()
1858 .map(DurableType::into_key_value)
1859 .map(|(k, v)| (k, Some(v)))
1860 .collect();
1861 self.system_privileges
1862 .set_many(system_privileges, self.op_id)?;
1863 Ok(())
1864 }
1865
1866 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1868 self.settings.set(
1869 SettingKey { name },
1870 value.map(|value| SettingValue { value }),
1871 self.op_id,
1872 )?;
1873 Ok(())
1874 }
1875
1876 pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1877 self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1878 }
1879
1880 pub fn insert_introspection_source_indexes(
1882 &mut self,
1883 introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1884 temporary_oids: &HashSet<u32>,
1885 ) -> Result<(), CatalogError> {
1886 if introspection_source_indexes.is_empty() {
1887 return Ok(());
1888 }
1889
1890 let amount = usize_to_u64(introspection_source_indexes.len());
1891 let oids = self.allocate_oids(amount, temporary_oids)?;
1892 let introspection_source_indexes: Vec<_> = introspection_source_indexes
1893 .into_iter()
1894 .zip_eq(oids)
1895 .map(
1896 |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1897 cluster_id,
1898 name,
1899 item_id,
1900 index_id,
1901 oid,
1902 },
1903 )
1904 .collect();
1905
1906 for introspection_source_index in introspection_source_indexes {
1907 let (key, value) = introspection_source_index.into_key_value();
1908 self.introspection_sources.insert(key, value, self.op_id)?;
1909 }
1910
1911 Ok(())
1912 }
1913
1914 pub fn set_system_object_mappings(
1916 &mut self,
1917 mappings: Vec<SystemObjectMapping>,
1918 ) -> Result<(), CatalogError> {
1919 if mappings.is_empty() {
1920 return Ok(());
1921 }
1922
1923 let mappings = mappings
1924 .into_iter()
1925 .map(DurableType::into_key_value)
1926 .map(|(k, v)| (k, Some(v)))
1927 .collect();
1928 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1929 Ok(())
1930 }
1931
1932 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1934 if replicas.is_empty() {
1935 return Ok(());
1936 }
1937
1938 let replicas = replicas
1939 .into_iter()
1940 .map(DurableType::into_key_value)
1941 .map(|(k, v)| (k, Some(v)))
1942 .collect();
1943 self.cluster_replicas.set_many(replicas, self.op_id)?;
1944 Ok(())
1945 }
1946
1947 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1949 match value {
1950 Some(value) => {
1951 let config = Config { key, value };
1952 let (key, value) = config.into_key_value();
1953 self.configs.set(key, Some(value), self.op_id)?;
1954 }
1955 None => {
1956 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1957 }
1958 }
1959 Ok(())
1960 }
1961
1962 pub fn get_config(&self, key: String) -> Option<u64> {
1964 self.configs
1965 .get(&ConfigKey { key })
1966 .map(|entry| entry.value)
1967 }
1968
1969 pub fn get_setting(&self, name: String) -> Option<&str> {
1971 self.settings
1972 .get(&SettingKey { name })
1973 .map(|entry| &*entry.value)
1974 }
1975
1976 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1977 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1978 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1979 }
1980
1981 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1982 self.set_setting(
1983 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1984 Some(shard_id.to_string()),
1985 )
1986 }
1987
1988 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1989 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1990 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1991 }
1992
1993 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1994 self.set_setting(
1995 EXPRESSION_CACHE_SHARD_KEY.to_string(),
1996 Some(shard_id.to_string()),
1997 )
1998 }
1999
2000 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
2006 self.set_config(
2007 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
2008 Some(
2009 value
2010 .as_millis()
2011 .try_into()
2012 .expect("max wait fits into u64"),
2013 ),
2014 )
2015 }
2016
2017 pub fn set_0dt_deployment_ddl_check_interval(
2024 &mut self,
2025 value: Duration,
2026 ) -> Result<(), CatalogError> {
2027 self.set_config(
2028 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2029 Some(
2030 value
2031 .as_millis()
2032 .try_into()
2033 .expect("ddl check interval fits into u64"),
2034 ),
2035 )
2036 }
2037
2038 pub fn set_enable_0dt_deployment_panic_after_timeout(
2044 &mut self,
2045 value: bool,
2046 ) -> Result<(), CatalogError> {
2047 self.set_config(
2048 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2049 Some(u64::from(value)),
2050 )
2051 }
2052
2053 pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2059 self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2060 }
2061
2062 pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2069 self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2070 }
2071
2072 pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2079 self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2080 }
2081
2082 pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2084 self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2085 }
2086
2087 pub fn update_comment(
2088 &mut self,
2089 object_id: CommentObjectId,
2090 sub_component: Option<usize>,
2091 comment: Option<String>,
2092 ) -> Result<(), CatalogError> {
2093 let key = CommentKey {
2094 object_id,
2095 sub_component,
2096 };
2097 let value = comment.map(|c| CommentValue { comment: c });
2098 self.comments.set(key, value, self.op_id)?;
2099
2100 Ok(())
2101 }
2102
2103 pub fn drop_comments(
2104 &mut self,
2105 object_ids: &BTreeSet<CommentObjectId>,
2106 ) -> Result<(), CatalogError> {
2107 if object_ids.is_empty() {
2108 return Ok(());
2109 }
2110
2111 self.comments
2112 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2113 Ok(())
2114 }
2115
2116 pub fn update_source_references(
2117 &mut self,
2118 source_id: CatalogItemId,
2119 references: Vec<SourceReference>,
2120 updated_at: u64,
2121 ) -> Result<(), CatalogError> {
2122 let key = SourceReferencesKey { source_id };
2123 let value = SourceReferencesValue {
2124 references,
2125 updated_at,
2126 };
2127 self.source_references.set(key, Some(value), self.op_id)?;
2128 Ok(())
2129 }
2130
2131 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2133 let key = ServerConfigurationKey {
2134 name: name.to_string(),
2135 };
2136 let value = ServerConfigurationValue { value };
2137 self.system_configurations
2138 .set(key, Some(value), self.op_id)?;
2139 Ok(())
2140 }
2141
2142 pub fn remove_system_config(&mut self, name: &str) {
2144 let key = ServerConfigurationKey {
2145 name: name.to_string(),
2146 };
2147 self.system_configurations
2148 .set(key, None, self.op_id)
2149 .expect("cannot have uniqueness violation");
2150 }
2151
2152 pub fn clear_system_configs(&mut self) {
2154 self.system_configurations.delete(|_k, _v| true, self.op_id);
2155 }
2156
2157 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2158 match self.configs.insert(
2159 ConfigKey { key: key.clone() },
2160 ConfigValue { value },
2161 self.op_id,
2162 ) {
2163 Ok(_) => Ok(()),
2164 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2165 }
2166 }
2167
2168 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2169 self.clusters
2170 .items()
2171 .into_iter()
2172 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2173 }
2174
2175 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2176 self.cluster_replicas
2177 .items()
2178 .into_iter()
2179 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2180 }
2181
2182 pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2183 self.databases
2184 .items()
2185 .into_iter()
2186 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2187 }
2188
2189 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2190 self.roles
2191 .items()
2192 .into_iter()
2193 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2194 }
2195
2196 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2197 self.network_policies
2198 .items()
2199 .into_iter()
2200 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2201 }
2202
2203 pub fn get_system_object_mappings(
2204 &self,
2205 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2206 self.system_gid_mapping
2207 .items()
2208 .into_iter()
2209 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2210 }
2211
2212 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2213 self.schemas
2214 .items()
2215 .into_iter()
2216 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2217 }
2218
2219 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2220 self.system_configurations
2221 .items()
2222 .into_iter()
2223 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2224 }
2225
2226 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2227 let key = SchemaKey { id: *id };
2228 self.schemas
2229 .get(&key)
2230 .map(|v| DurableType::from_key_value(key, v.clone()))
2231 }
2232
2233 pub fn get_introspection_source_indexes(
2234 &self,
2235 cluster_id: ClusterId,
2236 ) -> BTreeMap<&str, (GlobalId, u32)> {
2237 self.introspection_sources
2238 .items()
2239 .into_iter()
2240 .filter(|(k, _v)| k.cluster_id == cluster_id)
2241 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2242 .collect()
2243 }
2244
2245 pub fn get_catalog_content_version(&self) -> Option<&str> {
2246 self.settings
2247 .get(&SettingKey {
2248 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2249 })
2250 .map(|value| &*value.value)
2251 }
2252
2253 pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2254 self.settings
2255 .get(&SettingKey {
2256 name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2257 })
2258 .map(|value| value.value.clone())
2259 }
2260
2261 #[must_use]
2267 pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2268 let updates = self.get_op_updates();
2269 self.commit_op();
2270 updates
2271 }
2272
/// Collects the pending updates that belong to the current operation.
///
/// Only pending entries whose timestamp equals `self.op_id` are returned. Each one is
/// converted into a [`StateUpdate`] whose `ts` is the transaction's `upper`. The
/// collections are visited in the fixed order of the `chain` calls below.
fn get_op_updates(&self) -> Vec<StateUpdate> {
    /// Yields `(kind, diff)` for every pending entry of `table_txn` whose timestamp
    /// equals `op`, using `kind_fn` to wrap the decoded durable object.
    fn get_collection_op_updates<'a, T>(
        table_txn: &'a TableTransaction<T::Key, T::Value>,
        kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
        op: Timestamp,
    ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
    where
        T::Key: Ord + Eq + Clone + Debug,
        T::Value: Ord + Clone + Debug,
        T: DurableType,
    {
        table_txn
            .pending
            .iter()
            // Flatten the per-key list of pending values into `(key, value)` pairs.
            .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
            .filter_map(move |(k, v)| {
                if v.ts == op {
                    let key = k.clone();
                    let value = v.value.clone();
                    let diff = v.diff.clone().try_into().expect("invalid diff");
                    let update = DurableType::from_key_value(key, value);
                    let kind = kind_fn(update);
                    Some((kind, diff))
                } else {
                    None
                }
            })
    }

    /// Same as `get_collection_op_updates`, but for a collection stored as a flat
    /// list of `(key, diff, timestamp)` triples whose durable value type is `()`.
    fn get_large_collection_op_updates<'a, T>(
        collection: &'a Vec<(T::Key, Diff, Timestamp)>,
        kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
        op: Timestamp,
    ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
    where
        T::Key: Ord + Eq + Clone + Debug,
        T: DurableType<Value = ()>,
    {
        collection.iter().filter_map(move |(k, diff, ts)| {
            if *ts == op {
                let key = k.clone();
                let diff = diff.clone().try_into().expect("invalid diff");
                let update = DurableType::from_key_value(key, ());
                let kind = kind_fn(update);
                Some((kind, diff))
            } else {
                None
            }
        })
    }

    // Destructure exhaustively so that adding a field to `Transaction` forces a
    // decision here about whether it contributes updates.
    let Transaction {
        durable_catalog: _,
        databases,
        schemas,
        items,
        comments,
        roles,
        role_auth,
        clusters,
        network_policies,
        cluster_replicas,
        introspection_sources,
        system_gid_mapping,
        system_configurations,
        default_privileges,
        source_references,
        system_privileges,
        audit_log_updates,
        storage_collection_metadata,
        unfinalized_shards,
        // The following collections contribute no updates to the result.
        id_allocator: _,
        configs: _,
        settings: _,
        txn_wal_shard: _,
        upper,
        op_id: _,
    } = &self;

    let updates = std::iter::empty()
        .chain(get_collection_op_updates(
            roles,
            StateUpdateKind::Role,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            role_auth,
            StateUpdateKind::RoleAuth,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            databases,
            StateUpdateKind::Database,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            schemas,
            StateUpdateKind::Schema,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            default_privileges,
            StateUpdateKind::DefaultPrivilege,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            system_privileges,
            StateUpdateKind::SystemPrivilege,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            system_configurations,
            StateUpdateKind::SystemConfiguration,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            clusters,
            StateUpdateKind::Cluster,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            network_policies,
            StateUpdateKind::NetworkPolicy,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            introspection_sources,
            StateUpdateKind::IntrospectionSourceIndex,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            cluster_replicas,
            StateUpdateKind::ClusterReplica,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            system_gid_mapping,
            StateUpdateKind::SystemObjectMapping,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            items,
            StateUpdateKind::Item,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            comments,
            StateUpdateKind::Comment,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            source_references,
            StateUpdateKind::SourceReferences,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            storage_collection_metadata,
            StateUpdateKind::StorageCollectionMetadata,
            self.op_id,
        ))
        .chain(get_collection_op_updates(
            unfinalized_shards,
            StateUpdateKind::UnfinalizedShard,
            self.op_id,
        ))
        .chain(get_large_collection_op_updates(
            audit_log_updates,
            StateUpdateKind::AuditLog,
            self.op_id,
        ))
        // Every update of this operation is stamped with the transaction's upper.
        .map(|(kind, diff)| StateUpdate {
            kind,
            ts: upper.clone(),
            diff,
        })
        .collect();

    updates
}
2453
/// Reports whether the underlying durable catalog is a savepoint, as answered by
/// `durable_catalog.is_savepoint()`.
pub fn is_savepoint(&self) -> bool {
    self.durable_catalog.is_savepoint()
}
2457
/// Finishes the current operation by advancing `op_id` by one, so that subsequent
/// writes are tagged with the next operation ID.
fn commit_op(&mut self) {
    self.op_id += 1;
}
2461
/// Returns the ID of the current operation within this transaction.
pub fn op_id(&self) -> Timestamp {
    self.op_id
}
2465
/// Returns the `upper` timestamp held by this transaction.
pub fn upper(&self) -> mz_repr::Timestamp {
    self.upper
}
2469
/// Consumes the transaction and splits it into a [`TransactionBatch`] holding the
/// pending changes of every collection, plus the borrowed durable catalog state.
pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
    // Audit log keys are converted to their proto form; the per-entry operation
    // timestamp is dropped.
    let audit_log_updates = self
        .audit_log_updates
        .into_iter()
        .map(|(k, diff, _op)| (k.into_proto(), (), diff))
        .collect();

    let txn_batch = TransactionBatch {
        databases: self.databases.pending(),
        schemas: self.schemas.pending(),
        items: self.items.pending(),
        comments: self.comments.pending(),
        roles: self.roles.pending(),
        role_auth: self.role_auth.pending(),
        clusters: self.clusters.pending(),
        cluster_replicas: self.cluster_replicas.pending(),
        network_policies: self.network_policies.pending(),
        introspection_sources: self.introspection_sources.pending(),
        id_allocator: self.id_allocator.pending(),
        configs: self.configs.pending(),
        source_references: self.source_references.pending(),
        settings: self.settings.pending(),
        system_gid_mapping: self.system_gid_mapping.pending(),
        system_configurations: self.system_configurations.pending(),
        default_privileges: self.default_privileges.pending(),
        system_privileges: self.system_privileges.pending(),
        storage_collection_metadata: self.storage_collection_metadata.pending(),
        unfinalized_shards: self.unfinalized_shards.pending(),
        txn_wal_shard: self.txn_wal_shard.pending(),
        audit_log_updates,
        upper: self.upper,
    };
    (txn_batch, self.durable_catalog)
}
2504
2505 #[mz_ore::instrument(level = "debug")]
2517 pub(crate) async fn commit_internal(
2518 self,
2519 commit_ts: mz_repr::Timestamp,
2520 ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2521 let (mut txn_batch, durable_catalog) = self.into_parts();
2522 let TransactionBatch {
2523 databases,
2524 schemas,
2525 items,
2526 comments,
2527 roles,
2528 role_auth,
2529 clusters,
2530 cluster_replicas,
2531 network_policies,
2532 introspection_sources,
2533 id_allocator,
2534 configs,
2535 source_references,
2536 settings,
2537 system_gid_mapping,
2538 system_configurations,
2539 default_privileges,
2540 system_privileges,
2541 storage_collection_metadata,
2542 unfinalized_shards,
2543 txn_wal_shard,
2544 audit_log_updates,
2545 upper: _,
2546 } = &mut txn_batch;
2547 differential_dataflow::consolidation::consolidate_updates(databases);
2550 differential_dataflow::consolidation::consolidate_updates(schemas);
2551 differential_dataflow::consolidation::consolidate_updates(items);
2552 differential_dataflow::consolidation::consolidate_updates(comments);
2553 differential_dataflow::consolidation::consolidate_updates(roles);
2554 differential_dataflow::consolidation::consolidate_updates(role_auth);
2555 differential_dataflow::consolidation::consolidate_updates(clusters);
2556 differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2557 differential_dataflow::consolidation::consolidate_updates(network_policies);
2558 differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2559 differential_dataflow::consolidation::consolidate_updates(id_allocator);
2560 differential_dataflow::consolidation::consolidate_updates(configs);
2561 differential_dataflow::consolidation::consolidate_updates(settings);
2562 differential_dataflow::consolidation::consolidate_updates(source_references);
2563 differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2564 differential_dataflow::consolidation::consolidate_updates(system_configurations);
2565 differential_dataflow::consolidation::consolidate_updates(default_privileges);
2566 differential_dataflow::consolidation::consolidate_updates(system_privileges);
2567 differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2568 differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2569 differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2570 differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2571
2572 let upper = durable_catalog
2573 .commit_transaction(txn_batch, commit_ts)
2574 .await?;
2575 Ok((durable_catalog, upper))
2576 }
2577
    /// Commits the transaction to the durable catalog at `commit_ts` and then
    /// syncs the durable catalog up to the resulting upper.
    ///
    /// # Panics
    ///
    /// Panics if `get_op_updates` still returns updates, i.e. the caller has
    /// not consumed every update produced by this transaction.
    ///
    /// # Errors
    ///
    /// Returns any error produced while committing or while syncing updates.
    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        // Sync so the durable catalog has applied everything up to `upper`.
        let updates = durable_storage.sync_updates(upper).await?;
        // Unless the catalog is read-only, every update seen while syncing is
        // expected to carry `commit_ts`; anything else was pending before the
        // commit.
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2615}
2616
2617use crate::durable::async_trait;
2618
2619use super::objects::{RoleAuthKey, RoleAuthValue};
2620
2621#[async_trait]
2622impl StorageTxn for Transaction<'_> {
2623 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2624 self.storage_collection_metadata
2625 .items()
2626 .into_iter()
2627 .map(
2628 |(
2629 StorageCollectionMetadataKey { id },
2630 StorageCollectionMetadataValue { shard },
2631 )| { (*id, shard.clone()) },
2632 )
2633 .collect()
2634 }
2635
2636 fn insert_collection_metadata(
2637 &mut self,
2638 metadata: BTreeMap<GlobalId, ShardId>,
2639 ) -> Result<(), StorageError> {
2640 for (id, shard) in metadata {
2641 self.storage_collection_metadata
2642 .insert(
2643 StorageCollectionMetadataKey { id },
2644 StorageCollectionMetadataValue {
2645 shard: shard.clone(),
2646 },
2647 self.op_id,
2648 )
2649 .map_err(|err| match err {
2650 DurableCatalogError::DuplicateKey => {
2651 StorageError::CollectionMetadataAlreadyExists(id)
2652 }
2653 DurableCatalogError::UniquenessViolation => {
2654 StorageError::PersistShardAlreadyInUse(shard)
2655 }
2656 err => StorageError::Generic(anyhow::anyhow!(err)),
2657 })?;
2658 }
2659 Ok(())
2660 }
2661
2662 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2663 let ks: Vec<_> = ids
2664 .into_iter()
2665 .map(|id| StorageCollectionMetadataKey { id })
2666 .collect();
2667 self.storage_collection_metadata
2668 .delete_by_keys(ks, self.op_id)
2669 .into_iter()
2670 .map(
2671 |(
2672 StorageCollectionMetadataKey { id },
2673 StorageCollectionMetadataValue { shard },
2674 )| (id, shard),
2675 )
2676 .collect()
2677 }
2678
2679 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2680 self.unfinalized_shards
2681 .items()
2682 .into_iter()
2683 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2684 .collect()
2685 }
2686
2687 fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError> {
2688 for shard in s {
2689 match self
2690 .unfinalized_shards
2691 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2692 {
2693 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2695 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2696 };
2697 }
2698 Ok(())
2699 }
2700
2701 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2702 let ks: Vec<_> = shards
2703 .into_iter()
2704 .map(|shard| UnfinalizedShardKey { shard })
2705 .collect();
2706 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2707 }
2708
2709 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2710 self.txn_wal_shard
2711 .values()
2712 .iter()
2713 .next()
2714 .map(|TxnWalShardValue { shard }| *shard)
2715 }
2716
2717 fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError> {
2718 self.txn_wal_shard
2719 .insert((), TxnWalShardValue { shard }, self.op_id)
2720 .map_err(|err| match err {
2721 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2722 err => StorageError::Generic(anyhow::anyhow!(err)),
2723 })
2724 }
2725}
2726
/// The set of changes produced by a catalog transaction, in proto form.
///
/// Each collection field holds `(key, value, diff)` triples; collections
/// without a value (or without a key) use `()` in that position. Instances
/// are built by `Transaction::into_parts` from the pending updates of each
/// table.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// Copied from the originating transaction's `upper`.
    // NOTE(review): presumably the catalog upper at the time the transaction
    // started — confirm against `Transaction::new`.
    pub(crate) upper: mz_repr::Timestamp,
}
2779
2780impl TransactionBatch {
2781 pub fn is_empty(&self) -> bool {
2782 let TransactionBatch {
2783 databases,
2784 schemas,
2785 items,
2786 comments,
2787 roles,
2788 role_auth,
2789 clusters,
2790 cluster_replicas,
2791 network_policies,
2792 introspection_sources,
2793 id_allocator,
2794 configs,
2795 settings,
2796 source_references,
2797 system_gid_mapping,
2798 system_configurations,
2799 default_privileges,
2800 system_privileges,
2801 storage_collection_metadata,
2802 unfinalized_shards,
2803 txn_wal_shard,
2804 audit_log_updates,
2805 upper: _,
2806 } = self;
2807 databases.is_empty()
2808 && schemas.is_empty()
2809 && items.is_empty()
2810 && comments.is_empty()
2811 && roles.is_empty()
2812 && role_auth.is_empty()
2813 && clusters.is_empty()
2814 && cluster_replicas.is_empty()
2815 && network_policies.is_empty()
2816 && introspection_sources.is_empty()
2817 && id_allocator.is_empty()
2818 && configs.is_empty()
2819 && settings.is_empty()
2820 && source_references.is_empty()
2821 && system_gid_mapping.is_empty()
2822 && system_configurations.is_empty()
2823 && default_privileges.is_empty()
2824 && system_privileges.is_empty()
2825 && storage_collection_metadata.is_empty()
2826 && unfinalized_shards.is_empty()
2827 && txn_wal_shard.is_empty()
2828 && audit_log_updates.is_empty()
2829 }
2830}
2831
/// A single pending change to a value inside a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value being added or retracted.
    value: V,
    /// The transaction-local timestamp (op ID) at which the change was made.
    ts: Timestamp,
    /// `Diff::ONE` for an addition, `Diff::MINUS_ONE` for a retraction.
    diff: Diff,
}
2838
/// Lets [`TableTransaction::verify`] bucket values by name so that it only
/// has to run the uniqueness check on values that share a name.
trait UniqueName {
    /// Whether values of this type carry a name usable for bucketing. When
    /// `false`, `verify` falls back to comparing every pair of values.
    const HAS_UNIQUE_NAME: bool;
    /// The name used for bucketing. Implementations with
    /// `HAS_UNIQUE_NAME == false` return an empty string.
    fn unique_name(&self) -> &str;
}
2847
/// `UniqueName` implementations for the durable catalog value types.
mod unique_name {
    use crate::durable::objects::*;

    /// Implements `UniqueName` for types with a `name` field, using that
    /// field as the unique name.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements `UniqueName` for types without a usable name;
    /// `unique_name` returns the empty string.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                       ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    // `String` is used as the value type by the `TableTransaction` unit tests.
    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
2909
/// An in-memory, transactional view over one keyed catalog collection.
///
/// Reads see `initial` with all `pending` updates applied, so writes made
/// earlier in the transaction are visible to later reads. `K` is the key
/// type; at most one value is visible per key. `V` is the value type.
#[derive(Debug)]
struct TableTransaction<K, V> {
    /// The contents of the collection when the transaction was created.
    initial: BTreeMap<K, V>,
    /// Per-key updates made during the transaction, in the order they were
    /// made (expected to be sorted by timestamp; see `verify`).
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    /// Optional predicate that returns `true` when two values must not
    /// coexist in the table.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2927
2928impl<K, V> TableTransaction<K, V>
2929where
2930 K: Ord + Eq + Clone + Debug,
2931 V: Ord + Clone + Debug + UniqueName,
2932{
2933 fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2940 where
2941 K: RustType<KP>,
2942 V: RustType<VP>,
2943 {
2944 let initial = initial
2945 .into_iter()
2946 .map(RustType::from_proto)
2947 .collect::<Result<_, _>>()?;
2948
2949 Ok(Self {
2950 initial,
2951 pending: BTreeMap::new(),
2952 uniqueness_violation: None,
2953 })
2954 }
2955
2956 fn new_with_uniqueness_fn<KP, VP>(
2959 initial: BTreeMap<KP, VP>,
2960 uniqueness_violation: fn(a: &V, b: &V) -> bool,
2961 ) -> Result<Self, TryFromProtoError>
2962 where
2963 K: RustType<KP>,
2964 V: RustType<VP>,
2965 {
2966 let initial = initial
2967 .into_iter()
2968 .map(RustType::from_proto)
2969 .collect::<Result<_, _>>()?;
2970
2971 Ok(Self {
2972 initial,
2973 pending: BTreeMap::new(),
2974 uniqueness_violation: Some(uniqueness_violation),
2975 })
2976 }
2977
2978 fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2981 where
2982 K: RustType<KP>,
2983 V: RustType<VP>,
2984 {
2985 soft_assert_no_log!(self.verify().is_ok());
2986 self.pending
2989 .into_iter()
2990 .flat_map(|(k, v)| {
2991 let mut v: Vec<_> = v
2992 .into_iter()
2993 .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
2994 .collect();
2995 differential_dataflow::consolidation::consolidate(&mut v);
2996 v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
2997 })
2998 .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
2999 .collect()
3000 }
3001
3002 fn verify(&self) -> Result<(), DurableCatalogError> {
3007 if let Some(uniqueness_violation) = self.uniqueness_violation {
3008 let items = self.values();
3010 if V::HAS_UNIQUE_NAME {
3011 let by_name: BTreeMap<_, _> = items
3012 .iter()
3013 .enumerate()
3014 .map(|(v, vi)| (vi.unique_name(), (v, vi)))
3015 .collect();
3016 for (i, vi) in items.iter().enumerate() {
3017 if let Some((j, vj)) = by_name.get(vi.unique_name()) {
3018 if i != *j && uniqueness_violation(vi, *vj) {
3019 return Err(DurableCatalogError::UniquenessViolation);
3020 }
3021 }
3022 }
3023 } else {
3024 for (i, vi) in items.iter().enumerate() {
3025 for (j, vj) in items.iter().enumerate() {
3026 if i != j && uniqueness_violation(vi, vj) {
3027 return Err(DurableCatalogError::UniquenessViolation);
3028 }
3029 }
3030 }
3031 }
3032 }
3033 soft_assert_no_log!(
3034 self.pending
3035 .values()
3036 .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3037 "pending should be sorted by timestamp: {:?}",
3038 self.pending
3039 );
3040 Ok(())
3041 }
3042
3043 fn verify_keys<'a>(
3048 &self,
3049 keys: impl IntoIterator<Item = &'a K>,
3050 ) -> Result<(), DurableCatalogError>
3051 where
3052 K: 'a,
3053 {
3054 if let Some(uniqueness_violation) = self.uniqueness_violation {
3055 let entries: Vec<_> = keys
3056 .into_iter()
3057 .filter_map(|key| self.get(key).map(|value| (key, value)))
3058 .collect();
3059 for (ki, vi) in self.items() {
3061 for (kj, vj) in &entries {
3062 if ki != *kj && uniqueness_violation(vi, vj) {
3063 return Err(DurableCatalogError::UniquenessViolation);
3064 }
3065 }
3066 }
3067 }
3068 soft_assert_no_log!(self.verify().is_ok());
3069 Ok(())
3070 }
3071
3072 fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3075 let mut seen = BTreeSet::new();
3076 for k in self.pending.keys() {
3077 seen.insert(k);
3078 let v = self.get(k);
3079 if let Some(v) = v {
3082 f(k, v);
3083 }
3084 }
3085 for (k, v) in self.initial.iter() {
3086 if !seen.contains(k) {
3088 f(k, v);
3089 }
3090 }
3091 }
3092
3093 fn get(&self, k: &K) -> Option<&V> {
3095 let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3096 let mut updates = Vec::with_capacity(pending.len() + 1);
3097 if let Some(initial) = self.initial.get(k) {
3098 updates.push((initial, Diff::ONE));
3099 }
3100 updates.extend(
3101 pending
3102 .into_iter()
3103 .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3104 );
3105
3106 differential_dataflow::consolidation::consolidate(&mut updates);
3107 assert!(updates.len() <= 1);
3108 updates.into_iter().next().map(|(v, _)| v)
3109 }
3110
3111 #[cfg(test)]
3116 fn items_cloned(&self) -> BTreeMap<K, V> {
3117 let mut items = BTreeMap::new();
3118 self.for_values(|k, v| {
3119 items.insert(k.clone(), v.clone());
3120 });
3121 items
3122 }
3123
    /// Returns the currently visible contents of the table, converted to
    /// their proto representation.
    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
    where
        K: RustType<KP>,
        V: RustType<VP>,
        KP: Ord,
    {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.into_proto(), v.into_proto());
        });
        items
    }
3139
    /// Returns borrowed key/value pairs for everything currently visible in
    /// the table (initial contents with pending updates applied).
    fn items(&self) -> BTreeMap<&K, &V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k, v);
        });
        items
    }
3149
    /// Returns the set of values currently visible in the table. Because the
    /// result is a set, equal values stored under different keys appear once.
    fn values(&self) -> BTreeSet<&V> {
        let mut items = BTreeSet::new();
        self.for_values(|_, v| {
            items.insert(v);
        });
        items
    }
3158
    /// Returns the number of key/value pairs currently visible in the table.
    /// This walks the whole table via `for_values`.
    fn len(&self) -> usize {
        let mut count = 0;
        self.for_values(|_, _| {
            count += 1;
        });
        count
    }
3167
3168 fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3172 &mut self,
3173 mut f: F,
3174 ) {
3175 let mut pending = BTreeMap::new();
3176 self.for_values(|k, v| f(&mut pending, k, v));
3177 for (k, updates) in pending {
3178 self.pending.entry(k).or_default().extend(updates);
3179 }
3180 }
3181
3182 fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3186 let mut violation = None;
3187 self.for_values(|for_k, for_v| {
3188 if &k == for_k {
3189 violation = Some(DurableCatalogError::DuplicateKey);
3190 }
3191 if let Some(uniqueness_violation) = self.uniqueness_violation {
3192 if uniqueness_violation(for_v, &v) {
3193 violation = Some(DurableCatalogError::UniquenessViolation);
3194 }
3195 }
3196 });
3197 if let Some(violation) = violation {
3198 return Err(violation);
3199 }
3200 self.pending.entry(k).or_default().push(TransactionUpdate {
3201 value: v,
3202 ts,
3203 diff: Diff::ONE,
3204 });
3205 soft_assert_no_log!(self.verify().is_ok());
3206 Ok(())
3207 }
3208
3209 fn update<F: Fn(&K, &V) -> Option<V>>(
3218 &mut self,
3219 f: F,
3220 ts: Timestamp,
3221 ) -> Result<Diff, DurableCatalogError> {
3222 let mut changed = Diff::ZERO;
3223 let mut keys = BTreeSet::new();
3224 let pending = self.pending.clone();
3226 self.for_values_mut(|p, k, v| {
3227 if let Some(next) = f(k, v) {
3228 changed += Diff::ONE;
3229 keys.insert(k.clone());
3230 let updates = p.entry(k.clone()).or_default();
3231 updates.push(TransactionUpdate {
3232 value: v.clone(),
3233 ts,
3234 diff: Diff::MINUS_ONE,
3235 });
3236 updates.push(TransactionUpdate {
3237 value: next,
3238 ts,
3239 diff: Diff::ONE,
3240 });
3241 }
3242 });
3243 if let Err(err) = self.verify_keys(&keys) {
3245 self.pending = pending;
3246 Err(err)
3247 } else {
3248 Ok(changed)
3249 }
3250 }
3251
3252 fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3257 if let Some(cur_v) = self.get(&k) {
3258 if v != *cur_v {
3259 self.set(k, Some(v), ts)?;
3260 }
3261 Ok(true)
3262 } else {
3263 Ok(false)
3264 }
3265 }
3266
3267 fn update_by_keys(
3272 &mut self,
3273 kvs: impl IntoIterator<Item = (K, V)>,
3274 ts: Timestamp,
3275 ) -> Result<Diff, DurableCatalogError> {
3276 let kvs: Vec<_> = kvs
3277 .into_iter()
3278 .filter_map(|(k, v)| match self.get(&k) {
3279 Some(cur_v) => Some((*cur_v == v, k, v)),
3281 None => None,
3282 })
3283 .collect();
3284 let changed = kvs.len();
3285 let changed =
3286 Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3287 let kvs = kvs
3288 .into_iter()
3289 .filter(|(no_op, _, _)| !no_op)
3291 .map(|(_, k, v)| (k, Some(v)))
3292 .collect();
3293 self.set_many(kvs, ts)?;
3294 Ok(changed)
3295 }
3296
3297 fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3304 let prev = self.get(&k).cloned();
3305 let entry = self.pending.entry(k.clone()).or_default();
3306 let restore_len = entry.len();
3307
3308 match (v, prev.clone()) {
3309 (Some(v), Some(prev)) => {
3310 entry.push(TransactionUpdate {
3311 value: prev,
3312 ts,
3313 diff: Diff::MINUS_ONE,
3314 });
3315 entry.push(TransactionUpdate {
3316 value: v,
3317 ts,
3318 diff: Diff::ONE,
3319 });
3320 }
3321 (Some(v), None) => {
3322 entry.push(TransactionUpdate {
3323 value: v,
3324 ts,
3325 diff: Diff::ONE,
3326 });
3327 }
3328 (None, Some(prev)) => {
3329 entry.push(TransactionUpdate {
3330 value: prev,
3331 ts,
3332 diff: Diff::MINUS_ONE,
3333 });
3334 }
3335 (None, None) => {}
3336 }
3337
3338 if let Err(err) = self.verify_keys([&k]) {
3340 let pending = self.pending.get_mut(&k).expect("inserted above");
3343 pending.truncate(restore_len);
3344 Err(err)
3345 } else {
3346 Ok(prev)
3347 }
3348 }
3349
3350 fn set_many(
3355 &mut self,
3356 kvs: BTreeMap<K, Option<V>>,
3357 ts: Timestamp,
3358 ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3359 if kvs.is_empty() {
3360 return Ok(BTreeMap::new());
3361 }
3362
3363 let mut prevs = BTreeMap::new();
3364 let mut restores = BTreeMap::new();
3365
3366 for (k, v) in kvs {
3367 let prev = self.get(&k).cloned();
3368 let entry = self.pending.entry(k.clone()).or_default();
3369 restores.insert(k.clone(), entry.len());
3370
3371 match (v, prev.clone()) {
3372 (Some(v), Some(prev)) => {
3373 entry.push(TransactionUpdate {
3374 value: prev,
3375 ts,
3376 diff: Diff::MINUS_ONE,
3377 });
3378 entry.push(TransactionUpdate {
3379 value: v,
3380 ts,
3381 diff: Diff::ONE,
3382 });
3383 }
3384 (Some(v), None) => {
3385 entry.push(TransactionUpdate {
3386 value: v,
3387 ts,
3388 diff: Diff::ONE,
3389 });
3390 }
3391 (None, Some(prev)) => {
3392 entry.push(TransactionUpdate {
3393 value: prev,
3394 ts,
3395 diff: Diff::MINUS_ONE,
3396 });
3397 }
3398 (None, None) => {}
3399 }
3400
3401 prevs.insert(k, prev);
3402 }
3403
3404 if let Err(err) = self.verify_keys(prevs.keys()) {
3406 for (k, restore_len) in restores {
3407 let pending = self.pending.get_mut(&k).expect("inserted above");
3410 pending.truncate(restore_len);
3411 }
3412 Err(err)
3413 } else {
3414 Ok(prevs)
3415 }
3416 }
3417
3418 fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3424 let mut deleted = Vec::new();
3425 self.for_values_mut(|p, k, v| {
3426 if f(k, v) {
3427 deleted.push((k.clone(), v.clone()));
3428 p.entry(k.clone()).or_default().push(TransactionUpdate {
3429 value: v.clone(),
3430 ts,
3431 diff: Diff::MINUS_ONE,
3432 });
3433 }
3434 });
3435 soft_assert_no_log!(self.verify().is_ok());
3436 deleted
3437 }
3438
    /// Deletes the entry stored under `k` at timestamp `ts` and returns its
    /// previous value, or `None` if the key had no value.
    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
        self.set(k, None, ts)
            .expect("deleting an entry cannot violate uniqueness")
    }
3446
3447 fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3451 let kvs = ks.into_iter().map(|k| (k, None)).collect();
3452 let prevs = self
3453 .set_many(kvs, ts)
3454 .expect("deleting entries cannot violate uniqueness");
3455 prevs
3456 .into_iter()
3457 .filter_map(|(k, v)| v.map(|v| (k, v)))
3458 .collect()
3459 }
3460}
3461
3462#[cfg(test)]
3463#[allow(clippy::unwrap_used)]
3464mod tests {
3465 use super::*;
3466
3467 use mz_ore::now::SYSTEM_TIME;
3468 use mz_ore::{assert_none, assert_ok};
3469 use mz_persist_client::cache::PersistClientCache;
3470 use mz_persist_types::PersistLocation;
3471 use semver::Version;
3472
3473 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3474 use crate::memory;
3475
3476 #[mz_ore::test]
3477 fn test_table_transaction_simple() {
3478 fn uniqueness_violation(a: &String, b: &String) -> bool {
3479 a == b
3480 }
3481 let mut table = TableTransaction::new_with_uniqueness_fn(
3482 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3483 uniqueness_violation,
3484 )
3485 .unwrap();
3486
3487 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3490 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3491 assert!(
3492 table
3493 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3494 .is_err()
3495 );
3496 assert!(
3497 table
3498 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3499 .is_err()
3500 );
3501 }
3502
3503 #[mz_ore::test]
3504 fn test_table_transaction() {
3505 fn uniqueness_violation(a: &String, b: &String) -> bool {
3506 a == b
3507 }
3508 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3509
3510 fn commit(
3511 table: &mut BTreeMap<Vec<u8>, String>,
3512 mut pending: Vec<(Vec<u8>, String, Diff)>,
3513 ) {
3514 pending.sort_by(|a, b| a.2.cmp(&b.2));
3516 for (k, v, diff) in pending {
3517 if diff == Diff::MINUS_ONE {
3518 let prev = table.remove(&k);
3519 assert_eq!(prev, Some(v));
3520 } else if diff == Diff::ONE {
3521 let prev = table.insert(k, v);
3522 assert_eq!(prev, None);
3523 } else {
3524 panic!("unexpected diff: {diff}");
3525 }
3526 }
3527 }
3528
3529 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3530 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3531 let mut table_txn =
3532 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3533 assert_eq!(table_txn.items_cloned(), table);
3534 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3535 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3536 assert_eq!(
3537 table_txn.items_cloned(),
3538 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3539 );
3540 assert_eq!(
3541 table_txn
3542 .update(|_k, _v| Some("v3".to_string()), 2)
3543 .unwrap(),
3544 Diff::ONE
3545 );
3546
3547 table_txn
3549 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3550 .unwrap_err();
3551
3552 table_txn
3553 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3554 .unwrap();
3555 assert_eq!(
3556 table_txn.items_cloned(),
3557 BTreeMap::from([
3558 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3559 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3560 ])
3561 );
3562 let err = table_txn
3563 .update(|_k, _v| Some("v1".to_string()), 5)
3564 .unwrap_err();
3565 assert!(
3566 matches!(err, DurableCatalogError::UniquenessViolation),
3567 "unexpected err: {err:?}"
3568 );
3569 let pending = table_txn.pending();
3570 assert_eq!(
3571 pending,
3572 vec![
3573 (
3574 1i64.to_le_bytes().to_vec(),
3575 "v1".to_string(),
3576 Diff::MINUS_ONE
3577 ),
3578 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3579 (
3580 2i64.to_le_bytes().to_vec(),
3581 "v2".to_string(),
3582 Diff::MINUS_ONE
3583 ),
3584 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3585 ]
3586 );
3587 commit(&mut table, pending);
3588 assert_eq!(
3589 table,
3590 BTreeMap::from([
3591 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3592 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3593 ])
3594 );
3595
3596 let mut table_txn =
3597 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3598 assert_eq!(
3600 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3601 1
3602 );
3603 table_txn
3604 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3605 .unwrap();
3606 table_txn
3608 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3609 .unwrap_err();
3610 table_txn
3612 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3613 .unwrap_err();
3614 assert_eq!(
3615 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3616 1
3617 );
3618 table_txn
3620 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3621 .unwrap();
3622 table_txn
3623 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3624 .unwrap();
3625 let pending = table_txn.pending();
3626 assert_eq!(
3627 pending,
3628 vec![
3629 (
3630 1i64.to_le_bytes().to_vec(),
3631 "v3".to_string(),
3632 Diff::MINUS_ONE
3633 ),
3634 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3635 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3636 ]
3637 );
3638 commit(&mut table, pending);
3639 assert_eq!(
3640 table,
3641 BTreeMap::from([
3642 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3643 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3644 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3645 ])
3646 );
3647
3648 let mut table_txn =
3649 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3650 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3651 table_txn
3652 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3653 .unwrap();
3654
3655 commit(&mut table, table_txn.pending());
3656 assert_eq!(
3657 table,
3658 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3659 );
3660
3661 let mut table_txn =
3662 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3663 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3664 table_txn
3665 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3666 .unwrap();
3667 commit(&mut table, table_txn.pending());
3668 assert_eq!(
3669 table,
3670 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3671 );
3672
3673 let mut table_txn =
3675 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3676 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3677 table_txn
3678 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3679 .unwrap();
3680 table_txn
3681 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3682 .unwrap_err();
3683 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3684 table_txn
3685 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3686 .unwrap();
3687 commit(&mut table, table_txn.pending());
3688 assert_eq!(
3689 table.clone().into_iter().collect::<Vec<_>>(),
3690 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3691 );
3692
3693 let mut table_txn =
3695 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3696 table_txn
3698 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3699 .unwrap_err();
3700 table_txn
3701 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3702 .unwrap();
3703 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3704 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3705 let pending = table_txn.pending();
3706 assert_eq!(
3707 pending,
3708 vec![
3709 (
3710 1i64.to_le_bytes().to_vec(),
3711 "v5".to_string(),
3712 Diff::MINUS_ONE
3713 ),
3714 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3715 ]
3716 );
3717 commit(&mut table, pending);
3718 assert_eq!(
3719 table,
3720 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3721 );
3722
3723 let mut table_txn =
3725 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3726 table_txn
3727 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3728 .unwrap();
3729 let pending = table_txn.pending::<Vec<u8>, String>();
3730 assert!(pending.is_empty());
3731
3732 let mut table_txn =
3734 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3735 table_txn
3737 .set_many(
3738 BTreeMap::from([
3739 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3740 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3741 ]),
3742 0,
3743 )
3744 .unwrap_err();
3745 table_txn
3746 .set_many(
3747 BTreeMap::from([
3748 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3749 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3750 ]),
3751 1,
3752 )
3753 .unwrap();
3754 table_txn
3755 .set_many(
3756 BTreeMap::from([
3757 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3758 (3i64.to_le_bytes().to_vec(), None),
3759 ]),
3760 2,
3761 )
3762 .unwrap();
3763 let pending = table_txn.pending();
3764 assert_eq!(
3765 pending,
3766 vec![
3767 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3768 (
3769 3i64.to_le_bytes().to_vec(),
3770 "v6".to_string(),
3771 Diff::MINUS_ONE
3772 ),
3773 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3774 ]
3775 );
3776 commit(&mut table, pending);
3777 assert_eq!(
3778 table,
3779 BTreeMap::from([
3780 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3781 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3782 ])
3783 );
3784
3785 let mut table_txn =
3787 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3788 table_txn
3789 .set_many(
3790 BTreeMap::from([
3791 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3792 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3793 ]),
3794 0,
3795 )
3796 .unwrap();
3797 let pending = table_txn.pending::<Vec<u8>, String>();
3798 assert!(pending.is_empty());
3799 commit(&mut table, pending);
3800 assert_eq!(
3801 table,
3802 BTreeMap::from([
3803 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3804 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3805 ])
3806 );
3807
3808 let mut table_txn =
3810 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3811 table_txn
3813 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3814 .unwrap_err();
3815 assert!(
3816 table_txn
3817 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3818 .unwrap()
3819 );
3820 assert!(
3821 !table_txn
3822 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3823 .unwrap()
3824 );
3825 let pending = table_txn.pending();
3826 assert_eq!(
3827 pending,
3828 vec![
3829 (
3830 1i64.to_le_bytes().to_vec(),
3831 "v6".to_string(),
3832 Diff::MINUS_ONE
3833 ),
3834 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3835 ]
3836 );
3837 commit(&mut table, pending);
3838 assert_eq!(
3839 table,
3840 BTreeMap::from([
3841 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3842 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3843 ])
3844 );
3845
3846 let mut table_txn =
3848 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3849 assert!(
3850 table_txn
3851 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3852 .unwrap()
3853 );
3854 let pending = table_txn.pending::<Vec<u8>, String>();
3855 assert!(pending.is_empty());
3856 commit(&mut table, pending);
3857 assert_eq!(
3858 table,
3859 BTreeMap::from([
3860 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3861 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3862 ])
3863 );
3864
3865 let mut table_txn =
3867 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3868 table_txn
3870 .update_by_keys(
3871 [
3872 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3873 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3874 ],
3875 0,
3876 )
3877 .unwrap_err();
3878 let n = table_txn
3879 .update_by_keys(
3880 [
3881 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3882 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3883 ],
3884 1,
3885 )
3886 .unwrap();
3887 assert_eq!(n, Diff::ONE);
3888 let n = table_txn
3889 .update_by_keys(
3890 [
3891 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3892 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3893 ],
3894 2,
3895 )
3896 .unwrap();
3897 assert_eq!(n, Diff::ZERO);
3898 let pending = table_txn.pending();
3899 assert_eq!(
3900 pending,
3901 vec![
3902 (
3903 1i64.to_le_bytes().to_vec(),
3904 "v8".to_string(),
3905 Diff::MINUS_ONE
3906 ),
3907 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3908 ]
3909 );
3910 commit(&mut table, pending);
3911 assert_eq!(
3912 table,
3913 BTreeMap::from([
3914 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3915 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3916 ])
3917 );
3918
3919 let mut table_txn =
3921 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3922 let n = table_txn
3923 .update_by_keys(
3924 [
3925 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3926 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3927 ],
3928 0,
3929 )
3930 .unwrap();
3931 assert_eq!(n, Diff::from(2));
3932 let pending = table_txn.pending::<Vec<u8>, String>();
3933 assert!(pending.is_empty());
3934 commit(&mut table, pending);
3935 assert_eq!(
3936 table,
3937 BTreeMap::from([
3938 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3939 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3940 ])
3941 );
3942
3943 let mut table_txn =
3945 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3946 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3947 assert_eq!(prev, Some("v9".to_string()));
3948 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3949 assert_none!(prev);
3950 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3951 assert_none!(prev);
3952 let pending = table_txn.pending();
3953 assert_eq!(
3954 pending,
3955 vec![(
3956 1i64.to_le_bytes().to_vec(),
3957 "v9".to_string(),
3958 Diff::MINUS_ONE
3959 ),]
3960 );
3961 commit(&mut table, pending);
3962 assert_eq!(
3963 table,
3964 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3965 );
3966
3967 let mut table_txn =
3969 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3970 let prevs = table_txn.delete_by_keys(
3971 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3972 0,
3973 );
3974 assert_eq!(
3975 prevs,
3976 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3977 );
3978 let prevs = table_txn.delete_by_keys(
3979 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3980 1,
3981 );
3982 assert_eq!(prevs, vec![]);
3983 let prevs = table_txn.delete_by_keys(
3984 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3985 2,
3986 );
3987 assert_eq!(prevs, vec![]);
3988 let pending = table_txn.pending();
3989 assert_eq!(
3990 pending,
3991 vec![(
3992 42i64.to_le_bytes().to_vec(),
3993 "v7".to_string(),
3994 Diff::MINUS_ONE
3995 ),]
3996 );
3997 commit(&mut table, pending);
3998 assert_eq!(table, BTreeMap::new());
3999 }
4000
4001 #[mz_ore::test(tokio::test)]
4002 #[cfg_attr(miri, ignore)] async fn test_savepoint() {
4004 const VERSION: Version = Version::new(26, 0, 0);
4005 let mut persist_cache = PersistClientCache::new_no_metrics();
4006 persist_cache.cfg.build_version = VERSION;
4007 let persist_client = persist_cache
4008 .open(PersistLocation::new_in_mem())
4009 .await
4010 .unwrap();
4011 let state_builder = TestCatalogStateBuilder::new(persist_client)
4012 .with_default_deploy_generation()
4013 .with_version(VERSION);
4014
4015 let _ = state_builder
4017 .clone()
4018 .unwrap_build()
4019 .await
4020 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4021 .await
4022 .unwrap()
4023 .0;
4024 let mut savepoint_state = state_builder
4025 .unwrap_build()
4026 .await
4027 .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4028 .await
4029 .unwrap()
4030 .0;
4031
4032 let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4033 assert!(!initial_snapshot.is_empty());
4034
4035 let db_name = "db";
4036 let db_owner = RoleId::User(42);
4037 let db_privileges = Vec::new();
4038 let mut txn = savepoint_state.transaction().await.unwrap();
4039 let (db_id, db_oid) = txn
4040 .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4041 .unwrap();
4042 let commit_ts = txn.upper();
4043 txn.commit_internal(commit_ts).await.unwrap();
4044 let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4045 let update = updates.into_element();
4046
4047 assert_eq!(update.diff, StateDiff::Addition);
4048
4049 let db = match update.kind {
4050 memory::objects::StateUpdateKind::Database(db) => db,
4051 update => panic!("unexpected update: {update:?}"),
4052 };
4053
4054 assert_eq!(db_id, db.id);
4055 assert_eq!(db_oid, db.oid);
4056 assert_eq!(db_name, db.name);
4057 assert_eq!(db_owner, db.owner_id);
4058 assert_eq!(db_privileges, db.privileges);
4059 }
4060
4061 #[mz_ore::test]
4062 fn test_allocate_introspection_source_index_id() {
4063 let cluster_variant: u8 = 0b0000_0001;
4064 let cluster_id_inner: u64 =
4065 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4066 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4067
4068 let cluster_id = ClusterId::System(cluster_id_inner);
4069 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4070
4071 let introspection_source_index_id: u64 =
4072 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4073
4074 {
4076 let mut cluster_variant_mask = 0xFF << 56;
4077 cluster_variant_mask &= introspection_source_index_id;
4078 cluster_variant_mask >>= 56;
4079 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4080 }
4081
4082 {
4084 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4085 cluster_id_inner_mask &= introspection_source_index_id;
4086 cluster_id_inner_mask >>= 8;
4087 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4088 }
4089
4090 {
4092 let mut log_variant_mask = 0xFF;
4093 log_variant_mask &= introspection_source_index_id;
4094 assert_eq!(
4095 log_variant_mask,
4096 u64::from(timely_messages_received_log_variant)
4097 );
4098 }
4099
4100 let (catalog_item_id, global_id) =
4101 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4102
4103 assert_eq!(
4104 catalog_item_id,
4105 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4106 );
4107 assert_eq!(
4108 global_id,
4109 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4110 );
4111 }
4112}