use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::time::Duration;

use anyhow::anyhow;
use derivative::Derivative;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::{u64_to_usize, usize_to_u64};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::now::SYSTEM_TIME;
use mz_ore::vec::VecExt;
use mz_ore::{soft_assert_no_log, soft_assert_or_log};
use mz_persist_types::ShardId;
use mz_pgrepr::oid::FIRST_USER_OID;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
use mz_sql::catalog::{
    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, RoleAttributes, RoleMembership,
    RoleVars,
};
use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
use mz_sql::plan::NetworkPolicyRule;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::controller::StorageError;

use crate::builtin::BuiltinLog;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT, ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::objects::serialization::proto;
use crate::durable::objects::{
    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
    ReplicaConfig, Role, RoleAuthKey, RoleAuthValue, RoleKey, RoleValue, Schema, SchemaKey,
    SchemaValue, ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue,
    SourceReference, SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
};
use crate::durable::{
    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
    EXPRESSION_CACHE_SHARD_KEY, NetworkPolicy, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY,
    STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
    SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
    USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

type Timestamp = u64;

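/// An in-memory transaction against the durable catalog state. Writes are
/// staged in per-collection [`TableTransaction`]s, which enforce the
/// uniqueness invariants installed in [`Transaction::new`], and only become
/// durable when the transaction is committed back to the underlying
/// [`DurableCatalogState`].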
#[derive(Derivative)]
#[derivative(Debug, PartialEq)]
pub struct Transaction<'a> {
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of the durable catalog as of the start of this transaction.
    upper: mz_repr::Timestamp,
    /// Monotonically increasing ID that groups the updates staged by each
    /// logical operation within this transaction.
    op_id: Timestamp,
}

impl<'a> Transaction<'a> {
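    /// Creates a new [`Transaction`] from a [`Snapshot`] of the durable
    /// catalog, wiring up the uniqueness checks that mirror catalog
    /// invariants: for example, database names must be unique, and schema
    /// names must be unique within a database.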
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                // Two items conflict if they share a schema and name, except
                // that a type may coexist with another item of the same name
                // when that item's type does not conflict with types.
                a.schema_id == b.schema_id && a.name == b.name && {
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }

    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
        let key = ItemKey { id: *id };
        self.items
            .get(&key)
            .map(|v| DurableType::from_key_value(key, v.clone()))
    }

    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
        self.items
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
            .sorted_by_key(|Item { id, .. }| *id)
    }

    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
        self.insert_audit_log_events([event]);
    }

    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
        let events = events
            .into_iter()
            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
        self.audit_log_updates.extend(events);
    }

    pub fn insert_user_database(
        &mut self,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(DatabaseId, u32), CatalogError> {
        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
        let id = DatabaseId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_database(id, database_name, owner_id, privileges, oid)?;
        Ok((id, oid))
    }

    pub(crate) fn insert_database(
        &mut self,
        id: DatabaseId,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<u32, CatalogError> {
        match self.databases.insert(
            DatabaseKey { id },
            DatabaseValue {
                name: database_name.to_string(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(oid),
            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
        }
    }

    pub fn insert_user_schema(
        &mut self,
        database_id: DatabaseId,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(SchemaId, u32), CatalogError> {
        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
        let id = SchemaId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_schema(
            id,
            Some(database_id),
            schema_name.to_string(),
            owner_id,
            privileges,
            oid,
        )?;
        Ok((id, oid))
    }

    pub fn insert_system_schema(
        &mut self,
        schema_id: u64,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        let id = SchemaId::System(schema_id);
        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
    }

    pub(crate) fn insert_schema(
        &mut self,
        schema_id: SchemaId,
        database_id: Option<DatabaseId>,
        schema_name: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        match self.schemas.insert(
            SchemaKey { id: schema_id },
            SchemaValue {
                database_id,
                name: schema_name.clone(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
        }
    }

    pub fn insert_builtin_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributes,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<RoleId, CatalogError> {
        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok(id)
    }

    pub fn insert_user_role(
        &mut self,
        name: String,
        attributes: RoleAttributes,
        membership: RoleMembership,
        vars: RoleVars,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(RoleId, u32), CatalogError> {
        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
        let id = RoleId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok((id, oid))
    }

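    /// Inserts the role and, when `attributes.password` is set, a matching
    /// SCRAM-SHA-256 hash into the role auth collection, so that the two
    /// collections stay in lockstep within this transaction.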
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributes,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash =
                mz_auth::hash::scram256_hash(password).expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes,
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }

    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    pub fn insert_system_cluster(
        &mut self,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    fn insert_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if self
            .clusters
            .insert(
                ClusterKey { id: cluster_id },
                ClusterValue {
                    name: cluster_name.to_string(),
                    owner_id,
                    privileges,
                    config,
                },
                self.op_id,
            )
            .is_err()
        {
            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
        }

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip(oids)
            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
            .collect();
        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
            let introspection_source_index = IntrospectionSourceIndex {
                cluster_id,
                name: builtin.name.to_string(),
                item_id,
                index_id,
                oid,
            };
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources
                .insert(key, value, self.op_id)
                .expect("no uniqueness violation");
        }

        Ok(())
    }

    pub fn rename_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        cluster_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterKey { id: cluster_id };

        match self.clusters.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = cluster_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
            ),
        }
    }

    pub fn rename_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica_name: &QualifiedReplica,
        replica_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterReplicaKey { id: replica_id };

        match self.cluster_replicas.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = replica_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => {
                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
            }
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
            ),
        }
    }

    pub fn insert_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<ReplicaId, CatalogError> {
        let replica_id = match cluster_id {
            ClusterId::System(_) => self.allocate_system_replica_id()?,
            ClusterId::User(_) => self.allocate_user_replica_id()?,
        };
        self.insert_cluster_replica_with_id(
            cluster_id,
            replica_id,
            replica_name,
            config,
            owner_id,
        )?;
        Ok(replica_id)
    }

    pub(crate) fn insert_cluster_replica_with_id(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<(), CatalogError> {
        if self
            .cluster_replicas
            .insert(
                ClusterReplicaKey { id: replica_id },
                ClusterReplicaValue {
                    cluster_id,
                    name: replica_name.into(),
                    config,
                    owner_id,
                },
                self.op_id,
            )
            .is_err()
        {
            let cluster = self
                .clusters
                .get(&ClusterKey { id: cluster_id })
                .expect("cluster exists");
            return Err(SqlCatalogError::DuplicateReplica(
                replica_name.to_string(),
                cluster.name.to_string(),
            )
            .into());
        }
        Ok(())
    }

    pub fn insert_user_network_policy(
        &mut self,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        temporary_oids: &HashSet<u32>,
    ) -> Result<NetworkPolicyId, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
        let id = NetworkPolicyId::User(id);
        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
    }

    pub fn insert_network_policy(
        &mut self,
        id: NetworkPolicyId,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        oid: u32,
    ) -> Result<NetworkPolicyId, CatalogError> {
        match self.network_policies.insert(
            NetworkPolicyKey { id },
            NetworkPolicyValue {
                name: name.clone(),
                rules,
                privileges,
                owner_id,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(id),
            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
        }
    }

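    /// Overwrites the item ID, index ID, and OID of existing introspection
    /// source indexes. Errors with
    /// [`SqlCatalogError::FailedBuiltinSchemaMigration`] if any named index
    /// is not already present.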
    pub fn update_introspection_source_index_gids(
        &mut self,
        mappings: impl Iterator<
            Item = (
                ClusterId,
                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
            ),
        >,
    ) -> Result<(), CatalogError> {
        for (cluster_id, updates) in mappings {
            for (name, item_id, index_id, oid) in updates {
                let introspection_source_index = IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                };
                let (key, value) = introspection_source_index.into_key_value();

                let prev = self
                    .introspection_sources
                    .set(key, Some(value), self.op_id)?;
                if prev.is_none() {
                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
                        "{index_id}"
                    ))
                    .into());
                }
            }
        }
        Ok(())
    }

    pub fn insert_user_item(
        &mut self,
        id: CatalogItemId,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
        versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<u32, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_item(
            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
        )?;
        Ok(oid)
    }

    pub fn insert_item(
        &mut self,
        id: CatalogItemId,
        oid: u32,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        extra_versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<(), CatalogError> {
        match self.items.insert(
            ItemKey { id },
            ItemValue {
                schema_id,
                name: item_name.to_string(),
                create_sql,
                owner_id,
                privileges,
                oid,
                global_id,
                extra_versions,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
        }
    }

    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
    }

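    /// Reserves `amount` consecutive IDs from the named allocator and
    /// advances its `next_id` watermark. For example, if the allocator for
    /// `key` currently stores `next_id = 42`, then
    /// `get_and_increment_id_by(key, 3)` returns `[42, 43, 44]` and leaves
    /// `next_id = 45` behind.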
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }

    pub fn allocate_system_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        assert!(
            !self.durable_catalog.is_bootstrap_complete(),
            "we can only allocate system item IDs during bootstrap"
        );
        Ok(self
            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
            .collect())
    }

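    /// Deterministically derives the IDs for an introspection source index by
    /// packing the cluster variant, the cluster's inner ID, and the log
    /// variant into one `u64`. A sketch of the packing performed below:
    ///
    /// ```text
    /// bits 63..56: cluster variant (1 = system, 2 = user)
    /// bits 55..8:  cluster ID (asserted to fit in 48 bits)
    /// bits  7..0:  log variant
    /// ```
    ///
    /// For example, the Timely `Operates` log (variant 1) on system cluster 1
    /// yields `(1 << 56) | (1 << 8) | 1 = 0x0100_0000_0000_0101`.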
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ShutdownDuration) => 27,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
        };

        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }

    pub fn allocate_user_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        Ok(self
            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
            .collect())
    }

    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::User(id))
    }

    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::System(id))
    }

    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
    }

    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
    }

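    /// Allocates `amount` OIDs from the user OID space. An OID counts as
    /// taken if it belongs to any database, schema, role, item, or
    /// introspection source index in this transaction, or appears in
    /// `temporary_oids`. The search wraps around within the user range on
    /// `u32` overflow and fails with [`SqlCatalogError::OidExhaustion`] if it
    /// returns to its starting point without finding enough free OIDs.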
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// An OID guaranteed to lie within the user OID range.
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                // On overflow, wrap back into the user range rather than to 0.
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // Collect every OID already in use, so the scan below can skip them.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            // Arriving back at the starting OID means the entire user range
            // was scanned without finding enough free OIDs.
            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }

    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
        self.allocate_oids(1, temporary_oids)
            .map(|oids| oids.into_element())
    }

    pub(crate) fn insert_id_allocator(
        &mut self,
        name: String,
        next_id: u64,
    ) -> Result<(), CatalogError> {
        match self.id_allocator.insert(
            IdAllocKey { name: name.clone() },
            IdAllocValue { next_id },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
        }
    }

    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
        let prev = self
            .databases
            .set(DatabaseKey { id: *id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    pub fn remove_databases(
        &mut self,
        databases: &BTreeSet<DatabaseId>,
    ) -> Result<(), CatalogError> {
        if databases.is_empty() {
            return Ok(());
        }

        let to_remove = databases
            .iter()
            .map(|id| (DatabaseKey { id: *id }, None))
            .collect();
        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, val| val.is_none());

        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownDatabase(err).into());
        }

        Ok(())
    }

    pub fn remove_schema(
        &mut self,
        database_id: &Option<DatabaseId>,
        schema_id: &SchemaId,
    ) -> Result<(), CatalogError> {
        let prev = self
            .schemas
            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            let database_name = match database_id {
                Some(id) => format!("{id}."),
                None => "".to_string(),
            };
            Err(SqlCatalogError::UnknownSchema(format!("{}{}", database_name, schema_id)).into())
        }
    }

    pub fn remove_schemas(
        &mut self,
        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
    ) -> Result<(), CatalogError> {
        if schemas.is_empty() {
            return Ok(());
        }

        let to_remove = schemas
            .iter()
            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
            .collect();
        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, v| v.is_none());

        if !prev.is_empty() {
            let err = prev
                .keys()
                .map(|k| {
                    let db_spec = schemas.get(&k.id).expect("should exist");
                    let db_name = match db_spec {
                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    };
                    format!("{}{}", db_name, k.id)
                })
                .join(", ");

            return Err(SqlCatalogError::UnknownSchema(err).into());
        }

        Ok(())
    }

    pub fn remove_source_references(
        &mut self,
        source_id: CatalogItemId,
    ) -> Result<(), CatalogError> {
        let deleted = self
            .source_references
            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
        }
    }

    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        assert!(
            roles.iter().all(|id| id.is_user()),
            "cannot delete non-user roles"
        );
        self.remove_roles(roles)
    }

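    /// Removes every role in `roles`, along with any corresponding entries in
    /// the role auth collection. Errors with [`SqlCatalogError::UnknownRole`]
    /// if any role is absent; a missing role auth entry is not an error,
    /// since not every role has stored authentication data.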
    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        role_auth_prev.retain(|_k, v| v.is_none());
        // Unlike missing roles, missing role auth entries are not an error:
        // roles without a password never had one.
        Ok(())
    }

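    /// Removes every cluster in `clusters`, cascading the deletion to the
    /// clusters' replicas and introspection source indexes. Errors with
    /// [`SqlCatalogError::UnknownCluster`] if any cluster is absent.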
    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
        if clusters.is_empty() {
            return Ok(());
        }

        let to_remove = clusters
            .iter()
            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
            .collect();
        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownCluster(err).into());
        }

        self.cluster_replicas
            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
        self.introspection_sources
            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);

        Ok(())
    }

    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
        let deleted = self
            .cluster_replicas
            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
        }
    }

    pub fn remove_cluster_replicas(
        &mut self,
        replicas: &BTreeSet<ReplicaId>,
    ) -> Result<(), CatalogError> {
        if replicas.is_empty() {
            return Ok(());
        }

        let to_remove = replicas
            .iter()
            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
            .collect();
        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
        }

        Ok(())
    }

    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
        }
    }

    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
        if ids.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
        let n = self.items.delete_by_keys(ks, self.op_id).len();
        if n == ids.len() {
            Ok(())
        } else {
            let item_ids = self.items.items().keys().map(|k| k.id).collect();
            let mut unknown = ids.difference(&item_ids);
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn remove_system_object_mappings(
        &mut self,
        descriptions: BTreeSet<SystemObjectDescription>,
    ) -> Result<(), CatalogError> {
        if descriptions.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = descriptions
            .clone()
            .into_iter()
            .map(|desc| GidMappingKey {
                schema_name: desc.schema_name,
                object_type: desc.object_type,
                object_name: desc.object_name,
            })
            .collect();
        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();

        if n == descriptions.len() {
            Ok(())
        } else {
            let item_descriptions = self
                .system_gid_mapping
                .items()
                .keys()
                .map(|k| SystemObjectDescription {
                    schema_name: k.schema_name.clone(),
                    object_type: k.object_type.clone(),
                    object_name: k.object_name.clone(),
                })
                .collect();
            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
                format!(
                    "{} {}.{}",
                    desc.object_type, desc.schema_name, desc.object_name
                )
            });
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn remove_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = introspection_source_indexes
            .clone()
            .into_iter()
            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
            .collect();
        let n = self
            .introspection_sources
            .delete_by_keys(ks, self.op_id)
            .len();
        if n == introspection_source_indexes.len() {
            Ok(())
        } else {
            let txn_indexes = self
                .introspection_sources
                .items()
                .keys()
                .map(|k| (k.cluster_id, k.name.clone()))
                .collect();
            let mut unknown = introspection_source_indexes
                .difference(&txn_indexes)
                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
        let updated =
            self.items
                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
        }
    }

    pub fn update_items(
        &mut self,
        items: BTreeMap<CatalogItemId, Item>,
    ) -> Result<(), CatalogError> {
        if items.is_empty() {
            return Ok(());
        }

        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
        let kvs: Vec<_> = items
            .clone()
            .into_iter()
            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
            .collect();
        let n = self.items.update_by_keys(kvs, self.op_id)?;
        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
        if n == update_ids.len() {
            Ok(())
        } else {
            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
            let mut unknown = update_ids.difference(&item_ids);
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

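    /// Updates the role identified by `id` and keeps the role auth
    /// collection in sync: a password in the new attributes upserts the
    /// stored hash, while a cleared password nulls out any existing hash.
    /// Errors with [`SqlCatalogError::UnknownRole`] if the role is absent.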
    pub fn update_role(&mut self, id: RoleId, role: Role) -> Result<(), CatalogError> {
        let key = RoleKey { id };
        if self.roles.get(&key).is_some() {
            let auth_key = RoleAuthKey { role_id: id };

            if let Some(ref password) = role.attributes.password {
                let hash =
                    mz_auth::hash::scram256_hash(password).expect("password hash should be valid");
                let value = RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                };

                if self.role_auth.get(&auth_key).is_some() {
                    self.role_auth
                        .update_by_key(auth_key.clone(), value, self.op_id)?;
                } else {
                    self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
                }
            } else if self.role_auth.get(&auth_key).is_some() {
                let value = RoleAuthValue {
                    password_hash: None,
                    updated_at: SYSTEM_TIME(),
                };

                self.role_auth
                    .update_by_key(auth_key.clone(), value, self.op_id)?;
            }

            self.roles
                .update_by_key(key, role.into_key_value().1, self.op_id)?;

            Ok(())
        } else {
            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
        }
    }

    pub fn update_roles_without_auth(
        &mut self,
        roles: BTreeMap<RoleId, Role>,
    ) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
        let kvs: Vec<_> = roles
            .into_iter()
            .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
            .collect();
        let n = self.roles.update_by_keys(kvs, self.op_id)?;
        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");

        if n == update_role_ids.len() {
            Ok(())
        } else {
            let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
            let mut unknown = update_role_ids.difference(&role_ids);
            Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
        }
    }

    pub fn update_system_object_mappings(
        &mut self,
        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
    ) -> Result<(), CatalogError> {
        if mappings.is_empty() {
            return Ok(());
        }

        let n = self.system_gid_mapping.update(
            |_k, v| {
                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
                    let (_, new_value) = mapping.clone().into_key_value();
                    Some(new_value)
                } else {
                    None
                }
            },
            self.op_id,
        )?;

        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
            != mappings.len()
        {
            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
        }

        Ok(())
    }

    pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
        let updated = self.clusters.update_by_key(
            ClusterKey { id },
            cluster.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
        }
    }

    pub fn update_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica: ClusterReplica,
    ) -> Result<(), CatalogError> {
        let updated = self.cluster_replicas.update_by_key(
            ClusterReplicaKey { id: replica_id },
            replica.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
        }
    }

    pub fn update_database(
        &mut self,
        id: DatabaseId,
        database: Database,
    ) -> Result<(), CatalogError> {
        let updated = self.databases.update_by_key(
            DatabaseKey { id },
            database.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    pub fn update_schema(
        &mut self,
        schema_id: SchemaId,
        schema: Schema,
    ) -> Result<(), CatalogError> {
        let updated = self.schemas.update_by_key(
            SchemaKey { id: schema_id },
            schema.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
        }
    }

    pub fn update_network_policy(
        &mut self,
        id: NetworkPolicyId,
        network_policy: NetworkPolicy,
    ) -> Result<(), CatalogError> {
        let updated = self.network_policies.update_by_key(
            NetworkPolicyKey { id },
            network_policy.into_key_value().1,
            self.op_id,
        )?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
        }
    }

    pub fn remove_network_policies(
        &mut self,
        network_policies: &BTreeSet<NetworkPolicyId>,
    ) -> Result<(), CatalogError> {
        if network_policies.is_empty() {
            return Ok(());
        }

        let to_remove = network_policies
            .iter()
            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
            .collect();
        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
        assert!(
            prev.iter().all(|(k, _)| k.id.is_user()),
            "cannot delete non-user network policy"
        );

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
        }

        Ok(())
    }

    pub fn set_default_privilege(
        &mut self,
        role_id: RoleId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        object_type: ObjectType,
        grantee: RoleId,
        privileges: Option<AclMode>,
    ) -> Result<(), CatalogError> {
        self.default_privileges.set(
            DefaultPrivilegesKey {
                role_id,
                database_id,
                schema_id,
                object_type,
                grantee,
            },
            privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
            self.op_id,
        )?;
        Ok(())
    }

    pub fn set_default_privileges(
        &mut self,
        default_privileges: Vec<DefaultPrivilege>,
    ) -> Result<(), CatalogError> {
        if default_privileges.is_empty() {
            return Ok(());
        }

        let default_privileges = default_privileges
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.default_privileges
            .set_many(default_privileges, self.op_id)?;
        Ok(())
    }

    pub fn set_system_privilege(
        &mut self,
        grantee: RoleId,
        grantor: RoleId,
        acl_mode: Option<AclMode>,
    ) -> Result<(), CatalogError> {
        self.system_privileges.set(
            SystemPrivilegesKey { grantee, grantor },
            acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
            self.op_id,
        )?;
        Ok(())
    }

    pub fn set_system_privileges(
        &mut self,
        system_privileges: Vec<MzAclItem>,
    ) -> Result<(), CatalogError> {
        if system_privileges.is_empty() {
            return Ok(());
        }

        let system_privileges = system_privileges
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.system_privileges
            .set_many(system_privileges, self.op_id)?;
        Ok(())
    }

    pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
        self.settings.set(
            SettingKey { name },
            value.map(|value| SettingValue { value }),
            self.op_id,
        )?;
        Ok(())
    }

    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
    }

    pub fn insert_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip(oids)
            .map(
                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                },
            )
            .collect();

        for introspection_source_index in introspection_source_indexes {
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources.insert(key, value, self.op_id)?;
        }

        Ok(())
    }

    pub fn set_system_object_mappings(
        &mut self,
        mappings: Vec<SystemObjectMapping>,
    ) -> Result<(), CatalogError> {
        if mappings.is_empty() {
            return Ok(());
        }

        let mappings = mappings
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.system_gid_mapping.set_many(mappings, self.op_id)?;
        Ok(())
    }

    pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
        if replicas.is_empty() {
            return Ok(());
        }

        let replicas = replicas
            .into_iter()
            .map(DurableType::into_key_value)
            .map(|(k, v)| (k, Some(v)))
            .collect();
        self.cluster_replicas.set_many(replicas, self.op_id)?;
        Ok(())
    }

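    /// Sets the config entry for `key` to `value`, or removes the entry
    /// entirely when `value` is `None`.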
1890 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1892 match value {
1893 Some(value) => {
1894 let config = Config { key, value };
1895 let (key, value) = config.into_key_value();
1896 self.configs.set(key, Some(value), self.op_id)?;
1897 }
1898 None => {
1899 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1900 }
1901 }
1902 Ok(())
1903 }
1904
1905 pub fn get_config(&self, key: String) -> Option<u64> {
1907 self.configs
1908 .get(&ConfigKey { key })
1909 .map(|entry| entry.value)
1910 }
1911
1912 fn get_setting(&self, name: String) -> Option<&str> {
1914 self.settings
1915 .get(&SettingKey { name })
1916 .map(|entry| &*entry.value)
1917 }
1918
1919 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1920 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1921 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1922 }
1923
1924 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1925 self.set_setting(
1926 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1927 Some(shard_id.to_string()),
1928 )
1929 }
1930
1931 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1932 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1933 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1934 }
1935
1936 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1937 self.set_setting(
1938 EXPRESSION_CACHE_SHARD_KEY.to_string(),
1939 Some(shard_id.to_string()),
1940 )
1941 }
1942
1943 pub fn set_enable_0dt_deployment(&mut self, value: bool) -> Result<(), CatalogError> {
1949 self.set_config(ENABLE_0DT_DEPLOYMENT.into(), Some(u64::from(value)))
1950 }
1951
1952 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1958 self.set_config(
1959 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1960 Some(
1961 value
1962 .as_millis()
1963 .try_into()
1964 .expect("max wait fits into u64"),
1965 ),
1966 )
1967 }
1968
1969 pub fn set_0dt_deployment_ddl_check_interval(
1976 &mut self,
1977 value: Duration,
1978 ) -> Result<(), CatalogError> {
1979 self.set_config(
1980 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
1981 Some(
1982 value
1983 .as_millis()
1984 .try_into()
1985 .expect("ddl check interval fits into u64"),
1986 ),
1987 )
1988 }
1989
1990 pub fn set_enable_0dt_deployment_panic_after_timeout(
1996 &mut self,
1997 value: bool,
1998 ) -> Result<(), CatalogError> {
1999 self.set_config(
2000 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2001 Some(u64::from(value)),
2002 )
2003 }
2004
2005 pub fn reset_enable_0dt_deployment(&mut self) -> Result<(), CatalogError> {
2011 self.set_config(ENABLE_0DT_DEPLOYMENT.into(), None)
2012 }
2013
2014 pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2020 self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2021 }
2022
2023 pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2030 self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2031 }
2032
2033 pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2040 self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2041 }
2042
2043 pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2045 self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2046 }
2047
2048 pub fn update_comment(
2049 &mut self,
2050 object_id: CommentObjectId,
2051 sub_component: Option<usize>,
2052 comment: Option<String>,
2053 ) -> Result<(), CatalogError> {
2054 let key = CommentKey {
2055 object_id,
2056 sub_component,
2057 };
2058 let value = comment.map(|c| CommentValue { comment: c });
2059 self.comments.set(key, value, self.op_id)?;
2060
2061 Ok(())
2062 }
2063
2064 pub fn drop_comments(
2065 &mut self,
2066 object_ids: &BTreeSet<CommentObjectId>,
2067 ) -> Result<(), CatalogError> {
2068 if object_ids.is_empty() {
2069 return Ok(());
2070 }
2071
2072 self.comments
2073 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2074 Ok(())
2075 }
2076
2077 pub fn update_source_references(
2078 &mut self,
2079 source_id: CatalogItemId,
2080 references: Vec<SourceReference>,
2081 updated_at: u64,
2082 ) -> Result<(), CatalogError> {
2083 let key = SourceReferencesKey { source_id };
2084 let value = SourceReferencesValue {
2085 references,
2086 updated_at,
2087 };
2088 self.source_references.set(key, Some(value), self.op_id)?;
2089 Ok(())
2090 }
2091
2092 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2094 let key = ServerConfigurationKey {
2095 name: name.to_string(),
2096 };
2097 let value = ServerConfigurationValue { value };
2098 self.system_configurations
2099 .set(key, Some(value), self.op_id)?;
2100 Ok(())
2101 }
2102
2103 pub fn remove_system_config(&mut self, name: &str) {
2105 let key = ServerConfigurationKey {
2106 name: name.to_string(),
2107 };
2108 self.system_configurations
2109 .set(key, None, self.op_id)
2110 .expect("cannot have uniqueness violation");
2111 }
2112
2113 pub fn clear_system_configs(&mut self) {
2115 self.system_configurations.delete(|_k, _v| true, self.op_id);
2116 }
2117
2118 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2119 match self.configs.insert(
2120 ConfigKey { key: key.clone() },
2121 ConfigValue { value },
2122 self.op_id,
2123 ) {
2124 Ok(_) => Ok(()),
2125 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2126 }
2127 }
2128
2129 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2130 self.clusters
2131 .items()
2132 .into_iter()
2133 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2134 }
2135
2136 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2137 self.cluster_replicas
2138 .items()
2139 .into_iter()
2140 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2141 }
2142
2143 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2144 self.roles
2145 .items()
2146 .into_iter()
2147 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2148 }
2149
2150 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2151 self.network_policies
2152 .items()
2153 .into_iter()
2154 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2155 }
2156
2157 pub fn get_system_object_mappings(
2158 &self,
2159 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2160 self.system_gid_mapping
2161 .items()
2162 .into_iter()
2163 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2164 }
2165
2166 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2167 self.schemas
2168 .items()
2169 .into_iter()
2170 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2171 }
2172
2173 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2174 self.system_configurations
2175 .items()
2176 .into_iter()
2177 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2178 }
2179
2180 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2181 let key = SchemaKey { id: *id };
2182 self.schemas
2183 .get(&key)
2184 .map(|v| DurableType::from_key_value(key, v.clone()))
2185 }
2186
2187 pub fn get_introspection_source_indexes(
2188 &self,
2189 cluster_id: ClusterId,
2190 ) -> BTreeMap<&str, (GlobalId, u32)> {
2191 self.introspection_sources
2192 .items()
2193 .into_iter()
2194 .filter(|(k, _v)| k.cluster_id == cluster_id)
2195 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2196 .collect()
2197 }
2198
2199 pub fn get_catalog_content_version(&self) -> Option<&str> {
2200 self.settings
2201 .get(&SettingKey {
2202 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2203 })
2204 .map(|value| &*value.value)
2205 }
2206
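/// Returns the [`StateUpdate`]s staged by the current operation and advances the
/// transaction to the next operation. Callers must drain these updates before calling
/// [`Transaction::commit`], which asserts that none remain.
///
/// A minimal sketch of the intended call pattern (`apply_updates` is a hypothetical
/// consumer, not part of this crate):
///
/// ```ignore
/// // ... stage changes on `txn` ...
/// let updates = txn.get_and_commit_op_updates();
/// apply_updates(updates);
/// ```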
2207 #[must_use]
2213 pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2214 let updates = self.get_op_updates();
2215 self.commit_op();
2216 updates
2217 }
2218
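/// Collects, across every table in the transaction, the pending updates whose
/// timestamp matches the current operation ID, without advancing the operation.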
2219 fn get_op_updates(&self) -> Vec<StateUpdate> {
2220 fn get_collection_op_updates<'a, T>(
2221 table_txn: &'a TableTransaction<T::Key, T::Value>,
2222 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2223 op: Timestamp,
2224 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2225 where
2226 T::Key: Ord + Eq + Clone + Debug,
2227 T::Value: Ord + Clone + Debug,
2228 T: DurableType,
2229 {
2230 table_txn
2231 .pending
2232 .iter()
2233 .flat_map(|(k, vs)| vs.iter().map(move |v| (k, v)))
2234 .filter_map(move |(k, v)| {
2235 if v.ts == op {
2236 let key = k.clone();
2237 let value = v.value.clone();
2238 let diff = v.diff.clone().try_into().expect("invalid diff");
2239 let update = DurableType::from_key_value(key, value);
2240 let kind = kind_fn(update);
2241 Some((kind, diff))
2242 } else {
2243 None
2244 }
2245 })
2246 }
2247
2248 fn get_large_collection_op_updates<'a, T>(
2249 collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2250 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2251 op: Timestamp,
2252 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2253 where
2254 T::Key: Ord + Eq + Clone + Debug,
2255 T: DurableType<Value = ()>,
2256 {
2257 collection.iter().filter_map(move |(k, diff, ts)| {
2258 if *ts == op {
2259 let key = k.clone();
2260 let diff = diff.clone().try_into().expect("invalid diff");
2261 let update = DurableType::from_key_value(key, ());
2262 let kind = kind_fn(update);
2263 Some((kind, diff))
2264 } else {
2265 None
2266 }
2267 })
2268 }
2269
2270 let Transaction {
2271 durable_catalog: _,
2272 databases,
2273 schemas,
2274 items,
2275 comments,
2276 roles,
2277 role_auth,
2278 clusters,
2279 network_policies,
2280 cluster_replicas,
2281 introspection_sources,
2282 system_gid_mapping,
2283 system_configurations,
2284 default_privileges,
2285 source_references,
2286 system_privileges,
2287 audit_log_updates,
2288 storage_collection_metadata,
2289 unfinalized_shards,
2290 id_allocator: _,
2292 configs: _,
2293 settings: _,
2294 txn_wal_shard: _,
2295 upper,
2296 op_id: _,
2297 } = &self;
2298
2299 let updates = std::iter::empty()
2300 .chain(get_collection_op_updates(
2301 roles,
2302 StateUpdateKind::Role,
2303 self.op_id,
2304 ))
2305 .chain(get_collection_op_updates(
2306 role_auth,
2307 StateUpdateKind::RoleAuth,
2308 self.op_id,
2309 ))
2310 .chain(get_collection_op_updates(
2311 databases,
2312 StateUpdateKind::Database,
2313 self.op_id,
2314 ))
2315 .chain(get_collection_op_updates(
2316 schemas,
2317 StateUpdateKind::Schema,
2318 self.op_id,
2319 ))
2320 .chain(get_collection_op_updates(
2321 default_privileges,
2322 StateUpdateKind::DefaultPrivilege,
2323 self.op_id,
2324 ))
2325 .chain(get_collection_op_updates(
2326 system_privileges,
2327 StateUpdateKind::SystemPrivilege,
2328 self.op_id,
2329 ))
2330 .chain(get_collection_op_updates(
2331 system_configurations,
2332 StateUpdateKind::SystemConfiguration,
2333 self.op_id,
2334 ))
2335 .chain(get_collection_op_updates(
2336 clusters,
2337 StateUpdateKind::Cluster,
2338 self.op_id,
2339 ))
2340 .chain(get_collection_op_updates(
2341 network_policies,
2342 StateUpdateKind::NetworkPolicy,
2343 self.op_id,
2344 ))
2345 .chain(get_collection_op_updates(
2346 introspection_sources,
2347 StateUpdateKind::IntrospectionSourceIndex,
2348 self.op_id,
2349 ))
2350 .chain(get_collection_op_updates(
2351 cluster_replicas,
2352 StateUpdateKind::ClusterReplica,
2353 self.op_id,
2354 ))
2355 .chain(get_collection_op_updates(
2356 system_gid_mapping,
2357 StateUpdateKind::SystemObjectMapping,
2358 self.op_id,
2359 ))
2360 .chain(get_collection_op_updates(
2361 items,
2362 StateUpdateKind::Item,
2363 self.op_id,
2364 ))
2365 .chain(get_collection_op_updates(
2366 comments,
2367 StateUpdateKind::Comment,
2368 self.op_id,
2369 ))
2370 .chain(get_collection_op_updates(
2371 source_references,
2372 StateUpdateKind::SourceReferences,
2373 self.op_id,
2374 ))
2375 .chain(get_collection_op_updates(
2376 storage_collection_metadata,
2377 StateUpdateKind::StorageCollectionMetadata,
2378 self.op_id,
2379 ))
2380 .chain(get_collection_op_updates(
2381 unfinalized_shards,
2382 StateUpdateKind::UnfinalizedShard,
2383 self.op_id,
2384 ))
2385 .chain(get_large_collection_op_updates(
2386 audit_log_updates,
2387 StateUpdateKind::AuditLog,
2388 self.op_id,
2389 ))
2390 .map(|(kind, diff)| StateUpdate {
2391 kind,
2392 ts: upper.clone(),
2393 diff,
2394 })
2395 .collect();
2396
2397 updates
2398 }
2399
2400 pub fn is_savepoint(&self) -> bool {
2401 self.durable_catalog.is_savepoint()
2402 }
2403
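/// Advances the transaction to the next operation by incrementing the operation ID;
/// updates staged at earlier operation IDs are no longer returned by
/// [`Transaction::get_op_updates`].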
2404 fn commit_op(&mut self) {
2405 self.op_id += 1;
2406 }
2407
2408 pub fn op_id(&self) -> Timestamp {
2409 self.op_id
2410 }
2411
2412 pub fn upper(&self) -> mz_repr::Timestamp {
2413 self.upper
2414 }
2415
2416 pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2417 let audit_log_updates = self
2418 .audit_log_updates
2419 .into_iter()
2420 .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2421 .collect();
2422
2423 let txn_batch = TransactionBatch {
2424 databases: self.databases.pending(),
2425 schemas: self.schemas.pending(),
2426 items: self.items.pending(),
2427 comments: self.comments.pending(),
2428 roles: self.roles.pending(),
2429 role_auth: self.role_auth.pending(),
2430 clusters: self.clusters.pending(),
2431 cluster_replicas: self.cluster_replicas.pending(),
2432 network_policies: self.network_policies.pending(),
2433 introspection_sources: self.introspection_sources.pending(),
2434 id_allocator: self.id_allocator.pending(),
2435 configs: self.configs.pending(),
2436 source_references: self.source_references.pending(),
2437 settings: self.settings.pending(),
2438 system_gid_mapping: self.system_gid_mapping.pending(),
2439 system_configurations: self.system_configurations.pending(),
2440 default_privileges: self.default_privileges.pending(),
2441 system_privileges: self.system_privileges.pending(),
2442 storage_collection_metadata: self.storage_collection_metadata.pending(),
2443 unfinalized_shards: self.unfinalized_shards.pending(),
2444 txn_wal_shard: self.txn_wal_shard.pending(),
2445 audit_log_updates,
2446 upper: self.upper,
2447 };
2448 (txn_batch, self.durable_catalog)
2449 }
2450
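/// Consolidates this transaction's pending updates and commits them to the durable
/// catalog at `commit_ts`, returning the durable catalog handle and the new upper.
/// `commit_ts` must be at least the transaction's upper.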
2451 #[mz_ore::instrument(level = "debug")]
2463 pub(crate) async fn commit_internal(
2464 self,
2465 commit_ts: mz_repr::Timestamp,
2466 ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2467 let (mut txn_batch, durable_catalog) = self.into_parts();
2468 let TransactionBatch {
2469 databases,
2470 schemas,
2471 items,
2472 comments,
2473 roles,
2474 role_auth,
2475 clusters,
2476 cluster_replicas,
2477 network_policies,
2478 introspection_sources,
2479 id_allocator,
2480 configs,
2481 source_references,
2482 settings,
2483 system_gid_mapping,
2484 system_configurations,
2485 default_privileges,
2486 system_privileges,
2487 storage_collection_metadata,
2488 unfinalized_shards,
2489 txn_wal_shard,
2490 audit_log_updates,
2491 upper,
2492 } = &mut txn_batch;
2493 differential_dataflow::consolidation::consolidate_updates(databases);
2496 differential_dataflow::consolidation::consolidate_updates(schemas);
2497 differential_dataflow::consolidation::consolidate_updates(items);
2498 differential_dataflow::consolidation::consolidate_updates(comments);
2499 differential_dataflow::consolidation::consolidate_updates(roles);
2500 differential_dataflow::consolidation::consolidate_updates(role_auth);
2501 differential_dataflow::consolidation::consolidate_updates(clusters);
2502 differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2503 differential_dataflow::consolidation::consolidate_updates(network_policies);
2504 differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2505 differential_dataflow::consolidation::consolidate_updates(id_allocator);
2506 differential_dataflow::consolidation::consolidate_updates(configs);
2507 differential_dataflow::consolidation::consolidate_updates(settings);
2508 differential_dataflow::consolidation::consolidate_updates(source_references);
2509 differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2510 differential_dataflow::consolidation::consolidate_updates(system_configurations);
2511 differential_dataflow::consolidation::consolidate_updates(default_privileges);
2512 differential_dataflow::consolidation::consolidate_updates(system_privileges);
2513 differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2514 differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2515 differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2516 differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2517
2518 assert!(
2519 commit_ts >= *upper,
2520 "expected commit ts, {}, to be greater than or equal to upper, {}",
2521 commit_ts,
2522 upper
2523 );
2524 let upper = durable_catalog
2525 .commit_transaction(txn_batch, commit_ts)
2526 .await?;
2527 Ok((durable_catalog, upper))
2528 }
2529
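/// Commits this transaction durably at `commit_ts` and syncs the durable catalog past
/// the commit. Panics if the transaction still has unconsumed operation updates, so
/// callers must drain them first.
///
/// A minimal sketch of a full round trip (the staged change is illustrative):
///
/// ```ignore
/// let mut txn = durable_catalog.transaction().await?;
/// txn.upsert_system_config("some_flag", "on".to_string())?;
/// let _updates = txn.get_and_commit_op_updates();
/// let commit_ts = txn.upper();
/// txn.commit(commit_ts).await?;
/// ```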
2530 #[mz_ore::instrument(level = "debug")]
2547 pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2548 let op_updates = self.get_op_updates();
2549 assert!(
2550 op_updates.is_empty(),
2551 "unconsumed transaction updates: {op_updates:?}"
2552 );
2553
2554 let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2555 let updates = durable_storage.sync_updates(upper).await?;
2557 soft_assert_no_log!(
2562 durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2563 "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates: {updates:?}"
2564 );
2565 Ok(())
2566 }
2567}
2568
2569use crate::durable::async_trait;
2570
2571use super::objects::{RoleAuthKey, RoleAuthValue};
2572
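// `Transaction` also implements the storage controller's `StorageTxn` interface, so
// storage metadata (shard mappings, unfinalized shards, the txn-WAL shard) is updated
// atomically with the rest of the catalog.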
2573#[async_trait]
2574impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2575 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2576 self.storage_collection_metadata
2577 .items()
2578 .into_iter()
2579 .map(
2580 |(
2581 StorageCollectionMetadataKey { id },
2582 StorageCollectionMetadataValue { shard },
2583 )| { (*id, shard.clone()) },
2584 )
2585 .collect()
2586 }
2587
2588 fn insert_collection_metadata(
2589 &mut self,
2590 metadata: BTreeMap<GlobalId, ShardId>,
2591 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2592 for (id, shard) in metadata {
2593 self.storage_collection_metadata
2594 .insert(
2595 StorageCollectionMetadataKey { id },
2596 StorageCollectionMetadataValue {
2597 shard: shard.clone(),
2598 },
2599 self.op_id,
2600 )
2601 .map_err(|err| match err {
2602 DurableCatalogError::DuplicateKey => {
2603 StorageError::CollectionMetadataAlreadyExists(id)
2604 }
2605 DurableCatalogError::UniquenessViolation => {
2606 StorageError::PersistShardAlreadyInUse(shard)
2607 }
2608 err => StorageError::Generic(anyhow::anyhow!(err)),
2609 })?;
2610 }
2611 Ok(())
2612 }
2613
2614 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2615 let ks: Vec<_> = ids
2616 .into_iter()
2617 .map(|id| StorageCollectionMetadataKey { id })
2618 .collect();
2619 self.storage_collection_metadata
2620 .delete_by_keys(ks, self.op_id)
2621 .into_iter()
2622 .map(
2623 |(
2624 StorageCollectionMetadataKey { id },
2625 StorageCollectionMetadataValue { shard },
2626 )| (id, shard),
2627 )
2628 .collect()
2629 }
2630
2631 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2632 self.unfinalized_shards
2633 .items()
2634 .into_iter()
2635 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2636 .collect()
2637 }
2638
2639 fn insert_unfinalized_shards(
2640 &mut self,
2641 s: BTreeSet<ShardId>,
2642 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2643 for shard in s {
2644 match self
2645 .unfinalized_shards
2646 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2647 {
2648 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2650 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2651 };
2652 }
2653 Ok(())
2654 }
2655
2656 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2657 let ks: Vec<_> = shards
2658 .into_iter()
2659 .map(|shard| UnfinalizedShardKey { shard })
2660 .collect();
2661 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2662 }
2663
2664 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2665 self.txn_wal_shard
2666 .values()
2667 .iter()
2668 .next()
2669 .map(|TxnWalShardValue { shard }| *shard)
2670 }
2671
2672 fn write_txn_wal_shard(
2673 &mut self,
2674 shard: ShardId,
2675 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2676 self.txn_wal_shard
2677 .insert((), TxnWalShardValue { shard }, self.op_id)
2678 .map_err(|err| match err {
2679 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2680 err => StorageError::Generic(anyhow::anyhow!(err)),
2681 })
2682 }
2683}
2684
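/// The consolidated set of pending `(key, value, diff)` updates, one vector per durable
/// catalog collection, that a [`Transaction`] hands off for a single atomic commit,
/// together with the `upper` the transaction was based on.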
2685#[derive(Debug, Clone, Default, PartialEq)]
2687pub struct TransactionBatch {
2688 pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
2689 pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
2690 pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
2691 pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
2692 pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
2693 pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
2694 pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
2695 pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
2696 pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
2697 pub(crate) introspection_sources: Vec<(
2698 proto::ClusterIntrospectionSourceIndexKey,
2699 proto::ClusterIntrospectionSourceIndexValue,
2700 Diff,
2701 )>,
2702 pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
2703 pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
2704 pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
2705 pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
2706 pub(crate) system_configurations: Vec<(
2707 proto::ServerConfigurationKey,
2708 proto::ServerConfigurationValue,
2709 Diff,
2710 )>,
2711 pub(crate) default_privileges: Vec<(
2712 proto::DefaultPrivilegesKey,
2713 proto::DefaultPrivilegesValue,
2714 Diff,
2715 )>,
2716 pub(crate) source_references: Vec<(
2717 proto::SourceReferencesKey,
2718 proto::SourceReferencesValue,
2719 Diff,
2720 )>,
2721 pub(crate) system_privileges: Vec<(
2722 proto::SystemPrivilegesKey,
2723 proto::SystemPrivilegesValue,
2724 Diff,
2725 )>,
2726 pub(crate) storage_collection_metadata: Vec<(
2727 proto::StorageCollectionMetadataKey,
2728 proto::StorageCollectionMetadataValue,
2729 Diff,
2730 )>,
2731 pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
2732 pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
2733 pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
2734 pub(crate) upper: mz_repr::Timestamp,
2736}
2737
2738impl TransactionBatch {
2739 pub fn is_empty(&self) -> bool {
2740 let TransactionBatch {
2741 databases,
2742 schemas,
2743 items,
2744 comments,
2745 roles,
2746 role_auth,
2747 clusters,
2748 cluster_replicas,
2749 network_policies,
2750 introspection_sources,
2751 id_allocator,
2752 configs,
2753 settings,
2754 source_references,
2755 system_gid_mapping,
2756 system_configurations,
2757 default_privileges,
2758 system_privileges,
2759 storage_collection_metadata,
2760 unfinalized_shards,
2761 txn_wal_shard,
2762 audit_log_updates,
2763 upper: _,
2764 } = self;
2765 databases.is_empty()
2766 && schemas.is_empty()
2767 && items.is_empty()
2768 && comments.is_empty()
2769 && roles.is_empty()
2770 && role_auth.is_empty()
2771 && clusters.is_empty()
2772 && cluster_replicas.is_empty()
2773 && network_policies.is_empty()
2774 && introspection_sources.is_empty()
2775 && id_allocator.is_empty()
2776 && configs.is_empty()
2777 && settings.is_empty()
2778 && source_references.is_empty()
2779 && system_gid_mapping.is_empty()
2780 && system_configurations.is_empty()
2781 && default_privileges.is_empty()
2782 && system_privileges.is_empty()
2783 && storage_collection_metadata.is_empty()
2784 && unfinalized_shards.is_empty()
2785 && txn_wal_shard.is_empty()
2786 && audit_log_updates.is_empty()
2787 }
2788}
2789
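/// A single pending change to a value: `diff` is `Diff::ONE` for an insertion and
/// `Diff::MINUS_ONE` for a retraction, and `ts` records which transaction operation
/// staged the change.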
2790#[derive(Debug, Clone, PartialEq, Eq)]
2791struct TransactionUpdate<V> {
2792 value: V,
2793 ts: Timestamp,
2794 diff: Diff,
2795}
2796
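/// Values that may carry a name subject to a uniqueness constraint. When
/// `HAS_UNIQUE_NAME` is true, [`TableTransaction::verify`] checks uniqueness by
/// grouping values by `unique_name` instead of comparing every pair of values.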
2797trait UniqueName {
2799 const HAS_UNIQUE_NAME: bool;
2802 fn unique_name(&self) -> &str;
2804}
2805
2806mod unique_name {
2807 use crate::durable::objects::*;
2808
2809 macro_rules! impl_unique_name {
2810 ($($t:ty),* $(,)?) => {
2811 $(
2812 impl crate::durable::transaction::UniqueName for $t {
2813 const HAS_UNIQUE_NAME: bool = true;
2814 fn unique_name(&self) -> &str {
2815 &self.name
2816 }
2817 }
2818 )*
2819 };
2820 }
2821
2822 macro_rules! impl_no_unique_name {
2823 ($($t:ty),* $(,)?) => {
2824 $(
2825 impl crate::durable::transaction::UniqueName for $t {
2826 const HAS_UNIQUE_NAME: bool = false;
2827 fn unique_name(&self) -> &str {
2828 ""
2829 }
2830 }
2831 )*
2832 };
2833 }
2834
2835 impl_unique_name! {
2836 ClusterReplicaValue,
2837 ClusterValue,
2838 DatabaseValue,
2839 ItemValue,
2840 NetworkPolicyValue,
2841 RoleValue,
2842 SchemaValue,
2843 }
2844
2845 impl_no_unique_name!(
2846 (),
2847 ClusterIntrospectionSourceIndexValue,
2848 CommentValue,
2849 ConfigValue,
2850 DefaultPrivilegesValue,
2851 GidMappingValue,
2852 IdAllocValue,
2853 ServerConfigurationValue,
2854 SettingValue,
2855 SourceReferencesValue,
2856 StorageCollectionMetadataValue,
2857 SystemPrivilegesValue,
2858 TxnWalShardValue,
2859 RoleAuthValue,
2860 );
2861
2862 #[cfg(test)]
2863 mod test {
2864 impl_no_unique_name!(String,);
2865 }
2866}
2867
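/// An in-memory overlay of pending `(value, ts, diff)` updates on top of an `initial`
/// snapshot of a single durable catalog table, optionally guarded by a caller-supplied
/// uniqueness predicate over any two live values.
///
/// A minimal sketch of the overlay semantics (keys and values are illustrative):
///
/// ```ignore
/// let mut table = TableTransaction::new(BTreeMap::from([(k1, v1)]))?;
/// table.insert(k2, v2, 0)?;              // staged with diff +1
/// assert_eq!(table.get(&k2), Some(&v2)); // reads merge initial and pending state
/// ```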
2868#[derive(Debug, PartialEq, Eq)]
2878struct TableTransaction<K, V> {
2879 initial: BTreeMap<K, V>,
2880 pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
2883 uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
2884}
2885
2886impl<K, V> TableTransaction<K, V>
2887where
2888 K: Ord + Eq + Clone + Debug,
2889 V: Ord + Clone + Debug + UniqueName,
2890 V: Ord + Clone + Debug + UniqueName,
2891 {
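/// Creates a new [`TableTransaction`] with no uniqueness constraint from `initial`, a
/// snapshot of the table's protobuf-encoded contents.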
2891 fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2898 where
2899 K: RustType<KP>,
2900 V: RustType<VP>,
2901 {
2902 let initial = initial
2903 .into_iter()
2904 .map(RustType::from_proto)
2905 .collect::<Result<_, _>>()?;
2906
2907 Ok(Self {
2908 initial,
2909 pending: BTreeMap::new(),
2910 uniqueness_violation: None,
2911 })
2912 }
2913
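/// Like [`TableTransaction::new`], but installs `uniqueness_violation`, a predicate
/// that returns true when two values must not coexist in the table.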
2914 fn new_with_uniqueness_fn<KP, VP>(
2917 initial: BTreeMap<KP, VP>,
2918 uniqueness_violation: fn(a: &V, b: &V) -> bool,
2919 ) -> Result<Self, TryFromProtoError>
2920 where
2921 K: RustType<KP>,
2922 V: RustType<VP>,
2923 {
2924 let initial = initial
2925 .into_iter()
2926 .map(RustType::from_proto)
2927 .collect::<Result<_, _>>()?;
2928
2929 Ok(Self {
2930 initial,
2931 pending: BTreeMap::new(),
2932 uniqueness_violation: Some(uniqueness_violation),
2933 })
2934 }
2935
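/// Consumes the transaction and returns its consolidated pending updates as
/// `(key, value, diff)` protos; updates that cancel out within the transaction are
/// dropped by consolidation.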
2936 fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2939 where
2940 K: RustType<KP>,
2941 V: RustType<VP>,
2942 {
2943 soft_assert_no_log!(self.verify().is_ok());
2944 self.pending
2947 .into_iter()
2948 .flat_map(|(k, v)| {
2949 let mut v: Vec<_> = v
2950 .into_iter()
2951 .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
2952 .collect();
2953 differential_dataflow::consolidation::consolidate(&mut v);
2954 v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
2955 })
2956 .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
2957 .collect()
2958 }
2959
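/// Checks that no two live values in the table violate the configured uniqueness
/// predicate, returning [`DurableCatalogError::UniquenessViolation`] on the first
/// conflict found.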
2960 fn verify(&self) -> Result<(), DurableCatalogError> {
2965 if let Some(uniqueness_violation) = self.uniqueness_violation {
2966 let items = self.values();
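// Fast path: when values expose a unique name, the uniqueness predicate is assumed
// to relate only values that share a name, so each value is compared against at
// most one other value.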
2968 if V::HAS_UNIQUE_NAME {
2969 let by_name: BTreeMap<_, _> = items
2970 .iter()
2971 .enumerate()
2972 .map(|(j, vj)| (vj.unique_name(), (j, vj)))
2973 .collect();
2974 for (i, vi) in items.iter().enumerate() {
2975 if let Some((j, vj)) = by_name.get(vi.unique_name()) {
2976 if i != *j && uniqueness_violation(vi, *vj) {
2977 return Err(DurableCatalogError::UniquenessViolation);
2978 }
2979 }
2980 }
2981 } else {
2982 for (i, vi) in items.iter().enumerate() {
2983 for (j, vj) in items.iter().enumerate() {
2984 if i != j && uniqueness_violation(vi, vj) {
2985 return Err(DurableCatalogError::UniquenessViolation);
2986 }
2987 }
2988 }
2989 }
2990 }
2991 soft_assert_no_log!(
2992 self.pending
2993 .values()
2994 .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
2995 "pending should be sorted by timestamp: {:?}",
2996 self.pending
2997 );
2998 Ok(())
2999 }
3000
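/// Like [`TableTransaction::verify`], but only checks the values at `keys` against the
/// rest of the table, which is cheaper when just a few keys were touched.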
3001 fn verify_keys<'a>(
3006 &self,
3007 keys: impl IntoIterator<Item = &'a K>,
3008 ) -> Result<(), DurableCatalogError>
3009 where
3010 K: 'a,
3011 {
3012 if let Some(uniqueness_violation) = self.uniqueness_violation {
3013 let entries: Vec<_> = keys
3014 .into_iter()
3015 .filter_map(|key| self.get(key).map(|value| (key, value)))
3016 .collect();
3017 for (ki, vi) in self.items() {
3019 for (kj, vj) in &entries {
3020 if ki != *kj && uniqueness_violation(vi, vj) {
3021 return Err(DurableCatalogError::UniquenessViolation);
3022 }
3023 }
3024 }
3025 }
3026 soft_assert_no_log!(self.verify().is_ok());
3027 Ok(())
3028 }
3029
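/// Calls `f` on every live `(key, value)` pair, i.e. the initial snapshot with all
/// pending updates applied.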
3030 fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3033 let mut seen = BTreeSet::new();
3034 for k in self.pending.keys() {
3035 seen.insert(k);
3036 let v = self.get(k);
3037 if let Some(v) = v {
3040 f(k, v);
3041 }
3042 }
3043 for (k, v) in self.initial.iter() {
3044 if !seen.contains(k) {
3046 f(k, v);
3047 }
3048 }
3049 }
3050
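/// Returns the live value for `k`, folding `k`'s pending updates into its initial
/// value; a net retraction yields `None`.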
3051 fn get(&self, k: &K) -> Option<&V> {
3053 let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3054 let mut updates = Vec::with_capacity(pending.len() + 1);
3055 if let Some(initial) = self.initial.get(k) {
3056 updates.push((initial, Diff::ONE));
3057 }
3058 updates.extend(
3059 pending
3060 .iter()
3061 .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3062 );
3063
3064 differential_dataflow::consolidation::consolidate(&mut updates);
3065 assert!(updates.len() <= 1);
3066 updates.into_iter().next().map(|(v, _)| v)
3067 }
3068
3069 #[cfg(test)]
3074 fn items_cloned(&self) -> BTreeMap<K, V> {
3075 let mut items = BTreeMap::new();
3076 self.for_values(|k, v| {
3077 items.insert(k.clone(), v.clone());
3078 });
3079 items
3080 }
3081
3082 fn items(&self) -> BTreeMap<&K, &V> {
3085 let mut items = BTreeMap::new();
3086 self.for_values(|k, v| {
3087 items.insert(k, v);
3088 });
3089 items
3090 }
3091
3092 fn values(&self) -> BTreeSet<&V> {
3094 let mut items = BTreeSet::new();
3095 self.for_values(|_, v| {
3096 items.insert(v);
3097 });
3098 items
3099 }
3100
3101 fn len(&self) -> usize {
3103 let mut count = 0;
3104 self.for_values(|_, _| {
3105 count += 1;
3106 });
3107 count
3108 }
3109
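/// Calls `f` on every live `(key, value)` pair, allowing `f` to stage additional
/// updates, which are merged into `pending` afterwards.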
3110 fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3114 &mut self,
3115 mut f: F,
3116 ) {
3117 let mut pending = BTreeMap::new();
3118 self.for_values(|k, v| f(&mut pending, k, v));
3119 for (k, updates) in pending {
3120 self.pending.entry(k).or_default().extend(updates);
3121 }
3122 }
3123
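/// Stages an insertion of `(k, v)`, failing if `k` already has a live value or if `v`
/// would violate the uniqueness predicate against any live value.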
3124 fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3128 let mut violation = None;
3129 self.for_values(|for_k, for_v| {
3130 if &k == for_k {
3131 violation = Some(DurableCatalogError::DuplicateKey);
3132 }
3133 if let Some(uniqueness_violation) = self.uniqueness_violation {
3134 if uniqueness_violation(for_v, &v) {
3135 violation = Some(DurableCatalogError::UniquenessViolation);
3136 }
3137 }
3138 });
3139 if let Some(violation) = violation {
3140 return Err(violation);
3141 }
3142 self.pending.entry(k).or_default().push(TransactionUpdate {
3143 value: v,
3144 ts,
3145 diff: Diff::ONE,
3146 });
3147 soft_assert_no_log!(self.verify().is_ok());
3148 Ok(())
3149 }
3150
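/// For each live `(k, v)` where `f` returns a replacement value, stages a retraction
/// of the old value and an insertion of the new one, returning the number of changed
/// entries. All staged changes are rolled back if the result would violate uniqueness.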
3151 fn update<F: Fn(&K, &V) -> Option<V>>(
3160 &mut self,
3161 f: F,
3162 ts: Timestamp,
3163 ) -> Result<Diff, DurableCatalogError> {
3164 let mut changed = Diff::ZERO;
3165 let mut keys = BTreeSet::new();
3166 let pending = self.pending.clone();
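// Snapshot the pending set so it can be restored if the update violates uniqueness.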
3168 self.for_values_mut(|p, k, v| {
3169 if let Some(next) = f(k, v) {
3170 changed += Diff::ONE;
3171 keys.insert(k.clone());
3172 let updates = p.entry(k.clone()).or_default();
3173 updates.push(TransactionUpdate {
3174 value: v.clone(),
3175 ts,
3176 diff: Diff::MINUS_ONE,
3177 });
3178 updates.push(TransactionUpdate {
3179 value: next,
3180 ts,
3181 diff: Diff::ONE,
3182 });
3183 }
3184 });
3185 if let Err(err) = self.verify_keys(&keys) {
3187 self.pending = pending;
3188 Err(err)
3189 } else {
3190 Ok(changed)
3191 }
3192 }
3193
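/// Sets `k` to `v` if `k` already has a live value, returning whether the key existed.
/// Setting a key to its current value is a no-op that still reports `true`.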
3194 fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3199 if let Some(cur_v) = self.get(&k) {
3200 if v != *cur_v {
3201 self.set(k, Some(v), ts)?;
3202 }
3203 Ok(true)
3204 } else {
3205 Ok(false)
3206 }
3207 }
3208
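/// Like [`TableTransaction::update_by_key`], but for many keys, returning the number
/// of keys that had a live value (no-op updates are included in the count).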
3209 fn update_by_keys(
3214 &mut self,
3215 kvs: impl IntoIterator<Item = (K, V)>,
3216 ts: Timestamp,
3217 ) -> Result<Diff, DurableCatalogError> {
3218 let kvs: Vec<_> = kvs
3219 .into_iter()
3220 .filter_map(|(k, v)| match self.get(&k) {
3221 Some(cur_v) => Some((*cur_v == v, k, v)),
3223 None => None,
3224 })
3225 .collect();
3226 let changed = kvs.len();
3227 let changed =
3228 Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3229 let kvs = kvs
3230 .into_iter()
3231 .filter(|(no_op, _, _)| !no_op)
3233 .map(|(_, k, v)| (k, Some(v)))
3234 .collect();
3235 self.set_many(kvs, ts)?;
3236 Ok(changed)
3237 }
3238
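/// Stages `k` to take the value `v`, where `None` deletes the entry, and returns the
/// previous live value. The staged updates are rolled back if uniqueness would be
/// violated.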
3239 fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3246 let prev = self.get(&k).cloned();
3247 let entry = self.pending.entry(k.clone()).or_default();
3248 let restore_len = entry.len();
3249
3250 match (v, prev.clone()) {
3251 (Some(v), Some(prev)) => {
3252 entry.push(TransactionUpdate {
3253 value: prev,
3254 ts,
3255 diff: Diff::MINUS_ONE,
3256 });
3257 entry.push(TransactionUpdate {
3258 value: v,
3259 ts,
3260 diff: Diff::ONE,
3261 });
3262 }
3263 (Some(v), None) => {
3264 entry.push(TransactionUpdate {
3265 value: v,
3266 ts,
3267 diff: Diff::ONE,
3268 });
3269 }
3270 (None, Some(prev)) => {
3271 entry.push(TransactionUpdate {
3272 value: prev,
3273 ts,
3274 diff: Diff::MINUS_ONE,
3275 });
3276 }
3277 (None, None) => {}
3278 }
3279
3280 if let Err(err) = self.verify_keys([&k]) {
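// Roll back: trim the pending list for `k` to the length captured before this `set`.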
3282 let pending = self.pending.get_mut(&k).expect("inserted above");
3285 pending.truncate(restore_len);
3286 Err(err)
3287 } else {
3288 Ok(prev)
3289 }
3290 }
3291
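/// Like [`TableTransaction::set`], but stages many keys at once, returning each key's
/// previous live value. If any key would violate uniqueness, all staged updates are
/// rolled back together.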
3292 fn set_many(
3297 &mut self,
3298 kvs: BTreeMap<K, Option<V>>,
3299 ts: Timestamp,
3300 ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3301 if kvs.is_empty() {
3302 return Ok(BTreeMap::new());
3303 }
3304
3305 let mut prevs = BTreeMap::new();
3306 let mut restores = BTreeMap::new();
3307
3308 for (k, v) in kvs {
3309 let prev = self.get(&k).cloned();
3310 let entry = self.pending.entry(k.clone()).or_default();
3311 restores.insert(k.clone(), entry.len());
3312
3313 match (v, prev.clone()) {
3314 (Some(v), Some(prev)) => {
3315 entry.push(TransactionUpdate {
3316 value: prev,
3317 ts,
3318 diff: Diff::MINUS_ONE,
3319 });
3320 entry.push(TransactionUpdate {
3321 value: v,
3322 ts,
3323 diff: Diff::ONE,
3324 });
3325 }
3326 (Some(v), None) => {
3327 entry.push(TransactionUpdate {
3328 value: v,
3329 ts,
3330 diff: Diff::ONE,
3331 });
3332 }
3333 (None, Some(prev)) => {
3334 entry.push(TransactionUpdate {
3335 value: prev,
3336 ts,
3337 diff: Diff::MINUS_ONE,
3338 });
3339 }
3340 (None, None) => {}
3341 }
3342
3343 prevs.insert(k, prev);
3344 }
3345
3346 if let Err(err) = self.verify_keys(prevs.keys()) {
3348 for (k, restore_len) in restores {
3349 let pending = self.pending.get_mut(&k).expect("inserted above");
3352 pending.truncate(restore_len);
3353 }
3354 Err(err)
3355 } else {
3356 Ok(prevs)
3357 }
3358 }
3359
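/// Stages a retraction of every live `(k, v)` for which `f` returns true, returning
/// the deleted entries.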
3360 fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3366 let mut deleted = Vec::new();
3367 self.for_values_mut(|p, k, v| {
3368 if f(k, v) {
3369 deleted.push((k.clone(), v.clone()));
3370 p.entry(k.clone()).or_default().push(TransactionUpdate {
3371 value: v.clone(),
3372 ts,
3373 diff: Diff::MINUS_ONE,
3374 });
3375 }
3376 });
3377 soft_assert_no_log!(self.verify().is_ok());
3378 deleted
3379 }
3380
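/// Stages a retraction of `k`, returning its previous live value, if any.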
3381 fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3385 self.set(k, None, ts)
3386 .expect("deleting an entry cannot violate uniqueness")
3387 }
3388
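/// Stages a retraction of each key in `ks`, returning the `(key, value)` pairs that
/// were live.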
3389 fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3393 let kvs = ks.into_iter().map(|k| (k, None)).collect();
3394 let prevs = self
3395 .set_many(kvs, ts)
3396 .expect("deleting entries cannot violate uniqueness");
3397 prevs
3398 .into_iter()
3399 .filter_map(|(k, v)| v.map(|v| (k, v)))
3400 .collect()
3401 }
3402}
3403
3404#[cfg(test)]
3405#[allow(clippy::unwrap_used)]
3406mod tests {
3407 use super::*;
3408 use mz_ore::{assert_none, assert_ok};
3409
3410 use mz_ore::now::SYSTEM_TIME;
3411 use mz_persist_client::PersistClient;
3412
3413 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3414 use crate::memory;
3415
3416 #[mz_ore::test]
3417 fn test_table_transaction_simple() {
3418 fn uniqueness_violation(a: &String, b: &String) -> bool {
3419 a == b
3420 }
3421 let mut table = TableTransaction::new_with_uniqueness_fn(
3422 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3423 uniqueness_violation,
3424 )
3425 .unwrap();
3426
3427 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3430 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3431 assert!(
3432 table
3433 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3434 .is_err()
3435 );
3436 assert!(
3437 table
3438 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3439 .is_err()
3440 );
3441 }
3442
3443 #[mz_ore::test]
3444 fn test_table_transaction() {
3445 fn uniqueness_violation(a: &String, b: &String) -> bool {
3446 a == b
3447 }
3448 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3449
3450 fn commit(
3451 table: &mut BTreeMap<Vec<u8>, String>,
3452 mut pending: Vec<(Vec<u8>, String, Diff)>,
3453 ) {
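// Apply retractions (diff -1) before additions (diff +1) so that a key replaced
// within a single batch round-trips through the map correctly.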
3454 pending.sort_by(|a, b| a.2.cmp(&b.2));
3456 for (k, v, diff) in pending {
3457 if diff == Diff::MINUS_ONE {
3458 let prev = table.remove(&k);
3459 assert_eq!(prev, Some(v));
3460 } else if diff == Diff::ONE {
3461 let prev = table.insert(k, v);
3462 assert_eq!(prev, None);
3463 } else {
3464 panic!("unexpected diff: {diff}");
3465 }
3466 }
3467 }
3468
3469 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3470 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3471 let mut table_txn =
3472 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3473 assert_eq!(table_txn.items_cloned(), table);
3474 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3475 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3476 assert_eq!(
3477 table_txn.items_cloned(),
3478 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3479 );
3480 assert_eq!(
3481 table_txn
3482 .update(|_k, _v| Some("v3".to_string()), 2)
3483 .unwrap(),
3484 Diff::ONE
3485 );
3486
3487 table_txn
3489 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3490 .unwrap_err();
3491
3492 table_txn
3493 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3494 .unwrap();
3495 assert_eq!(
3496 table_txn.items_cloned(),
3497 BTreeMap::from([
3498 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3499 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3500 ])
3501 );
3502 let err = table_txn
3503 .update(|_k, _v| Some("v1".to_string()), 5)
3504 .unwrap_err();
3505 assert!(
3506 matches!(err, DurableCatalogError::UniquenessViolation),
3507 "unexpected err: {err:?}"
3508 );
3509 let pending = table_txn.pending();
3510 assert_eq!(
3511 pending,
3512 vec![
3513 (
3514 1i64.to_le_bytes().to_vec(),
3515 "v1".to_string(),
3516 Diff::MINUS_ONE
3517 ),
3518 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3519 (
3520 2i64.to_le_bytes().to_vec(),
3521 "v2".to_string(),
3522 Diff::MINUS_ONE
3523 ),
3524 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3525 ]
3526 );
3527 commit(&mut table, pending);
3528 assert_eq!(
3529 table,
3530 BTreeMap::from([
3531 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3532 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3533 ])
3534 );
3535
3536 let mut table_txn =
3537 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3538 assert_eq!(
3540 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3541 1
3542 );
3543 table_txn
3544 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3545 .unwrap();
3546 table_txn
3548 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3549 .unwrap_err();
3550 table_txn
3552 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3553 .unwrap_err();
3554 assert_eq!(
3555 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3556 1
3557 );
3558 table_txn
3560 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3561 .unwrap();
3562 table_txn
3563 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3564 .unwrap();
3565 let pending = table_txn.pending();
3566 assert_eq!(
3567 pending,
3568 vec![
3569 (
3570 1i64.to_le_bytes().to_vec(),
3571 "v3".to_string(),
3572 Diff::MINUS_ONE
3573 ),
3574 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3575 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3576 ]
3577 );
3578 commit(&mut table, pending);
3579 assert_eq!(
3580 table,
3581 BTreeMap::from([
3582 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3583 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3584 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3585 ])
3586 );
3587
3588 let mut table_txn =
3589 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3590 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3591 table_txn
3592 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3593 .unwrap();
3594
3595 commit(&mut table, table_txn.pending());
3596 assert_eq!(
3597 table,
3598 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3599 );
3600
3601 let mut table_txn =
3602 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3603 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3604 table_txn
3605 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3606 .unwrap();
3607 commit(&mut table, table_txn.pending());
3608 assert_eq!(
3609 table,
3610 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3611 );
3612
3613 let mut table_txn =
3615 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3616 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3617 table_txn
3618 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3619 .unwrap();
3620 table_txn
3621 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3622 .unwrap_err();
3623 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3624 table_txn
3625 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3626 .unwrap();
3627 commit(&mut table, table_txn.pending());
3628 assert_eq!(
3629 table.clone().into_iter().collect::<Vec<_>>(),
3630 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3631 );
3632
3633 let mut table_txn =
3635 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3636 table_txn
3638 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3639 .unwrap_err();
3640 table_txn
3641 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3642 .unwrap();
3643 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3644 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3645 let pending = table_txn.pending();
3646 assert_eq!(
3647 pending,
3648 vec![
3649 (
3650 1i64.to_le_bytes().to_vec(),
3651 "v5".to_string(),
3652 Diff::MINUS_ONE
3653 ),
3654 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3655 ]
3656 );
3657 commit(&mut table, pending);
3658 assert_eq!(
3659 table,
3660 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3661 );
3662
3663 let mut table_txn =
3665 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3666 table_txn
3667 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3668 .unwrap();
3669 let pending = table_txn.pending::<Vec<u8>, String>();
3670 assert!(pending.is_empty());
3671
3672 let mut table_txn =
3674 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3675 table_txn
3677 .set_many(
3678 BTreeMap::from([
3679 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3680 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3681 ]),
3682 0,
3683 )
3684 .unwrap_err();
3685 table_txn
3686 .set_many(
3687 BTreeMap::from([
3688 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3689 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3690 ]),
3691 1,
3692 )
3693 .unwrap();
3694 table_txn
3695 .set_many(
3696 BTreeMap::from([
3697 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3698 (3i64.to_le_bytes().to_vec(), None),
3699 ]),
3700 2,
3701 )
3702 .unwrap();
3703 let pending = table_txn.pending();
3704 assert_eq!(
3705 pending,
3706 vec![
3707 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3708 (
3709 3i64.to_le_bytes().to_vec(),
3710 "v6".to_string(),
3711 Diff::MINUS_ONE
3712 ),
3713 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3714 ]
3715 );
3716 commit(&mut table, pending);
3717 assert_eq!(
3718 table,
3719 BTreeMap::from([
3720 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3721 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3722 ])
3723 );
3724
3725 let mut table_txn =
3727 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3728 table_txn
3729 .set_many(
3730 BTreeMap::from([
3731 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3732 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3733 ]),
3734 0,
3735 )
3736 .unwrap();
3737 let pending = table_txn.pending::<Vec<u8>, String>();
3738 assert!(pending.is_empty());
3739 commit(&mut table, pending);
3740 assert_eq!(
3741 table,
3742 BTreeMap::from([
3743 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3744 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3745 ])
3746 );
3747
3748 let mut table_txn =
3750 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3751 table_txn
3753 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3754 .unwrap_err();
3755 assert!(
3756 table_txn
3757 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3758 .unwrap()
3759 );
3760 assert!(
3761 !table_txn
3762 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3763 .unwrap()
3764 );
3765 let pending = table_txn.pending();
3766 assert_eq!(
3767 pending,
3768 vec![
3769 (
3770 1i64.to_le_bytes().to_vec(),
3771 "v6".to_string(),
3772 Diff::MINUS_ONE
3773 ),
3774 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3775 ]
3776 );
3777 commit(&mut table, pending);
3778 assert_eq!(
3779 table,
3780 BTreeMap::from([
3781 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3782 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3783 ])
3784 );
3785
3786 let mut table_txn =
3788 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3789 assert!(
3790 table_txn
3791 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3792 .unwrap()
3793 );
3794 let pending = table_txn.pending::<Vec<u8>, String>();
3795 assert!(pending.is_empty());
3796 commit(&mut table, pending);
3797 assert_eq!(
3798 table,
3799 BTreeMap::from([
3800 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3801 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3802 ])
3803 );
3804
3805 let mut table_txn =
3807 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3808 table_txn
3810 .update_by_keys(
3811 [
3812 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3813 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3814 ],
3815 0,
3816 )
3817 .unwrap_err();
3818 let n = table_txn
3819 .update_by_keys(
3820 [
3821 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3822 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3823 ],
3824 1,
3825 )
3826 .unwrap();
3827 assert_eq!(n, Diff::ONE);
3828 let n = table_txn
3829 .update_by_keys(
3830 [
3831 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3832 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3833 ],
3834 2,
3835 )
3836 .unwrap();
3837 assert_eq!(n, Diff::ZERO);
3838 let pending = table_txn.pending();
3839 assert_eq!(
3840 pending,
3841 vec![
3842 (
3843 1i64.to_le_bytes().to_vec(),
3844 "v8".to_string(),
3845 Diff::MINUS_ONE
3846 ),
3847 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3848 ]
3849 );
3850 commit(&mut table, pending);
3851 assert_eq!(
3852 table,
3853 BTreeMap::from([
3854 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3855 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3856 ])
3857 );
3858
3859 let mut table_txn =
3861 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3862 let n = table_txn
3863 .update_by_keys(
3864 [
3865 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3866 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3867 ],
3868 0,
3869 )
3870 .unwrap();
3871 assert_eq!(n, Diff::from(2));
3872 let pending = table_txn.pending::<Vec<u8>, String>();
3873 assert!(pending.is_empty());
3874 commit(&mut table, pending);
3875 assert_eq!(
3876 table,
3877 BTreeMap::from([
3878 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3879 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3880 ])
3881 );
3882
3883 let mut table_txn =
3885 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3886 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3887 assert_eq!(prev, Some("v9".to_string()));
3888 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3889 assert_none!(prev);
3890 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3891 assert_none!(prev);
3892 let pending = table_txn.pending();
3893 assert_eq!(
3894 pending,
3895 vec![(
3896 1i64.to_le_bytes().to_vec(),
3897 "v9".to_string(),
3898 Diff::MINUS_ONE
3899 ),]
3900 );
3901 commit(&mut table, pending);
3902 assert_eq!(
3903 table,
3904 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3905 );
3906
3907 let mut table_txn =
3909 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3910 let prevs = table_txn.delete_by_keys(
3911 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3912 0,
3913 );
3914 assert_eq!(
3915 prevs,
3916 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3917 );
3918 let prevs = table_txn.delete_by_keys(
3919 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3920 1,
3921 );
3922 assert_eq!(prevs, vec![]);
3923 let prevs = table_txn.delete_by_keys(
3924 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3925 2,
3926 );
3927 assert_eq!(prevs, vec![]);
3928 let pending = table_txn.pending();
3929 assert_eq!(
3930 pending,
3931 vec![(
3932 42i64.to_le_bytes().to_vec(),
3933 "v7".to_string(),
3934 Diff::MINUS_ONE
3935 ),]
3936 );
3937 commit(&mut table, pending);
3938 assert_eq!(table, BTreeMap::new());
3939 }
3940
3941 #[mz_ore::test(tokio::test)]
3942 #[cfg_attr(miri, ignore)]
3943 async fn test_savepoint() {
3944 let persist_client = PersistClient::new_for_tests().await;
3945 let state_builder =
3946 TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();
3947
3948 let _ = state_builder
3950 .clone()
3951 .unwrap_build()
3952 .await
3953 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
3954 .await
3955 .unwrap()
3956 .0;
3957 let mut savepoint_state = state_builder
3958 .unwrap_build()
3959 .await
3960 .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
3961 .await
3962 .unwrap()
3963 .0;
3964
3965 let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
3966 assert!(!initial_snapshot.is_empty());
3967
3968 let db_name = "db";
3969 let db_owner = RoleId::User(42);
3970 let db_privileges = Vec::new();
3971 let mut txn = savepoint_state.transaction().await.unwrap();
3972 let (db_id, db_oid) = txn
3973 .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
3974 .unwrap();
3975 let commit_ts = txn.upper();
3976 txn.commit_internal(commit_ts).await.unwrap();
3977 let updates = savepoint_state.sync_to_current_updates().await.unwrap();
3978 let update = updates.into_element();
3979
3980 assert_eq!(update.diff, StateDiff::Addition);
3981
3982 let db = match update.kind {
3983 memory::objects::StateUpdateKind::Database(db) => db,
3984 update => panic!("unexpected update: {update:?}"),
3985 };
3986
3987 assert_eq!(db_id, db.id);
3988 assert_eq!(db_oid, db.oid);
3989 assert_eq!(db_name, db.name);
3990 assert_eq!(db_owner, db.owner_id);
3991 assert_eq!(db_privileges, db.privileges);
3992 }
3993
3994 #[mz_ore::test]
3995 fn test_allocate_introspection_source_index_id() {
3996 let cluster_variant: u8 = 0b0000_0001;
3997 let cluster_id_inner: u64 =
3998 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
3999 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4000
4001 let cluster_id = ClusterId::System(cluster_id_inner);
4002 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4003
4004 let introspection_source_index_id: u64 =
4005 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4006
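// Sanity-check the bit layout: one high byte for the cluster variant, 48 bits of
// cluster ID, and one low byte for the log variant.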
4007 {
4009 let mut cluster_variant_mask = 0xFF << 56;
4010 cluster_variant_mask &= introspection_source_index_id;
4011 cluster_variant_mask >>= 56;
4012 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4013 }
4014
4015 {
4017 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4018 cluster_id_inner_mask &= introspection_source_index_id;
4019 cluster_id_inner_mask >>= 8;
4020 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4021 }
4022
4023 {
4025 let mut log_variant_mask = 0xFF;
4026 log_variant_mask &= introspection_source_index_id;
4027 assert_eq!(
4028 log_variant_mask,
4029 u64::from(timely_messages_received_log_variant)
4030 );
4031 }
4032
4033 let (catalog_item_id, global_id) =
4034 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4035
4036 assert_eq!(
4037 catalog_item_id,
4038 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4039 );
4040 assert_eq!(
4041 global_id,
4042 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4043 );
4044 }
4045}