use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::time::Duration;

use anyhow::anyhow;
use derivative::Derivative;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::{u64_to_usize, usize_to_u64};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::now::SYSTEM_TIME;
use mz_ore::vec::VecExt;
use mz_ore::{soft_assert_no_log, soft_assert_or_log};
use mz_persist_types::ShardId;
use mz_pgrepr::oid::FIRST_USER_OID;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
use mz_sql::catalog::{
    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
    RoleAttributesRaw, RoleMembership, RoleVars,
};
use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
use mz_sql::plan::NetworkPolicyRule;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::controller::StorageError;
use tracing::warn;

use crate::builtin::BuiltinLog;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::objects::serialization::proto;
use crate::durable::objects::{
    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
    ReplicaConfig, Role, RoleAuthKey, RoleAuthValue, RoleKey, RoleValue, Schema, SchemaKey,
    SchemaValue, ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue,
    SourceReference, SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
};
use crate::durable::{
    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
    SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
    USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
    USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

type Timestamp = u64;

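/// A [`Transaction`] batches multiple durable catalog operations together and commits them
/// atomically. Pending writes are staged in per-collection [`TableTransaction`]s and stamped
/// with the transaction's current `op_id`, which lets updates be grouped and replayed one
/// operation at a time (see `get_and_commit_op_updates`).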
#[derive(Derivative)]
#[derivative(Debug, PartialEq)]
pub struct Transaction<'a> {
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of the durable catalog when this transaction started.
    upper: mz_repr::Timestamp,
    /// The ID of the current operation within this transaction.
    op_id: Timestamp,
}

impl<'a> Transaction<'a> {
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    // Two non-type items with the same name always conflict; a type only
                    // conflicts with another item if that item's kind conflicts with types.
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }

    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
        let key = ItemKey { id: *id };
        self.items
            .get(&key)
            .map(|v| DurableType::from_key_value(key, v.clone()))
    }

    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
        self.items
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
            .sorted_by_key(|Item { id, .. }| *id)
    }

    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
        self.insert_audit_log_events([event]);
    }

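    /// Stages `events` for addition to the audit log at the current operation. Unlike the
    /// other collections, the audit log is kept as a raw vector of `(key, diff, op)` updates
    /// rather than a [`TableTransaction`], since it is append-only and the full log is never
    /// loaded into the transaction.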
    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
        let events = events
            .into_iter()
            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
        self.audit_log_updates.extend(events);
    }

    pub fn insert_user_database(
        &mut self,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(DatabaseId, u32), CatalogError> {
        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
        let id = DatabaseId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_database(id, database_name, owner_id, privileges, oid)?;
        Ok((id, oid))
    }

    pub(crate) fn insert_database(
        &mut self,
        id: DatabaseId,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<u32, CatalogError> {
        match self.databases.insert(
            DatabaseKey { id },
            DatabaseValue {
                name: database_name.to_string(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(oid),
            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
        }
    }

    pub fn insert_user_schema(
        &mut self,
        database_id: DatabaseId,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(SchemaId, u32), CatalogError> {
        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
        let id = SchemaId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_schema(
            id,
            Some(database_id),
            schema_name.to_string(),
            owner_id,
            privileges,
            oid,
        )?;
        Ok((id, oid))
    }

    pub fn insert_system_schema(
        &mut self,
        schema_id: u64,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        let id = SchemaId::System(schema_id);
        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
    }

    pub(crate) fn insert_schema(
        &mut self,
        schema_id: SchemaId,
        database_id: Option<DatabaseId>,
        schema_name: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        match self.schemas.insert(
            SchemaKey { id: schema_id },
            SchemaValue {
                database_id,
                name: schema_name.clone(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
        }
    }

    pub fn insert_builtin_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<RoleId, CatalogError> {
        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok(id)
    }

    pub fn insert_user_role(
        &mut self,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(RoleId, u32), CatalogError> {
        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
        let id = RoleId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok((id, oid))
    }

    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash =
                mz_auth::hash::scram256_hash(password).expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }

    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    pub fn insert_system_cluster(
        &mut self,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    fn insert_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.clusters.insert(
            ClusterKey { id: cluster_id },
            ClusterValue {
                name: cluster_name.to_string(),
                owner_id,
                privileges,
                config,
            },
            self.op_id,
        ) {
            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
        };

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip_eq(oids)
            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
            .collect();
        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
            let introspection_source_index = IntrospectionSourceIndex {
                cluster_id,
                name: builtin.name.to_string(),
                item_id,
                index_id,
                oid,
            };
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources
                .insert(key, value, self.op_id)
                .expect("no uniqueness violation");
        }

        Ok(())
    }

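    /// Renames cluster `cluster_id` from `cluster_name` to `cluster_to_name`.
    ///
    /// Returns an error if the cluster does not exist, and panics if more than one cluster
    /// was updated, since cluster keys are unique.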
    pub fn rename_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        cluster_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterKey { id: cluster_id };

        match self.clusters.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = cluster_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
            ),
        }
    }

    pub fn rename_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica_name: &QualifiedReplica,
        replica_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterReplicaKey { id: replica_id };

        match self.cluster_replicas.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = replica_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => {
                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
            }
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
            ),
        }
    }

    pub fn insert_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<ReplicaId, CatalogError> {
        let replica_id = match cluster_id {
            ClusterId::System(_) => self.allocate_system_replica_id()?,
            ClusterId::User(_) => self.allocate_user_replica_id()?,
        };
        self.insert_cluster_replica_with_id(
            cluster_id,
            replica_id,
            replica_name,
            config,
            owner_id,
        )?;
        Ok(replica_id)
    }

    pub(crate) fn insert_cluster_replica_with_id(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<(), CatalogError> {
        if let Err(_) = self.cluster_replicas.insert(
            ClusterReplicaKey { id: replica_id },
            ClusterReplicaValue {
                cluster_id,
                name: replica_name.into(),
                config,
                owner_id,
            },
            self.op_id,
        ) {
            let cluster = self
                .clusters
                .get(&ClusterKey { id: cluster_id })
                .expect("cluster exists");
            return Err(SqlCatalogError::DuplicateReplica(
                replica_name.to_string(),
                cluster.name.to_string(),
            )
            .into());
        };
        Ok(())
    }

    pub fn insert_user_network_policy(
        &mut self,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        temporary_oids: &HashSet<u32>,
    ) -> Result<NetworkPolicyId, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
        let id = NetworkPolicyId::User(id);
        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
    }

    pub fn insert_network_policy(
        &mut self,
        id: NetworkPolicyId,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        oid: u32,
    ) -> Result<NetworkPolicyId, CatalogError> {
        match self.network_policies.insert(
            NetworkPolicyKey { id },
            NetworkPolicyValue {
                name: name.clone(),
                rules,
                privileges,
                owner_id,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(id),
            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
        }
    }

    pub fn update_introspection_source_index_gids(
        &mut self,
        mappings: impl Iterator<
            Item = (
                ClusterId,
                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
            ),
        >,
    ) -> Result<(), CatalogError> {
        for (cluster_id, updates) in mappings {
            for (name, item_id, index_id, oid) in updates {
                let introspection_source_index = IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                };
                let (key, value) = introspection_source_index.into_key_value();

                let prev = self
                    .introspection_sources
                    .set(key, Some(value), self.op_id)?;
                if prev.is_none() {
                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
                        "{index_id}"
                    ))
                    .into());
                }
            }
        }
        Ok(())
    }

    pub fn insert_user_item(
        &mut self,
        id: CatalogItemId,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
        versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<u32, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_item(
            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
        )?;
        Ok(oid)
    }

    pub fn insert_item(
        &mut self,
        id: CatalogItemId,
        oid: u32,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        extra_versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<(), CatalogError> {
        match self.items.insert(
            ItemKey { id },
            ItemValue {
                schema_id,
                name: item_name.to_string(),
                create_sql,
                owner_id,
                privileges,
                oid,
                global_id,
                extra_versions,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
        }
    }

    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
    }

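    /// Reserves `amount` consecutive IDs from the allocator named `key`, returning the
    /// reserved range and advancing the allocator past it. For example, if an allocator's
    /// `next_id` is 7, reserving 3 IDs returns `vec![7, 8, 9]` and leaves `next_id` at 10.
    ///
    /// Panics if the allocator does not exist.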
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }

    pub fn allocate_system_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        assert!(
            !self.durable_catalog.is_bootstrap_complete(),
            "we can only allocate system item IDs during bootstrap"
        );
        Ok(self
            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
            .collect())
    }

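    /// Derives a deterministic introspection source index ID from the cluster ID and log
    /// variant, rather than consulting an allocator. The resulting `u64` is laid out as:
    ///
    /// * bits 56..64: the cluster variant (1 = system, 2 = user)
    /// * bits 8..56: the cluster's inner ID, which must fit in 48 bits
    /// * bits 0..8: the log variant
    ///
    /// For example, user cluster 3 with `TimelyLog::Operates` (variant 1) yields the ID
    /// `(2 << 56) | (3 << 8) | 1`.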
775 pub fn allocate_introspection_source_index_id(
807 cluster_id: &ClusterId,
808 log_variant: LogVariant,
809 ) -> (CatalogItemId, GlobalId) {
810 let cluster_variant: u8 = match cluster_id {
811 ClusterId::System(_) => 1,
812 ClusterId::User(_) => 2,
813 };
814 let cluster_id: u64 = cluster_id.inner_id();
815 const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
816 assert_eq!(
817 CLUSTER_ID_MASK & cluster_id,
818 0,
819 "invalid cluster ID: {cluster_id}"
820 );
821 let log_variant: u8 = match log_variant {
822 LogVariant::Timely(TimelyLog::Operates) => 1,
823 LogVariant::Timely(TimelyLog::Channels) => 2,
824 LogVariant::Timely(TimelyLog::Elapsed) => 3,
825 LogVariant::Timely(TimelyLog::Histogram) => 4,
826 LogVariant::Timely(TimelyLog::Addresses) => 5,
827 LogVariant::Timely(TimelyLog::Parks) => 6,
828 LogVariant::Timely(TimelyLog::MessagesSent) => 7,
829 LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
830 LogVariant::Timely(TimelyLog::Reachability) => 9,
831 LogVariant::Timely(TimelyLog::BatchesSent) => 10,
832 LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
833 LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
834 LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
835 LogVariant::Differential(DifferentialLog::Sharing) => 14,
836 LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
837 LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
838 LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
839 LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
840 LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
841 LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
842 LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
843 LogVariant::Compute(ComputeLog::PeekDuration) => 22,
844 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
845 LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
846 LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
847 LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
848 LogVariant::Compute(ComputeLog::ShutdownDuration) => 27,
849 LogVariant::Compute(ComputeLog::ErrorCount) => 28,
850 LogVariant::Compute(ComputeLog::HydrationTime) => 29,
851 LogVariant::Compute(ComputeLog::LirMapping) => 30,
852 LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
853 LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
854 };
855
856 let mut id: u64 = u64::from(cluster_variant) << 56;
857 id |= cluster_id << 8;
858 id |= u64::from(log_variant);
859
860 (
861 CatalogItemId::IntrospectionSourceIndex(id),
862 GlobalId::IntrospectionSourceIndex(id),
863 )
864 }
865
866 pub fn allocate_user_item_ids(
867 &mut self,
868 amount: u64,
869 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
870 Ok(self
871 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
872 .into_iter()
873 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
875 .collect())
876 }
877
878 pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
879 let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
880 Ok(ReplicaId::User(id))
881 }
882
883 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
884 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
885 Ok(ReplicaId::System(id))
886 }
887
888 pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
889 self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
890 }
891
892 pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
893 self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
894 }
895
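    /// Allocates `amount` OIDs that are not currently in use. OIDs are drawn from a single
    /// wrapping counter: the scan starts at the persisted `next_id`, skips any OID already
    /// held by a database, schema, role, item, introspection source index, or temporary
    /// object, and wraps back to [`FIRST_USER_OID`] on overflow. If the scan returns to its
    /// starting point before finding `amount` free OIDs, the OID space is exhausted and an
    /// error is returned.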
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// A wrapper that guarantees its OID lies within the user OID range.
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

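        // `UserOid` increments with wrap-around back into the user OID range: e.g. adding 1
        // to `u32::MAX` overflows to 0, which maps back to `FIRST_USER_OID`.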
        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // Collect all OIDs currently in use, so the scan below can skip them.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                // We've wrapped around the entire user OID range without finding enough
                // free OIDs.
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }

    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
        self.allocate_oids(1, temporary_oids)
            .map(|oids| oids.into_element())
    }

    pub(crate) fn insert_id_allocator(
        &mut self,
        name: String,
        next_id: u64,
    ) -> Result<(), CatalogError> {
        match self.id_allocator.insert(
            IdAllocKey { name: name.clone() },
            IdAllocValue { next_id },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
        }
    }

    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
        let prev = self
            .databases
            .set(DatabaseKey { id: *id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    pub fn remove_databases(
        &mut self,
        databases: &BTreeSet<DatabaseId>,
    ) -> Result<(), CatalogError> {
        if databases.is_empty() {
            return Ok(());
        }

        let to_remove = databases
            .iter()
            .map(|id| (DatabaseKey { id: *id }, None))
            .collect();
        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, val| val.is_none());

        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownDatabase(err).into());
        }

        Ok(())
    }

    pub fn remove_schema(
        &mut self,
        database_id: &Option<DatabaseId>,
        schema_id: &SchemaId,
    ) -> Result<(), CatalogError> {
        let prev = self
            .schemas
            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            let database_name = match database_id {
                Some(id) => format!("{id}."),
                None => "".to_string(),
            };
            Err(SqlCatalogError::UnknownSchema(format!("{}{}", database_name, schema_id)).into())
        }
    }

    pub fn remove_schemas(
        &mut self,
        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
    ) -> Result<(), CatalogError> {
        if schemas.is_empty() {
            return Ok(());
        }

        let to_remove = schemas
            .iter()
            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
            .collect();
        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, v| v.is_none());

        if !prev.is_empty() {
            let err = prev
                .keys()
                .map(|k| {
                    let db_spec = schemas.get(&k.id).expect("should_exist");
                    let db_name = match db_spec {
                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    };
                    format!("{}{}", db_name, k.id)
                })
                .join(", ");

            return Err(SqlCatalogError::UnknownSchema(err).into());
        }

        Ok(())
    }

    pub fn remove_source_references(
        &mut self,
        source_id: CatalogItemId,
    ) -> Result<(), CatalogError> {
        let deleted = self
            .source_references
            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
        }
    }

    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        assert!(
            roles.iter().all(|id| id.is_user()),
            "cannot delete non-user roles"
        );
        self.remove_roles(roles)
    }

    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        // Missing role_auth entries are not an error: only roles that have been given a
        // password carry an entry in the role_auth collection.
        role_auth_prev.retain(|_k, v| v.is_none());

        Ok(())
    }

    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
        if clusters.is_empty() {
            return Ok(());
        }

        let to_remove = clusters
            .iter()
            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
            .collect();
        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownCluster(err).into());
        }

        // Cascade the delete to the clusters' replicas and introspection source indexes.
        self.cluster_replicas
            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
        self.introspection_sources
            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);

        Ok(())
    }

    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
        let deleted = self
            .cluster_replicas
            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
        }
    }

    pub fn remove_cluster_replicas(
        &mut self,
        replicas: &BTreeSet<ReplicaId>,
    ) -> Result<(), CatalogError> {
        if replicas.is_empty() {
            return Ok(());
        }

        let to_remove = replicas
            .iter()
            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
            .collect();
        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
        }

        Ok(())
    }

    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
        }
    }

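    /// Removes all items in `ids` from the transaction in a single batch.
    ///
    /// Returns an error listing the unknown IDs if any of the items do not exist. Prefer this
    /// over calling [`Transaction::remove_item`] in a loop; the batched delete is optimized
    /// for removing many items at once.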
    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
        if ids.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
        let n = self.items.delete_by_keys(ks, self.op_id).len();
        if n == ids.len() {
            Ok(())
        } else {
            let item_ids = self.items.items().keys().map(|k| k.id).collect();
            let mut unknown = ids.difference(&item_ids);
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn remove_system_object_mappings(
        &mut self,
        descriptions: BTreeSet<SystemObjectDescription>,
    ) -> Result<(), CatalogError> {
        if descriptions.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = descriptions
            .clone()
            .into_iter()
            .map(|desc| GidMappingKey {
                schema_name: desc.schema_name,
                object_type: desc.object_type,
                object_name: desc.object_name,
            })
            .collect();
        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();

        if n == descriptions.len() {
            Ok(())
        } else {
            let item_descriptions = self
                .system_gid_mapping
                .items()
                .keys()
                .map(|k| SystemObjectDescription {
                    schema_name: k.schema_name.clone(),
                    object_type: k.object_type.clone(),
                    object_name: k.object_name.clone(),
                })
                .collect();
            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
                format!(
                    "{} {}.{}",
                    desc.object_type, desc.schema_name, desc.object_name
                )
            });
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn remove_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = introspection_source_indexes
            .clone()
            .into_iter()
            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
            .collect();
        let n = self
            .introspection_sources
            .delete_by_keys(ks, self.op_id)
            .len();
        if n == introspection_source_indexes.len() {
            Ok(())
        } else {
            let txn_indexes = self
                .introspection_sources
                .items()
                .keys()
                .map(|k| (k.cluster_id, k.name.clone()))
                .collect();
            let mut unknown = introspection_source_indexes
                .difference(&txn_indexes)
                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
        let updated =
            self.items
                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
        }
    }

    pub fn update_items(
        &mut self,
        items: BTreeMap<CatalogItemId, Item>,
    ) -> Result<(), CatalogError> {
        if items.is_empty() {
            return Ok(());
        }

        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
        let kvs: Vec<_> = items
            .clone()
            .into_iter()
            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
            .collect();
        let n = self.items.update_by_keys(kvs, self.op_id)?;
        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
        if n == update_ids.len() {
            Ok(())
        } else {
            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
            let mut unknown = update_ids.difference(&item_ids);
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

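    /// Updates the role identified by `id` to `role`, and applies `password` to the role's
    /// auth entry: `Set` hashes the new password and inserts or updates the entry, `Clear`
    /// nulls out an existing hash, and `NoChange` leaves auth untouched.
    ///
    /// Returns an error if the role does not exist.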
    pub fn update_role(
        &mut self,
        id: RoleId,
        role: Role,
        password: PasswordAction,
    ) -> Result<(), CatalogError> {
        let key = RoleKey { id };
        if self.roles.get(&key).is_some() {
            let auth_key = RoleAuthKey { role_id: id };

            match password {
                PasswordAction::Set(new_password) => {
                    let hash = mz_auth::hash::scram256_hash(&new_password)
                        .expect("password hash should be valid");
                    let value = RoleAuthValue {
                        password_hash: Some(hash),
                        updated_at: SYSTEM_TIME(),
                    };

                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    } else {
                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::Clear => {
                    let value = RoleAuthValue {
                        password_hash: None,
                        updated_at: SYSTEM_TIME(),
                    };
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::NoChange => {}
            }

            self.roles
                .update_by_key(key, role.into_key_value().1, self.op_id)?;

            Ok(())
        } else {
            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
        }
    }

    pub fn update_roles_without_auth(
        &mut self,
        roles: BTreeMap<RoleId, Role>,
    ) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
        let kvs: Vec<_> = roles
            .into_iter()
            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
            .collect();
        let n = self.roles.update_by_keys(kvs, self.op_id)?;
        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");

        if n == update_role_ids.len() {
            Ok(())
        } else {
            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
            let mut unknown = update_role_ids.difference(&role_ids);
            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
        }
    }

    pub fn update_system_object_mappings(
        &mut self,
        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
    ) -> Result<(), CatalogError> {
        if mappings.is_empty() {
            return Ok(());
        }

        let n = self.system_gid_mapping.update(
            |_k, v| {
                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
                    let (_, new_value) = mapping.clone().into_key_value();
                    Some(new_value)
                } else {
                    None
                }
            },
            self.op_id,
        )?;

        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
            != mappings.len()
        {
            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
        }

        Ok(())
    }

    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
        let updated = self.clusters.update_by_key(
            ClusterKey { id },
            cluster.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
        }
    }

    pub fn update_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica: ClusterReplica,
    ) -> Result<(), CatalogError> {
        let updated = self.cluster_replicas.update_by_key(
            ClusterReplicaKey { id: replica_id },
            replica.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
        }
    }

    pub fn update_database(
        &mut self,
        id: DatabaseId,
        database: Database,
    ) -> Result<(), CatalogError> {
        let updated = self.databases.update_by_key(
            DatabaseKey { id },
            database.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    pub fn update_schema(
        &mut self,
        schema_id: SchemaId,
        schema: Schema,
    ) -> Result<(), CatalogError> {
        let updated = self.schemas.update_by_key(
            SchemaKey { id: schema_id },
            schema.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
        }
    }

    pub fn update_network_policy(
        &mut self,
        id: NetworkPolicyId,
        network_policy: NetworkPolicy,
    ) -> Result<(), CatalogError> {
        let updated = self.network_policies.update_by_key(
            NetworkPolicyKey { id },
            network_policy.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
        }
    }
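
    /// Removes all network policies in `network_policies` from the transaction.
    ///
    /// Returns an error if any of the policies do not exist. Panics if any of them are not
    /// user network policies; only user policies may be removed.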
    pub fn remove_network_policies(
        &mut self,
        network_policies: &BTreeSet<NetworkPolicyId>,
    ) -> Result<(), CatalogError> {
        if network_policies.is_empty() {
            return Ok(());
        }

        let to_remove = network_policies
            .iter()
            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
            .collect();
        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
        assert!(
            prev.iter().all(|(k, _)| k.id.is_user()),
            "cannot delete non-user network policy"
        );

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
        }

        Ok(())
    }
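
    /// Sets the default privilege granted to `grantee` on objects of `object_type` created by
    /// `role_id` within the given database and schema scope. Passing `None` for `privileges`
    /// removes the default privilege entry.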
    pub fn set_default_privilege(
        &mut self,
        role_id: RoleId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        object_type: ObjectType,
        grantee: RoleId,
        privileges: Option<AclMode>,
    ) -> Result<(), CatalogError> {
        self.default_privileges.set(
            DefaultPrivilegesKey {
                role_id,
                database_id,
                schema_id,
                object_type,
                grantee,
            },
            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
            self.op_id,
        )?;
        Ok(())
    }

    pub fn set_default_privileges(
        &mut self,
        default_privileges: Vec<DefaultPrivilege>,
    ) -> Result<(), CatalogError> {
        if default_privileges.is_empty() {
            return Ok(());
        }

        let default_privileges = default_privileges
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.default_privileges
            .set_many(default_privileges, self.op_id)?;
        Ok(())
    }

    pub fn set_system_privilege(
        &mut self,
        grantee: RoleId,
        grantor: RoleId,
        acl_mode: Option<AclMode>,
    ) -> Result<(), CatalogError> {
        self.system_privileges.set(
            SystemPrivilegesKey { grantee, grantor },
            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
            self.op_id,
        )?;
        Ok(())
    }

    pub fn set_system_privileges(
        &mut self,
        system_privileges: Vec<MzAclItem>,
    ) -> Result<(), CatalogError> {
        if system_privileges.is_empty() {
            return Ok(());
        }

        let system_privileges = system_privileges
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.system_privileges
            .set_many(system_privileges, self.op_id)?;
        Ok(())
    }

    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
        self.settings.set(
            SettingKey { name },
            value.map(|value| SettingValue { value }),
            self.op_id,
        )?;
        Ok(())
    }

    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
    }

    pub fn insert_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip_eq(oids)
            .map(
                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                },
            )
            .collect();

        for introspection_source_index in introspection_source_indexes {
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources.insert(key, value, self.op_id)?;
        }

        Ok(())
    }

    pub fn set_system_object_mappings(
        &mut self,
        mappings: Vec<SystemObjectMapping>,
    ) -> Result<(), CatalogError> {
        if mappings.is_empty() {
            return Ok(());
        }

        let mappings = mappings
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.system_gid_mapping.set_many(mappings, self.op_id)?;
        Ok(())
    }

    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
        if replicas.is_empty() {
            return Ok(());
        }

        let replicas = replicas
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.cluster_replicas.set_many(replicas, self.op_id)?;
        Ok(())
    }

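    /// Sets the config entry `key`: `Some(value)` inserts or updates the entry, `None`
    /// deletes it.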
1901 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1903 match value {
1904 Some(value) => {
1905 let config = Config { key, value };
1906 let (key, value) = config.into_key_value();
1907 self.configs.set(key, Some(value), self.op_id)?;
1908 }
1909 None => {
1910 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1911 }
1912 }
1913 Ok(())
1914 }
1915
1916 pub fn get_config(&self, key: String) -> Option<u64> {
1918 self.configs
1919 .get(&ConfigKey { key })
1920 .map(|entry| entry.value)
1921 }
1922
1923 pub fn get_setting(&self, name: String) -> Option<&str> {
1925 self.settings
1926 .get(&SettingKey { name })
1927 .map(|entry| &*entry.value)
1928 }
1929
1930 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1931 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1932 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1933 }
1934
1935 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1936 self.set_setting(
1937 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1938 Some(shard_id.to_string()),
1939 )
1940 }
1941
1942 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1943 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1944 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1945 }
1946
1947 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1948 self.set_setting(
1949 EXPRESSION_CACHE_SHARD_KEY.to_string(),
1950 Some(shard_id.to_string()),
1951 )
1952 }
1953
1954 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1960 self.set_config(
1961 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1962 Some(
1963 value
1964 .as_millis()
1965 .try_into()
1966 .expect("max wait fits into u64"),
1967 ),
1968 )
1969 }
1970
1971 pub fn set_0dt_deployment_ddl_check_interval(
1978 &mut self,
1979 value: Duration,
1980 ) -> Result<(), CatalogError> {
1981 self.set_config(
1982 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
1983 Some(
1984 value
1985 .as_millis()
1986 .try_into()
1987 .expect("ddl check interval fits into u64"),
1988 ),
1989 )
1990 }
1991
1992 pub fn set_enable_0dt_deployment_panic_after_timeout(
1998 &mut self,
1999 value: bool,
2000 ) -> Result<(), CatalogError> {
2001 self.set_config(
2002 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2003 Some(u64::from(value)),
2004 )
2005 }
2006
2007 pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2013 self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2014 }
2015
2016 pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2023 self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2024 }
2025
2026 pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2033 self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2034 }
2035
2036 pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2038 self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2039 }
2040
2041 pub fn update_comment(
2042 &mut self,
2043 object_id: CommentObjectId,
2044 sub_component: Option<usize>,
2045 comment: Option<String>,
2046 ) -> Result<(), CatalogError> {
2047 let key = CommentKey {
2048 object_id,
2049 sub_component,
2050 };
2051 let value = comment.map(|c| CommentValue { comment: c });
2052 self.comments.set(key, value, self.op_id)?;
2053
2054 Ok(())
2055 }
2056
2057 pub fn drop_comments(
2058 &mut self,
2059 object_ids: &BTreeSet<CommentObjectId>,
2060 ) -> Result<(), CatalogError> {
2061 if object_ids.is_empty() {
2062 return Ok(());
2063 }
2064
2065 self.comments
2066 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2067 Ok(())
2068 }
2069
2070 pub fn update_source_references(
2071 &mut self,
2072 source_id: CatalogItemId,
2073 references: Vec<SourceReference>,
2074 updated_at: u64,
2075 ) -> Result<(), CatalogError> {
2076 let key = SourceReferencesKey { source_id };
2077 let value = SourceReferencesValue {
2078 references,
2079 updated_at,
2080 };
2081 self.source_references.set(key, Some(value), self.op_id)?;
2082 Ok(())
2083 }
2084
2085 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2087 let key = ServerConfigurationKey {
2088 name: name.to_string(),
2089 };
2090 let value = ServerConfigurationValue { value };
2091 self.system_configurations
2092 .set(key, Some(value), self.op_id)?;
2093 Ok(())
2094 }
2095
2096 pub fn remove_system_config(&mut self, name: &str) {
2098 let key = ServerConfigurationKey {
2099 name: name.to_string(),
2100 };
2101 self.system_configurations
2102 .set(key, None, self.op_id)
2103 .expect("cannot have uniqueness violation");
2104 }
2105
    pub fn clear_system_configs(&mut self) {
        self.system_configurations.delete(|_k, _v| true, self.op_id);
    }

    pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
        match self.configs.insert(
            ConfigKey { key: key.clone() },
            ConfigValue { value },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
        }
    }

    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
        self.clusters
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }

    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
        self.cluster_replicas
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }

    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
        self.roles
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
        self.network_policies
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }

    pub fn get_system_object_mappings(
        &self,
    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
        self.system_gid_mapping
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }

    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
        self.schemas
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }

    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
        self.system_configurations
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }

    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
        let key = SchemaKey { id: *id };
        self.schemas
            .get(&key)
            .map(|v| DurableType::from_key_value(key, v.clone()))
    }

    pub fn get_introspection_source_indexes(
        &self,
        cluster_id: ClusterId,
    ) -> BTreeMap<&str, (GlobalId, u32)> {
        self.introspection_sources
            .items()
            .into_iter()
            .filter(|(k, _v)| k.cluster_id == cluster_id)
            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
            .collect()
    }

    pub fn get_catalog_content_version(&self) -> Option<&str> {
        self.settings
            .get(&SettingKey {
                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
            })
            .map(|value| &*value.value)
    }

    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
        self.settings
            .get(&SettingKey {
                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
            })
            .map(|value| value.value.clone())
    }

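    /// Returns the in-memory state updates produced by the current operation and advances the
    /// transaction to the next operation.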
2208 #[must_use]
2214 pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2215 let updates = self.get_op_updates();
2216 self.commit_op();
2217 updates
2218 }
2219
2220 fn get_op_updates(&self) -> Vec<StateUpdate> {
2221 fn get_collection_op_updates<'a, T>(
2222 table_txn: &'a TableTransaction<T::Key, T::Value>,
2223 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2224 op: Timestamp,
2225 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2226 where
2227 T::Key: Ord + Eq + Clone + Debug,
2228 T::Value: Ord + Clone + Debug,
2229 T: DurableType,
2230 {
2231 table_txn
2232 .pending
2233 .iter()
2234 .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2235 .filter_map(move |(k, v)| {
2236 if v.ts == op {
2237 let key = k.clone();
2238 let value = v.value.clone();
2239 let diff = v.diff.clone().try_into().expect("invalid diff");
2240 let update = DurableType::from_key_value(key, value);
2241 let kind = kind_fn(update);
2242 Some((kind, diff))
2243 } else {
2244 None
2245 }
2246 })
2247 }
2248
2249 fn get_large_collection_op_updates<'a, T>(
2250 collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2251 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2252 op: Timestamp,
2253 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2254 where
2255 T::Key: Ord + Eq + Clone + Debug,
2256 T: DurableType<Value = ()>,
2257 {
2258 collection.iter().filter_map(move |(k, diff, ts)| {
2259 if *ts == op {
2260 let key = k.clone();
2261 let diff = diff.clone().try_into().expect("invalid diff");
2262 let update = DurableType::from_key_value(key, ());
2263 let kind = kind_fn(update);
2264 Some((kind, diff))
2265 } else {
2266 None
2267 }
2268 })
2269 }
2270
2271 let Transaction {
2272 durable_catalog: _,
2273 databases,
2274 schemas,
2275 items,
2276 comments,
2277 roles,
2278 role_auth,
2279 clusters,
2280 network_policies,
2281 cluster_replicas,
2282 introspection_sources,
2283 system_gid_mapping,
2284 system_configurations,
2285 default_privileges,
2286 source_references,
2287 system_privileges,
2288 audit_log_updates,
2289 storage_collection_metadata,
2290 unfinalized_shards,
2291 id_allocator: _,
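            // The remaining collections are not reflected in in-memory state updates.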
            configs: _,
            settings: _,
            txn_wal_shard: _,
            upper,
            op_id: _,
        } = &self;

        let updates = std::iter::empty()
            .chain(get_collection_op_updates(
                roles,
                StateUpdateKind::Role,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                role_auth,
                StateUpdateKind::RoleAuth,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                databases,
                StateUpdateKind::Database,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                schemas,
                StateUpdateKind::Schema,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                default_privileges,
                StateUpdateKind::DefaultPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_privileges,
                StateUpdateKind::SystemPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_configurations,
                StateUpdateKind::SystemConfiguration,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                clusters,
                StateUpdateKind::Cluster,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                network_policies,
                StateUpdateKind::NetworkPolicy,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                introspection_sources,
                StateUpdateKind::IntrospectionSourceIndex,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                cluster_replicas,
                StateUpdateKind::ClusterReplica,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_gid_mapping,
                StateUpdateKind::SystemObjectMapping,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                items,
                StateUpdateKind::Item,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                comments,
                StateUpdateKind::Comment,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                source_references,
                StateUpdateKind::SourceReferences,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                storage_collection_metadata,
                StateUpdateKind::StorageCollectionMetadata,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                unfinalized_shards,
                StateUpdateKind::UnfinalizedShard,
                self.op_id,
            ))
            .chain(get_large_collection_op_updates(
                audit_log_updates,
                StateUpdateKind::AuditLog,
                self.op_id,
            ))
            .map(|(kind, diff)| StateUpdate {
                kind,
                ts: upper.clone(),
                diff,
            })
            .collect();

        updates
    }

    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }

    fn commit_op(&mut self) {
        self.op_id += 1;
    }

    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }

    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }

    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
        let audit_log_updates = self
            .audit_log_updates
            .into_iter()
            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
            .collect();

        let txn_batch = TransactionBatch {
            databases: self.databases.pending(),
            schemas: self.schemas.pending(),
            items: self.items.pending(),
            comments: self.comments.pending(),
            roles: self.roles.pending(),
            role_auth: self.role_auth.pending(),
            clusters: self.clusters.pending(),
            cluster_replicas: self.cluster_replicas.pending(),
            network_policies: self.network_policies.pending(),
            introspection_sources: self.introspection_sources.pending(),
            id_allocator: self.id_allocator.pending(),
            configs: self.configs.pending(),
            source_references: self.source_references.pending(),
            settings: self.settings.pending(),
            system_gid_mapping: self.system_gid_mapping.pending(),
            system_configurations: self.system_configurations.pending(),
            default_privileges: self.default_privileges.pending(),
            system_privileges: self.system_privileges.pending(),
            storage_collection_metadata: self.storage_collection_metadata.pending(),
            unfinalized_shards: self.unfinalized_shards.pending(),
            txn_wal_shard: self.txn_wal_shard.pending(),
            audit_log_updates,
            upper: self.upper,
        };
        (txn_batch, self.durable_catalog)
    }

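    /// Commits the transaction to durable storage at `commit_ts`, returning the underlying
    /// durable catalog state together with the new upper. `commit_ts` must be greater than or
    /// equal to the transaction's upper. Unlike [`Self::commit`], this does not sync the newly
    /// committed updates back into memory.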
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn commit_internal(
        self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
        let (mut txn_batch, durable_catalog) = self.into_parts();
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            source_references,
            settings,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper,
        } = &mut txn_batch;
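        // Consolidate updates so that matching insertions and retractions cancel out before
        // being written durably.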
        differential_dataflow::consolidation::consolidate_updates(databases);
        differential_dataflow::consolidation::consolidate_updates(schemas);
        differential_dataflow::consolidation::consolidate_updates(items);
        differential_dataflow::consolidation::consolidate_updates(comments);
        differential_dataflow::consolidation::consolidate_updates(roles);
        differential_dataflow::consolidation::consolidate_updates(role_auth);
        differential_dataflow::consolidation::consolidate_updates(clusters);
        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
        differential_dataflow::consolidation::consolidate_updates(network_policies);
        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
        differential_dataflow::consolidation::consolidate_updates(id_allocator);
        differential_dataflow::consolidation::consolidate_updates(configs);
        differential_dataflow::consolidation::consolidate_updates(settings);
        differential_dataflow::consolidation::consolidate_updates(source_references);
        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
        differential_dataflow::consolidation::consolidate_updates(system_configurations);
        differential_dataflow::consolidation::consolidate_updates(default_privileges);
        differential_dataflow::consolidation::consolidate_updates(system_privileges);
        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);

        assert!(
            commit_ts >= *upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            upper
        );
        let upper = durable_catalog
            .commit_transaction(txn_batch, commit_ts)
            .await?;
        Ok((durable_catalog, upper))
    }

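    /// Commits the transaction to durable storage at `commit_ts` and syncs the durable catalog
    /// state to the resulting upper. Panics if there are unconsumed operation updates; drain
    /// them with [`Self::get_and_commit_op_updates`] before committing.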
2531 #[mz_ore::instrument(level = "debug")]
2548 pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2549 let op_updates = self.get_op_updates();
2550 assert!(
2551 op_updates.is_empty(),
2552 "unconsumed transaction updates: {op_updates:?}"
2553 );
2554
2555 let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2556 let updates = durable_storage.sync_updates(upper).await?;
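        // Drain the updates produced by this commit; in a writable catalog they should all be
        // at exactly `commit_ts`.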
2558 soft_assert_no_log!(
2563 durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2564 "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2565 );
2566 Ok(())
2567 }
2568}
2569
2570use crate::durable::async_trait;
2571
2572use super::objects::{RoleAuthKey, RoleAuthValue};
2573
2574#[async_trait]
2575impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2576 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2577 self.storage_collection_metadata
2578 .items()
2579 .into_iter()
2580 .map(
2581 |(
2582 StorageCollectionMetadataKey { id },
2583 StorageCollectionMetadataValue { shard },
2584 )| { (*id, shard.clone()) },
2585 )
2586 .collect()
2587 }
2588
2589 fn insert_collection_metadata(
2590 &mut self,
2591 metadata: BTreeMap<GlobalId, ShardId>,
2592 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2593 for (id, shard) in metadata {
2594 self.storage_collection_metadata
2595 .insert(
2596 StorageCollectionMetadataKey { id },
2597 StorageCollectionMetadataValue {
2598 shard: shard.clone(),
2599 },
2600 self.op_id,
2601 )
2602 .map_err(|err| match err {
2603 DurableCatalogError::DuplicateKey => {
2604 StorageError::CollectionMetadataAlreadyExists(id)
2605 }
2606 DurableCatalogError::UniquenessViolation => {
2607 StorageError::PersistShardAlreadyInUse(shard)
2608 }
2609 err => StorageError::Generic(anyhow::anyhow!(err)),
2610 })?;
2611 }
2612 Ok(())
2613 }
2614
2615 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2616 let ks: Vec<_> = ids
2617 .into_iter()
2618 .map(|id| StorageCollectionMetadataKey { id })
2619 .collect();
2620 self.storage_collection_metadata
2621 .delete_by_keys(ks, self.op_id)
2622 .into_iter()
2623 .map(
2624 |(
2625 StorageCollectionMetadataKey { id },
2626 StorageCollectionMetadataValue { shard },
2627 )| (id, shard),
2628 )
2629 .collect()
2630 }
2631
2632 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2633 self.unfinalized_shards
2634 .items()
2635 .into_iter()
2636 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2637 .collect()
2638 }
2639
2640 fn insert_unfinalized_shards(
2641 &mut self,
2642 s: BTreeSet<ShardId>,
2643 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2644 for shard in s {
2645 match self
2646 .unfinalized_shards
2647 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2648 {
2649 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2651 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2652 };
2653 }
2654 Ok(())
2655 }
2656
2657 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2658 let ks: Vec<_> = shards
2659 .into_iter()
2660 .map(|shard| UnfinalizedShardKey { shard })
2661 .collect();
2662 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2663 }
2664
2665 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2666 self.txn_wal_shard
2667 .values()
2668 .iter()
2669 .next()
2670 .map(|TxnWalShardValue { shard }| *shard)
2671 }
2672
2673 fn write_txn_wal_shard(
2674 &mut self,
2675 shard: ShardId,
2676 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2677 self.txn_wal_shard
2678 .insert((), TxnWalShardValue { shard }, self.op_id)
2679 .map_err(|err| match err {
2680 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2681 err => StorageError::Generic(anyhow::anyhow!(err)),
2682 })
2683 }
2684}
2685
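/// The proto-encoded collection updates staged by a [`Transaction`], ready to be committed to
/// durable storage.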
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    pub(crate) upper: mz_repr::Timestamp,
}

impl TransactionBatch {
    pub fn is_empty(&self) -> bool {
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper: _,
        } = self;
        databases.is_empty()
            && schemas.is_empty()
            && items.is_empty()
            && comments.is_empty()
            && roles.is_empty()
            && role_auth.is_empty()
            && clusters.is_empty()
            && cluster_replicas.is_empty()
            && network_policies.is_empty()
            && introspection_sources.is_empty()
            && id_allocator.is_empty()
            && configs.is_empty()
            && settings.is_empty()
            && source_references.is_empty()
            && system_gid_mapping.is_empty()
            && system_configurations.is_empty()
            && default_privileges.is_empty()
            && system_privileges.is_empty()
            && storage_collection_metadata.is_empty()
            && unfinalized_shards.is_empty()
            && txn_wal_shard.is_empty()
            && audit_log_updates.is_empty()
    }
}

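/// A single pending update to a value in a [`TableTransaction`], tagged with the operation
/// timestamp at which it was staged and a diff of `1` (addition) or `-1` (retraction).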
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    value: V,
    ts: Timestamp,
    diff: Diff,
}

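/// A helper for uniqueness checks in [`TableTransaction`]: values with a unique name can be
/// indexed by that name instead of being compared pairwise.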
trait UniqueName {
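    /// Whether the value type has a meaningful unique name.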
    const HAS_UNIQUE_NAME: bool;
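    /// The unique name of the value; only meaningful when `HAS_UNIQUE_NAME` is true.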
    fn unique_name(&self) -> &str;
}

mod unique_name {
    use crate::durable::objects::*;

    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                        ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}

#[derive(Debug)]
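/// An in-memory, differential view over a single durable catalog collection.
///
/// `initial` holds the contents of the collection at transaction start; `pending` holds the
/// staged, uncommitted updates for each key, ordered by operation timestamp.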
struct TableTransaction<K, V> {
    initial: BTreeMap<K, V>,
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
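    /// Returns `true` when two values conflict with each other (e.g. share a name that must be
    /// unique); used to reject updates that would violate uniqueness.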
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}

impl<K, V> TableTransaction<K, V>
where
    K: Ord + Eq + Clone + Debug,
    V: Ord + Clone + Debug + UniqueName,
{
    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: None,
        })
    }

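    /// Like [`Self::new`], but rejects any update for which `uniqueness_violation` reports a
    /// conflict between two values.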
    fn new_with_uniqueness_fn<KP, VP>(
        initial: BTreeMap<KP, VP>,
        uniqueness_violation: fn(a: &V, b: &V) -> bool,
    ) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: Some(uniqueness_violation),
        })
    }

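    /// Consumes the transaction and returns its consolidated pending updates, re-encoded as
    /// protos. Each diff is either `1` (addition) or `-1` (retraction).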
    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        soft_assert_no_log!(self.verify().is_ok());
        self.pending
            .into_iter()
            .flat_map(|(k, v)| {
                let mut v: Vec<_> = v
                    .into_iter()
                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
                    .collect();
                differential_dataflow::consolidation::consolidate(&mut v);
                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
            })
            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
            .collect()
    }

    fn verify(&self) -> Result<(), DurableCatalogError> {
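        // If a uniqueness check is configured, every pair of distinct values must be free of
        // conflicts.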
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let items = self.values();
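            // Fast path: when values carry a unique name, index them by name so that each value
            // is only compared against values sharing its name.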
            if V::HAS_UNIQUE_NAME {
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }

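    /// Verifies that none of the values at `keys` violate uniqueness against the rest of the
    /// table.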
    fn verify_keys<'a>(
        &self,
        keys: impl IntoIterator<Item = &'a K>,
    ) -> Result<(), DurableCatalogError>
    where
        K: 'a,
    {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let entries: Vec<_> = keys
                .into_iter()
                .filter_map(|key| self.get(key).map(|value| (key, value)))
                .collect();
            for (ki, vi) in self.items() {
                for (kj, vj) in &entries {
                    if ki != *kj && uniqueness_violation(vi, vj) {
                        return Err(DurableCatalogError::UniquenessViolation);
                    }
                }
            }
        }
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

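    /// Calls `f` on the current state of every key/value in the table, i.e. the initial
    /// contents with all pending updates applied.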
    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
        let mut seen = BTreeSet::new();
        for k in self.pending.keys() {
            seen.insert(k);
            let v = self.get(k);
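            // A deleted key has no current value and is skipped, but must still be marked as
            // seen.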
            if let Some(v) = v {
                f(k, v);
            }
        }
        for (k, v) in self.initial.iter() {
            if !seen.contains(k) {
                f(k, v);
            }
        }
    }

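    /// Returns the current value of `k`: the initial value with all pending updates for `k`
    /// consolidated on top, or `None` if the key does not (or no longer) exist.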
    fn get(&self, k: &K) -> Option<&V> {
        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
        let mut updates = Vec::with_capacity(pending.len() + 1);
        if let Some(initial) = self.initial.get(k) {
            updates.push((initial, Diff::ONE));
        }
        updates.extend(
            pending
                .into_iter()
                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
        );

        differential_dataflow::consolidation::consolidate(&mut updates);
        assert!(updates.len() <= 1);
        updates.into_iter().next().map(|(v, _)| v)
    }

    #[cfg(test)]
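    /// Returns the current state of the table as owned key/value pairs (test helper).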
    fn items_cloned(&self) -> BTreeMap<K, V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.clone(), v.clone());
        });
        items
    }

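    /// Returns the current state of the table as borrowed key/value pairs.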
    fn items(&self) -> BTreeMap<&K, &V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k, v);
        });
        items
    }

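    /// Returns the set of current values in the table.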
    fn values(&self) -> BTreeSet<&V> {
        let mut items = BTreeSet::new();
        self.for_values(|_, v| {
            items.insert(v);
        });
        items
    }

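    /// Returns the number of keys currently in the table.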
    fn len(&self) -> usize {
        let mut count = 0;
        self.for_values(|_, _| {
            count += 1;
        });
        count
    }

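    /// Like [`Self::for_values`], but `f` may stage additional pending updates, which are merged
    /// into the table once iteration completes.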
    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
        &mut self,
        mut f: F,
    ) {
        let mut pending = BTreeMap::new();
        self.for_values(|k, v| f(&mut pending, k, v));
        for (k, updates) in pending {
            self.pending.entry(k).or_default().extend(updates);
        }
    }

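    /// Stages an insertion of `(k, v)` at operation timestamp `ts`. Errors if `k` already exists
    /// or if `v` conflicts with an existing value.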
    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
        let mut violation = None;
        self.for_values(|for_k, for_v| {
            if &k == for_k {
                violation = Some(DurableCatalogError::DuplicateKey);
            }
            if let Some(uniqueness_violation) = self.uniqueness_violation {
                if uniqueness_violation(for_v, &v) {
                    violation = Some(DurableCatalogError::UniquenessViolation);
                }
            }
        });
        if let Some(violation) = violation {
            return Err(violation);
        }
        self.pending.entry(k).or_default().push(TransactionUpdate {
            value: v,
            ts,
            diff: Diff::ONE,
        });
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

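    /// Updates every entry for which `f` returns a new value, staging a retraction of the old
    /// value and an insertion of the new one at `ts`. Returns the number of changed entries; on
    /// a uniqueness violation, all staged changes are rolled back.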
3152 fn update<F: Fn(&K, &V) -> Option<V>>(
3161 &mut self,
3162 f: F,
3163 ts: Timestamp,
3164 ) -> Result<Diff, DurableCatalogError> {
3165 let mut changed = Diff::ZERO;
3166 let mut keys = BTreeSet::new();
3167 let pending = self.pending.clone();
3169 self.for_values_mut(|p, k, v| {
3170 if let Some(next) = f(k, v) {
3171 changed += Diff::ONE;
3172 keys.insert(k.clone());
3173 let updates = p.entry(k.clone()).or_default();
3174 updates.push(TransactionUpdate {
3175 value: v.clone(),
3176 ts,
3177 diff: Diff::MINUS_ONE,
3178 });
3179 updates.push(TransactionUpdate {
3180 value: next,
3181 ts,
3182 diff: Diff::ONE,
3183 });
3184 }
3185 });
3186 if let Err(err) = self.verify_keys(&keys) {
3188 self.pending = pending;
3189 Err(err)
3190 } else {
3191 Ok(changed)
3192 }
3193 }
3194
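    /// Updates `k` to `v` at `ts` if `k` exists; the write is skipped when the value is already
    /// current. Returns whether `k` was present.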
3195 fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3200 if let Some(cur_v) = self.get(&k) {
3201 if v != *cur_v {
3202 self.set(k, Some(v), ts)?;
3203 }
3204 Ok(true)
3205 } else {
3206 Ok(false)
3207 }
3208 }
3209
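    /// Updates every present key in `kvs` to its new value at `ts`, skipping writes for values
    /// that are already current. Returns the number of keys that were present.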
3210 fn update_by_keys(
3215 &mut self,
3216 kvs: impl IntoIterator<Item = (K, V)>,
3217 ts: Timestamp,
3218 ) -> Result<Diff, DurableCatalogError> {
3219 let kvs: Vec<_> = kvs
3220 .into_iter()
3221 .filter_map(|(k, v)| match self.get(&k) {
3222 Some(cur_v) => Some((*cur_v == v, k, v)),
3224 None => None,
3225 })
3226 .collect();
3227 let changed = kvs.len();
3228 let changed =
3229 Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3230 let kvs = kvs
3231 .into_iter()
3232 .filter(|(no_op, _, _)| !no_op)
3234 .map(|(_, k, v)| (k, Some(v)))
3235 .collect();
3236 self.set_many(kvs, ts)?;
3237 Ok(changed)
3238 }
3239
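    /// Sets `k` to `v` at `ts`, staging a retraction and/or insertion as needed; a `v` of `None`
    /// deletes the key. Returns the previous value, rolling back on a uniqueness violation.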
3240 fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3247 let prev = self.get(&k).cloned();
3248 let entry = self.pending.entry(k.clone()).or_default();
3249 let restore_len = entry.len();
3250
3251 match (v, prev.clone()) {
3252 (Some(v), Some(prev)) => {
3253 entry.push(TransactionUpdate {
3254 value: prev,
3255 ts,
3256 diff: Diff::MINUS_ONE,
3257 });
3258 entry.push(TransactionUpdate {
3259 value: v,
3260 ts,
3261 diff: Diff::ONE,
3262 });
3263 }
3264 (Some(v), None) => {
3265 entry.push(TransactionUpdate {
3266 value: v,
3267 ts,
3268 diff: Diff::ONE,
3269 });
3270 }
3271 (None, Some(prev)) => {
3272 entry.push(TransactionUpdate {
3273 value: prev,
3274 ts,
3275 diff: Diff::MINUS_ONE,
3276 });
3277 }
3278 (None, None) => {}
3279 }
3280
3281 if let Err(err) = self.verify_keys([&k]) {
3283 let pending = self.pending.get_mut(&k).expect("inserted above");
3286 pending.truncate(restore_len);
3287 Err(err)
3288 } else {
3289 Ok(prev)
3290 }
3291 }
3292
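    /// Like [`Self::set`], but for many key/value pairs at once. Returns the previous value of
    /// each key, rolling back all staged changes on a uniqueness violation.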
3293 fn set_many(
3298 &mut self,
3299 kvs: BTreeMap<K, Option<V>>,
3300 ts: Timestamp,
3301 ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3302 if kvs.is_empty() {
3303 return Ok(BTreeMap::new());
3304 }
3305
3306 let mut prevs = BTreeMap::new();
3307 let mut restores = BTreeMap::new();
3308
3309 for (k, v) in kvs {
3310 let prev = self.get(&k).cloned();
3311 let entry = self.pending.entry(k.clone()).or_default();
3312 restores.insert(k.clone(), entry.len());
3313
3314 match (v, prev.clone()) {
3315 (Some(v), Some(prev)) => {
3316 entry.push(TransactionUpdate {
3317 value: prev,
3318 ts,
3319 diff: Diff::MINUS_ONE,
3320 });
3321 entry.push(TransactionUpdate {
3322 value: v,
3323 ts,
3324 diff: Diff::ONE,
3325 });
3326 }
3327 (Some(v), None) => {
3328 entry.push(TransactionUpdate {
3329 value: v,
3330 ts,
3331 diff: Diff::ONE,
3332 });
3333 }
3334 (None, Some(prev)) => {
3335 entry.push(TransactionUpdate {
3336 value: prev,
3337 ts,
3338 diff: Diff::MINUS_ONE,
3339 });
3340 }
3341 (None, None) => {}
3342 }
3343
3344 prevs.insert(k, prev);
3345 }
3346
3347 if let Err(err) = self.verify_keys(prevs.keys()) {
3349 for (k, restore_len) in restores {
3350 let pending = self.pending.get_mut(&k).expect("inserted above");
3353 pending.truncate(restore_len);
3354 }
3355 Err(err)
3356 } else {
3357 Ok(prevs)
3358 }
3359 }
3360
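    /// Deletes all entries matching `f` at `ts`, returning the deleted key/value pairs. Note
    /// that this visits every entry in the table.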
3361 fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3367 let mut deleted = Vec::new();
3368 self.for_values_mut(|p, k, v| {
3369 if f(k, v) {
3370 deleted.push((k.clone(), v.clone()));
3371 p.entry(k.clone()).or_default().push(TransactionUpdate {
3372 value: v.clone(),
3373 ts,
3374 diff: Diff::MINUS_ONE,
3375 });
3376 }
3377 });
3378 soft_assert_no_log!(self.verify().is_ok());
3379 deleted
3380 }
3381
3382 fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3386 self.set(k, None, ts)
3387 .expect("deleting an entry cannot violate uniqueness")
3388 }
3389
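    /// Deletes every key in `ks` at `ts`, returning the key/value pairs that actually existed.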
3390 fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3394 let kvs = ks.into_iter().map(|k| (k, None)).collect();
3395 let prevs = self
3396 .set_many(kvs, ts)
3397 .expect("deleting entries cannot violate uniqueness");
3398 prevs
3399 .into_iter()
3400 .filter_map(|(k, v)| v.map(|v| (k, v)))
3401 .collect()
3402 }
3403}
3404
3405#[cfg(test)]
3406#[allow(clippy::unwrap_used)]
3407mod tests {
3408 use super::*;
3409 use mz_ore::{assert_none, assert_ok};
3410
3411 use mz_ore::now::SYSTEM_TIME;
3412 use mz_persist_client::PersistClient;
3413
3414 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3415 use crate::memory;
3416
3417 #[mz_ore::test]
3418 fn test_table_transaction_simple() {
3419 fn uniqueness_violation(a: &String, b: &String) -> bool {
3420 a == b
3421 }
3422 let mut table = TableTransaction::new_with_uniqueness_fn(
3423 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3424 uniqueness_violation,
3425 )
3426 .unwrap();
3427
3428 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3431 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3432 assert!(
3433 table
3434 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3435 .is_err()
3436 );
3437 assert!(
3438 table
3439 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3440 .is_err()
3441 );
3442 }
3443
3444 #[mz_ore::test]
3445 fn test_table_transaction() {
3446 fn uniqueness_violation(a: &String, b: &String) -> bool {
3447 a == b
3448 }
3449 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3450
3451 fn commit(
3452 table: &mut BTreeMap<Vec<u8>, String>,
3453 mut pending: Vec<(Vec<u8>, String, Diff)>,
3454 ) {
3455 pending.sort_by(|a, b| a.2.cmp(&b.2));
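            // Apply retractions (diff of -1) before additions so that a key can be removed and
            // re-inserted within the same batch.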
            for (k, v, diff) in pending {
                if diff == Diff::MINUS_ONE {
                    let prev = table.remove(&k);
                    assert_eq!(prev, Some(v));
                } else if diff == Diff::ONE {
                    let prev = table.insert(k, v);
                    assert_eq!(prev, None);
                } else {
                    panic!("unexpected diff: {diff}");
                }
            }
        }

        table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
        table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.items_cloned(), table);
        assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
        assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
        );
        assert_eq!(
            table_txn
                .update(|_k, _v| Some("v3".to_string()), 2)
                .unwrap(),
            Diff::ONE
        );

        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
            .unwrap_err();

        table_txn
            .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
            .unwrap();
        assert_eq!(
            table_txn.items_cloned(),
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
            ])
        );
        let err = table_txn
            .update(|_k, _v| Some("v1".to_string()), 5)
            .unwrap_err();
        assert!(
            matches!(err, DurableCatalogError::UniquenessViolation),
            "unexpected err: {err:?}"
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v1".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
                (
                    2i64.to_le_bytes().to_vec(),
                    "v2".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v3".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap_err();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap_err();
        assert_eq!(
            table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
            1
        );
        table_txn
            .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v3".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
                (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v5".to_string()),
                (3i64.to_le_bytes().to_vec(), "v4".to_string()),
                (5i64.to_le_bytes().to_vec(), "v3".to_string()),
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
            .unwrap();

        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table,
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
            .unwrap();
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
            .unwrap_err();
        assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
        table_txn
            .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
            .unwrap();
        commit(&mut table, table_txn.pending());
        assert_eq!(
            table.clone().into_iter().collect::<Vec<_>>(),
            vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
            .unwrap_err();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
            .unwrap();
        table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
        table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v5".to_string(),
                    Diff::MINUS_ONE
                ),
                (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                0,
            )
            .unwrap_err();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
                ]),
                1,
            )
            .unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                    (3i64.to_le_bytes().to_vec(), None),
                ]),
                2,
            )
            .unwrap();
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
                (
                    3i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .set_many(
                BTreeMap::from([
                    (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
                    (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
                ]),
                0,
            )
            .unwrap();
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v6".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
            .unwrap_err();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
                .unwrap()
        );
        assert!(
            !table_txn
                .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
                .unwrap()
        );
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v6".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        assert!(
            table_txn
                .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
                .unwrap()
        );
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v8".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v7".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap_err();
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                1,
            )
            .unwrap();
        assert_eq!(n, Diff::ONE);
        let n = table_txn
            .update_by_keys(
                [
                    (15i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (5i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                2,
            )
            .unwrap();
        assert_eq!(n, Diff::ZERO);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![
                (
                    1i64.to_le_bytes().to_vec(),
                    "v8".to_string(),
                    Diff::MINUS_ONE
                ),
                (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
            ]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let n = table_txn
            .update_by_keys(
                [
                    (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                    (42i64.to_le_bytes().to_vec(), "v7".to_string()),
                ],
                0,
            )
            .unwrap();
        assert_eq!(n, Diff::from(2));
        let pending = table_txn.pending::<Vec<u8>, String>();
        assert!(pending.is_empty());
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([
                (1i64.to_le_bytes().to_vec(), "v9".to_string()),
                (42i64.to_le_bytes().to_vec(), "v7".to_string())
            ])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
        assert_eq!(prev, Some("v9".to_string()));
        let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
        assert_none!(prev);
        let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
        assert_none!(prev);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                1i64.to_le_bytes().to_vec(),
                "v9".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(
            table,
            BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
        );

        let mut table_txn =
            TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            0,
        );
        assert_eq!(
            prevs,
            vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
        );
        let prevs = table_txn.delete_by_keys(
            [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            1,
        );
        assert_eq!(prevs, vec![]);
        let prevs = table_txn.delete_by_keys(
            [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
            2,
        );
        assert_eq!(prevs, vec![]);
        let pending = table_txn.pending();
        assert_eq!(
            pending,
            vec![(
                42i64.to_le_bytes().to_vec(),
                "v7".to_string(),
                Diff::MINUS_ONE
            ),]
        );
        commit(&mut table, pending);
        assert_eq!(table, BTreeMap::new());
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_savepoint() {
        let persist_client = PersistClient::new_for_tests().await;
        let state_builder =
            TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();

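        // Open the catalog once to initialize the durable state, then discard the handle.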
        let _ = state_builder
            .clone()
            .unwrap_build()
            .await
            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;
        let mut savepoint_state = state_builder
            .unwrap_build()
            .await
            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;

        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
        assert!(!initial_snapshot.is_empty());

        let db_name = "db";
        let db_owner = RoleId::User(42);
        let db_privileges = Vec::new();
        let mut txn = savepoint_state.transaction().await.unwrap();
        let (db_id, db_oid) = txn
            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
            .unwrap();
        let commit_ts = txn.upper();
        txn.commit_internal(commit_ts).await.unwrap();
        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
        let update = updates.into_element();

        assert_eq!(update.diff, StateDiff::Addition);

        let db = match update.kind {
            memory::objects::StateUpdateKind::Database(db) => db,
            update => panic!("unexpected update: {update:?}"),
        };

        assert_eq!(db_id, db.id);
        assert_eq!(db_oid, db.oid);
        assert_eq!(db_name, db.name);
        assert_eq!(db_owner, db.owner_id);
        assert_eq!(db_privileges, db.privileges);
    }

    #[mz_ore::test]
    fn test_allocate_introspection_source_index_id() {
        let cluster_variant: u8 = 0b0000_0001;
        let cluster_id_inner: u64 =
            0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
        let timely_messages_received_log_variant: u8 = 0b0000_1000;

        let cluster_id = ClusterId::System(cluster_id_inner);
        let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);

        let introspection_source_index_id: u64 =
            0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;

        {
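            // The high 8 bits encode the cluster variant.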
            let mut cluster_variant_mask = 0xFF << 56;
            cluster_variant_mask &= introspection_source_index_id;
            cluster_variant_mask >>= 56;
            assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
        }

        {
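            // The middle 48 bits encode the inner cluster ID.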
            let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
            cluster_id_inner_mask &= introspection_source_index_id;
            cluster_id_inner_mask >>= 8;
            assert_eq!(cluster_id_inner_mask, cluster_id_inner);
        }

        {
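            // The low 8 bits encode the log variant.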
            let mut log_variant_mask = 0xFF;
            log_variant_mask &= introspection_source_index_id;
            assert_eq!(
                log_variant_mask,
                u64::from(timely_messages_received_log_variant)
            );
        }

        let (catalog_item_id, global_id) =
            Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);

        assert_eq!(
            catalog_item_id,
            CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
        );
        assert_eq!(
            global_id,
            GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
        );
    }
}