1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34 CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35 RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51 AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52 ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53 ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54 Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55 DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56 IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57 ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58 ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59 SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60 StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61 SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66 EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67 SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
68 SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
69 USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
70 USER_ROLE_ID_ALLOC_KEY,
71};
72use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
73
/// Logical timestamp stamped onto in-transaction updates (e.g. buffered audit
/// log entries); see `Transaction::op_id`.
type Timestamp = u64;
75
/// An in-memory, uncommitted set of changes against the durable catalog
/// state.
///
/// Each catalog collection is wrapped in a [`TableTransaction`] that buffers
/// inserts, updates, and deletes until the transaction is committed back to
/// `durable_catalog`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    /// Handle to the underlying durable state; excluded from `Debug` output.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    /// Pending audit log insertions, each stamped with the `op_id` at which
    /// it was added.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// Upper frontier supplied by the caller of [`Transaction::new`] —
    /// presumably the durable catalog's upper at snapshot time; confirm at
    /// the call site.
    upper: mz_repr::Timestamp,
    /// Operation counter within this transaction; starts at 0 and is stamped
    /// onto every buffered table mutation.
    op_id: Timestamp,
}
115
116impl<'a> Transaction<'a> {
    /// Creates a new [`Transaction`] from the contents of a durable catalog
    /// `Snapshot`.
    ///
    /// Each snapshot collection is wrapped in a `TableTransaction`, with
    /// uniqueness constraints installed where names must not collide.
    /// Returns an error if any `TableTransaction` fails to initialize.
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            // Database names are globally unique.
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            // Schema names are unique within a database.
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            // Item names are unique within a schema, with one carve-out: two
            // same-named items conflict when both are non-types, or when one
            // is a type and the other's kind conflicts with types.
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            // Role names are globally unique.
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            // Cluster names are globally unique.
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            // Network policy names are globally unique.
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            // Replica names are unique within a cluster.
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }
199
200 pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
201 let key = ItemKey { id: *id };
202 self.items
203 .get(&key)
204 .map(|v| DurableType::from_key_value(key, v.clone()))
205 }
206
207 pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
208 self.items
209 .items()
210 .into_iter()
211 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
212 .sorted_by_key(|Item { id, .. }| *id)
213 }
214
215 pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
216 self.insert_audit_log_events([event]);
217 }
218
219 pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
220 let events = events
221 .into_iter()
222 .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
223 self.audit_log_updates.extend(events);
224 }
225
226 pub fn insert_user_database(
227 &mut self,
228 database_name: &str,
229 owner_id: RoleId,
230 privileges: Vec<MzAclItem>,
231 temporary_oids: &HashSet<u32>,
232 ) -> Result<(DatabaseId, u32), CatalogError> {
233 let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
234 let id = DatabaseId::User(id);
235 let oid = self.allocate_oid(temporary_oids)?;
236 self.insert_database(id, database_name, owner_id, privileges, oid)?;
237 Ok((id, oid))
238 }
239
240 pub(crate) fn insert_database(
241 &mut self,
242 id: DatabaseId,
243 database_name: &str,
244 owner_id: RoleId,
245 privileges: Vec<MzAclItem>,
246 oid: u32,
247 ) -> Result<u32, CatalogError> {
248 match self.databases.insert(
249 DatabaseKey { id },
250 DatabaseValue {
251 name: database_name.to_string(),
252 owner_id,
253 privileges,
254 oid,
255 },
256 self.op_id,
257 ) {
258 Ok(_) => Ok(oid),
259 Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
260 }
261 }
262
263 pub fn insert_user_schema(
264 &mut self,
265 database_id: DatabaseId,
266 schema_name: &str,
267 owner_id: RoleId,
268 privileges: Vec<MzAclItem>,
269 temporary_oids: &HashSet<u32>,
270 ) -> Result<(SchemaId, u32), CatalogError> {
271 let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
272 let id = SchemaId::User(id);
273 let oid = self.allocate_oid(temporary_oids)?;
274 self.insert_schema(
275 id,
276 Some(database_id),
277 schema_name.to_string(),
278 owner_id,
279 privileges,
280 oid,
281 )?;
282 Ok((id, oid))
283 }
284
285 pub fn insert_system_schema(
286 &mut self,
287 schema_id: u64,
288 schema_name: &str,
289 owner_id: RoleId,
290 privileges: Vec<MzAclItem>,
291 oid: u32,
292 ) -> Result<(), CatalogError> {
293 let id = SchemaId::System(schema_id);
294 self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
295 }
296
297 pub(crate) fn insert_schema(
298 &mut self,
299 schema_id: SchemaId,
300 database_id: Option<DatabaseId>,
301 schema_name: String,
302 owner_id: RoleId,
303 privileges: Vec<MzAclItem>,
304 oid: u32,
305 ) -> Result<(), CatalogError> {
306 match self.schemas.insert(
307 SchemaKey { id: schema_id },
308 SchemaValue {
309 database_id,
310 name: schema_name.clone(),
311 owner_id,
312 privileges,
313 oid,
314 },
315 self.op_id,
316 ) {
317 Ok(_) => Ok(()),
318 Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
319 }
320 }
321
322 pub fn insert_builtin_role(
323 &mut self,
324 id: RoleId,
325 name: String,
326 attributes: RoleAttributesRaw,
327 membership: RoleMembership,
328 vars: RoleVars,
329 oid: u32,
330 ) -> Result<RoleId, CatalogError> {
331 soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
332 self.insert_role(id, name, attributes, membership, vars, oid)?;
333 Ok(id)
334 }
335
336 pub fn insert_user_role(
337 &mut self,
338 name: String,
339 attributes: RoleAttributesRaw,
340 membership: RoleMembership,
341 vars: RoleVars,
342 temporary_oids: &HashSet<u32>,
343 ) -> Result<(RoleId, u32), CatalogError> {
344 let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
345 let id = RoleId::User(id);
346 let oid = self.allocate_oid(temporary_oids)?;
347 self.insert_role(id, name, attributes, membership, vars, oid)?;
348 Ok((id, oid))
349 }
350
    /// Inserts a role record and, when `attributes` carries a password, a
    /// matching `role_auth` entry holding its SCRAM-256 hash.
    ///
    /// Returns `RoleAlreadyExists` if either insert hits a key conflict or
    /// uniqueness violation.
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                // An iteration count is expected to accompany every password;
                // soft-panic (or log in release) if it is missing and fall
                // back to a default of 600,000 iterations.
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    // A role_auth entry already exists for this role ID.
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }
406
    /// Creates a new user cluster with a caller-provided ID; thin wrapper
    /// over [`Self::insert_cluster`].
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }
428
429 pub fn insert_system_cluster(
431 &mut self,
432 cluster_name: &str,
433 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
434 privileges: Vec<MzAclItem>,
435 owner_id: RoleId,
436 config: ClusterConfig,
437 temporary_oids: &HashSet<u32>,
438 ) -> Result<(), CatalogError> {
439 let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
440 let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441 self.insert_cluster(
442 cluster_id,
443 cluster_name,
444 introspection_source_indexes,
445 owner_id,
446 privileges,
447 config,
448 temporary_oids,
449 )
450 }
451
452 fn insert_cluster(
453 &mut self,
454 cluster_id: ClusterId,
455 cluster_name: &str,
456 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
457 owner_id: RoleId,
458 privileges: Vec<MzAclItem>,
459 config: ClusterConfig,
460 temporary_oids: &HashSet<u32>,
461 ) -> Result<(), CatalogError> {
462 if let Err(_) = self.clusters.insert(
463 ClusterKey { id: cluster_id },
464 ClusterValue {
465 name: cluster_name.to_string(),
466 owner_id,
467 privileges,
468 config,
469 },
470 self.op_id,
471 ) {
472 return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
473 };
474
475 let amount = usize_to_u64(introspection_source_indexes.len());
476 let oids = self.allocate_oids(amount, temporary_oids)?;
477 let introspection_source_indexes: Vec<_> = introspection_source_indexes
478 .into_iter()
479 .zip_eq(oids)
480 .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
481 .collect();
482 for (builtin, item_id, index_id, oid) in introspection_source_indexes {
483 let introspection_source_index = IntrospectionSourceIndex {
484 cluster_id,
485 name: builtin.name.to_string(),
486 item_id,
487 index_id,
488 oid,
489 };
490 let (key, value) = introspection_source_index.into_key_value();
491 self.introspection_sources
492 .insert(key, value, self.op_id)
493 .expect("no uniqueness violation");
494 }
495
496 Ok(())
497 }
498
499 pub fn rename_cluster(
500 &mut self,
501 cluster_id: ClusterId,
502 cluster_name: &str,
503 cluster_to_name: &str,
504 ) -> Result<(), CatalogError> {
505 let key = ClusterKey { id: cluster_id };
506
507 match self.clusters.update(
508 |k, v| {
509 if *k == key {
510 let mut value = v.clone();
511 value.name = cluster_to_name.to_string();
512 Some(value)
513 } else {
514 None
515 }
516 },
517 self.op_id,
518 )? {
519 Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
520 Diff::ONE => Ok(()),
521 n => panic!(
522 "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
523 ),
524 }
525 }
526
527 pub fn rename_cluster_replica(
528 &mut self,
529 replica_id: ReplicaId,
530 replica_name: &QualifiedReplica,
531 replica_to_name: &str,
532 ) -> Result<(), CatalogError> {
533 let key = ClusterReplicaKey { id: replica_id };
534
535 match self.cluster_replicas.update(
536 |k, v| {
537 if *k == key {
538 let mut value = v.clone();
539 value.name = replica_to_name.to_string();
540 Some(value)
541 } else {
542 None
543 }
544 },
545 self.op_id,
546 )? {
547 Diff::ZERO => {
548 Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
549 }
550 Diff::ONE => Ok(()),
551 n => panic!(
552 "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
553 ),
554 }
555 }
556
557 pub fn insert_cluster_replica(
558 &mut self,
559 cluster_id: ClusterId,
560 replica_name: &str,
561 config: ReplicaConfig,
562 owner_id: RoleId,
563 ) -> Result<ReplicaId, CatalogError> {
564 let replica_id = match cluster_id {
565 ClusterId::System(_) => self.allocate_system_replica_id()?,
566 ClusterId::User(_) => self.allocate_user_replica_id()?,
567 };
568 self.insert_cluster_replica_with_id(
569 cluster_id,
570 replica_id,
571 replica_name,
572 config,
573 owner_id,
574 )?;
575 Ok(replica_id)
576 }
577
578 pub(crate) fn insert_cluster_replica_with_id(
579 &mut self,
580 cluster_id: ClusterId,
581 replica_id: ReplicaId,
582 replica_name: &str,
583 config: ReplicaConfig,
584 owner_id: RoleId,
585 ) -> Result<(), CatalogError> {
586 if let Err(_) = self.cluster_replicas.insert(
587 ClusterReplicaKey { id: replica_id },
588 ClusterReplicaValue {
589 cluster_id,
590 name: replica_name.into(),
591 config,
592 owner_id,
593 },
594 self.op_id,
595 ) {
596 let cluster = self
597 .clusters
598 .get(&ClusterKey { id: cluster_id })
599 .expect("cluster exists");
600 return Err(SqlCatalogError::DuplicateReplica(
601 replica_name.to_string(),
602 cluster.name.to_string(),
603 )
604 .into());
605 };
606 Ok(())
607 }
608
609 pub fn insert_user_network_policy(
610 &mut self,
611 name: String,
612 rules: Vec<NetworkPolicyRule>,
613 privileges: Vec<MzAclItem>,
614 owner_id: RoleId,
615 temporary_oids: &HashSet<u32>,
616 ) -> Result<NetworkPolicyId, CatalogError> {
617 let oid = self.allocate_oid(temporary_oids)?;
618 let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
619 let id = NetworkPolicyId::User(id);
620 self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621 }
622
623 pub fn insert_network_policy(
624 &mut self,
625 id: NetworkPolicyId,
626 name: String,
627 rules: Vec<NetworkPolicyRule>,
628 privileges: Vec<MzAclItem>,
629 owner_id: RoleId,
630 oid: u32,
631 ) -> Result<NetworkPolicyId, CatalogError> {
632 match self.network_policies.insert(
633 NetworkPolicyKey { id },
634 NetworkPolicyValue {
635 name: name.clone(),
636 rules,
637 privileges,
638 owner_id,
639 oid,
640 },
641 self.op_id,
642 ) {
643 Ok(_) => Ok(id),
644 Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
645 }
646 }
647
648 pub fn update_introspection_source_index_gids(
653 &mut self,
654 mappings: impl Iterator<
655 Item = (
656 ClusterId,
657 impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
658 ),
659 >,
660 ) -> Result<(), CatalogError> {
661 for (cluster_id, updates) in mappings {
662 for (name, item_id, index_id, oid) in updates {
663 let introspection_source_index = IntrospectionSourceIndex {
664 cluster_id,
665 name,
666 item_id,
667 index_id,
668 oid,
669 };
670 let (key, value) = introspection_source_index.into_key_value();
671
672 let prev = self
673 .introspection_sources
674 .set(key, Some(value), self.op_id)?;
675 if prev.is_none() {
676 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
677 "{index_id}"
678 ))
679 .into());
680 }
681 }
682 }
683 Ok(())
684 }
685
686 pub fn insert_user_item(
687 &mut self,
688 id: CatalogItemId,
689 global_id: GlobalId,
690 schema_id: SchemaId,
691 item_name: &str,
692 create_sql: String,
693 owner_id: RoleId,
694 privileges: Vec<MzAclItem>,
695 temporary_oids: &HashSet<u32>,
696 versions: BTreeMap<RelationVersion, GlobalId>,
697 ) -> Result<u32, CatalogError> {
698 let oid = self.allocate_oid(temporary_oids)?;
699 self.insert_item(
700 id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
701 )?;
702 Ok(oid)
703 }
704
705 pub fn insert_item(
706 &mut self,
707 id: CatalogItemId,
708 oid: u32,
709 global_id: GlobalId,
710 schema_id: SchemaId,
711 item_name: &str,
712 create_sql: String,
713 owner_id: RoleId,
714 privileges: Vec<MzAclItem>,
715 extra_versions: BTreeMap<RelationVersion, GlobalId>,
716 ) -> Result<(), CatalogError> {
717 match self.items.insert(
718 ItemKey { id },
719 ItemValue {
720 schema_id,
721 name: item_name.to_string(),
722 create_sql,
723 owner_id,
724 privileges,
725 oid,
726 global_id,
727 extra_versions,
728 },
729 self.op_id,
730 ) {
731 Ok(_) => Ok(()),
732 Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
733 }
734 }
735
736 pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737 Ok(self.get_and_increment_id_by(key, 1)?.into_element())
738 }
739
    /// Allocates `amount` consecutive IDs from the allocator named `key`,
    /// returning them and advancing the stored `next_id` watermark.
    ///
    /// Panics if the named allocator does not exist, and asserts that system
    /// item IDs are only handed out during bootstrap. Returns `IdExhaustion`
    /// if the allocation would overflow `u64`.
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        // Read the current watermark; a missing allocator entry is treated
        // as a bug (panic).
        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        // Sanity-check that the watermark did not move between the read
        // above and this write.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }
772
773 pub fn allocate_system_item_ids(
774 &mut self,
775 amount: u64,
776 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
777 assert!(
778 !self.durable_catalog.is_bootstrap_complete(),
779 "we can only allocate system item IDs during bootstrap"
780 );
781 Ok(self
782 .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
783 .into_iter()
784 .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
786 .collect())
787 }
788
    /// Deterministically derives the `CatalogItemId`/`GlobalId` for an
    /// introspection source index from its cluster and log variant by
    /// packing them into a single `u64`:
    ///
    /// * bits 56..64: cluster variant (1 = system, 2 = user)
    /// * bits 8..56:  the cluster's inner ID (asserted to fit in 48 bits)
    /// * bits 0..8:   the log variant code from the match below
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The top 16 bits of the inner cluster ID must be zero so it fits in
        // the 48 bits reserved for it.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // Stable numeric code per log variant. These values are part of the
        // derived IDs, so changing them would change existing IDs.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ShutdownDuration) => 27,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        // Assemble the packed ID described in the doc comment.
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
879
880 pub fn allocate_user_item_ids(
881 &mut self,
882 amount: u64,
883 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
884 Ok(self
885 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
886 .into_iter()
887 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
889 .collect())
890 }
891
892 pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
893 let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
894 Ok(ReplicaId::User(id))
895 }
896
897 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
898 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
899 Ok(ReplicaId::System(id))
900 }
901
    /// Allocates and returns the next audit log ID.
    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
    }
905
    /// Allocates and returns the next storage usage ID.
    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
    }
909
    /// Allocates `amount` OIDs from the user OID space, skipping any OID
    /// already held by a database, schema, role, item, or introspection
    /// source index in this transaction, as well as those in
    /// `temporary_oids`.
    ///
    /// The scan wraps around within the user OID range; if a full cycle back
    /// to the starting OID cannot satisfy the request, `OidExhaustion` is
    /// returned. The `next_id` watermark of the OID allocator is advanced to
    /// just past the last OID examined.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        // Newtype enforcing that an OID lies at or above FIRST_USER_OID.
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            // Wrapping increment: on u32 overflow, restart at the bottom of
            // the user OID range instead of at 0.
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        // More OIDs than fit in u32 can never be satisfied.
        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // Gather every OID currently in use so the scan below can skip them.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        // Start the scan at the persisted watermark.
        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            // Wrapped all the way back to where we started without
            // collecting enough free OIDs: the space is exhausted.
            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        // Persist the new watermark and double-check nothing else moved it.
        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1018
1019 pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1022 self.allocate_oids(1, temporary_oids)
1023 .map(|oids| oids.into_element())
1024 }
1025
1026 pub(crate) fn insert_id_allocator(
1027 &mut self,
1028 name: String,
1029 next_id: u64,
1030 ) -> Result<(), CatalogError> {
1031 match self.id_allocator.insert(
1032 IdAllocKey { name: name.clone() },
1033 IdAllocValue { next_id },
1034 self.op_id,
1035 ) {
1036 Ok(_) => Ok(()),
1037 Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1038 }
1039 }
1040
1041 pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1048 let prev = self
1049 .databases
1050 .set(DatabaseKey { id: *id }, None, self.op_id)?;
1051 if prev.is_some() {
1052 Ok(())
1053 } else {
1054 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1055 }
1056 }
1057
1058 pub fn remove_databases(
1065 &mut self,
1066 databases: &BTreeSet<DatabaseId>,
1067 ) -> Result<(), CatalogError> {
1068 if databases.is_empty() {
1069 return Ok(());
1070 }
1071
1072 let to_remove = databases
1073 .iter()
1074 .map(|id| (DatabaseKey { id: *id }, None))
1075 .collect();
1076 let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1077 prev.retain(|_k, val| val.is_none());
1078
1079 if !prev.is_empty() {
1080 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1081 return Err(SqlCatalogError::UnknownDatabase(err).into());
1082 }
1083
1084 Ok(())
1085 }
1086
1087 pub fn remove_schema(
1094 &mut self,
1095 database_id: &Option<DatabaseId>,
1096 schema_id: &SchemaId,
1097 ) -> Result<(), CatalogError> {
1098 let prev = self
1099 .schemas
1100 .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1101 if prev.is_some() {
1102 Ok(())
1103 } else {
1104 let database_name = match database_id {
1105 Some(id) => format!("{id}."),
1106 None => "".to_string(),
1107 };
1108 Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1109 }
1110 }
1111
1112 pub fn remove_schemas(
1119 &mut self,
1120 schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1121 ) -> Result<(), CatalogError> {
1122 if schemas.is_empty() {
1123 return Ok(());
1124 }
1125
1126 let to_remove = schemas
1127 .iter()
1128 .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1129 .collect();
1130 let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1131 prev.retain(|_k, v| v.is_none());
1132
1133 if !prev.is_empty() {
1134 let err = prev
1135 .keys()
1136 .map(|k| {
1137 let db_spec = schemas.get(&k.id).expect("should_exist");
1138 let db_name = match db_spec {
1139 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1140 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1141 };
1142 format!("{}.{}", db_name, k.id)
1143 })
1144 .join(", ");
1145
1146 return Err(SqlCatalogError::UnknownSchema(err).into());
1147 }
1148
1149 Ok(())
1150 }
1151
1152 pub fn remove_source_references(
1153 &mut self,
1154 source_id: CatalogItemId,
1155 ) -> Result<(), CatalogError> {
1156 let deleted = self
1157 .source_references
1158 .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1159 .is_some();
1160 if deleted {
1161 Ok(())
1162 } else {
1163 Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1164 }
1165 }
1166
1167 pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1174 assert!(
1175 roles.iter().all(|id| id.is_user()),
1176 "cannot delete non-user roles"
1177 );
1178 self.remove_roles(roles)
1179 }
1180
    /// Removes all roles in `roles` from the transaction, along with their
    /// corresponding authentication entries.
    ///
    /// Returns an error listing every role that did not exist.
    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        // Deletion is expressed as setting each key to `None`.
        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        // Also delete the matching auth entries for the same role ids.
        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        // A `None` previous value means the role never existed; report those.
        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        // NOTE(review): missing auth entries are not treated as an error —
        // presumably not every role has an auth entry. The retained map is
        // otherwise unused; confirm whether a check was intended here.
        role_auth_prev.retain(|_k, v| v.is_none());
        Ok(())
    }
1230
1231 pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1238 if clusters.is_empty() {
1239 return Ok(());
1240 }
1241
1242 let to_remove = clusters
1243 .iter()
1244 .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1245 .collect();
1246 let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1247
1248 prev.retain(|_k, v| v.is_none());
1249 if !prev.is_empty() {
1250 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1251 return Err(SqlCatalogError::UnknownCluster(err).into());
1252 }
1253
1254 self.cluster_replicas
1260 .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1261 self.introspection_sources
1262 .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1263
1264 Ok(())
1265 }
1266
1267 pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1274 let deleted = self
1275 .cluster_replicas
1276 .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1277 .is_some();
1278 if deleted {
1279 Ok(())
1280 } else {
1281 Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1282 }
1283 }
1284
1285 pub fn remove_cluster_replicas(
1292 &mut self,
1293 replicas: &BTreeSet<ReplicaId>,
1294 ) -> Result<(), CatalogError> {
1295 if replicas.is_empty() {
1296 return Ok(());
1297 }
1298
1299 let to_remove = replicas
1300 .iter()
1301 .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1302 .collect();
1303 let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1304
1305 prev.retain(|_k, v| v.is_none());
1306 if !prev.is_empty() {
1307 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1308 return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1309 }
1310
1311 Ok(())
1312 }
1313
1314 pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1321 let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1322 if prev.is_some() {
1323 Ok(())
1324 } else {
1325 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1326 }
1327 }
1328
1329 pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1336 if ids.is_empty() {
1337 return Ok(());
1338 }
1339
1340 let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1341 let n = self.items.delete_by_keys(ks, self.op_id).len();
1342 if n == ids.len() {
1343 Ok(())
1344 } else {
1345 let item_ids = self.items.items().keys().map(|k| k.id).collect();
1346 let mut unknown = ids.difference(&item_ids);
1347 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1348 }
1349 }
1350
1351 pub fn remove_system_object_mappings(
1358 &mut self,
1359 descriptions: BTreeSet<SystemObjectDescription>,
1360 ) -> Result<(), CatalogError> {
1361 if descriptions.is_empty() {
1362 return Ok(());
1363 }
1364
1365 let ks: Vec<_> = descriptions
1366 .clone()
1367 .into_iter()
1368 .map(|desc| GidMappingKey {
1369 schema_name: desc.schema_name,
1370 object_type: desc.object_type,
1371 object_name: desc.object_name,
1372 })
1373 .collect();
1374 let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1375
1376 if n == descriptions.len() {
1377 Ok(())
1378 } else {
1379 let item_descriptions = self
1380 .system_gid_mapping
1381 .items()
1382 .keys()
1383 .map(|k| SystemObjectDescription {
1384 schema_name: k.schema_name.clone(),
1385 object_type: k.object_type.clone(),
1386 object_name: k.object_name.clone(),
1387 })
1388 .collect();
1389 let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1390 format!(
1391 "{} {}.{}",
1392 desc.object_type, desc.schema_name, desc.object_name
1393 )
1394 });
1395 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1396 }
1397 }
1398
1399 pub fn remove_introspection_source_indexes(
1406 &mut self,
1407 introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1408 ) -> Result<(), CatalogError> {
1409 if introspection_source_indexes.is_empty() {
1410 return Ok(());
1411 }
1412
1413 let ks: Vec<_> = introspection_source_indexes
1414 .clone()
1415 .into_iter()
1416 .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1417 .collect();
1418 let n = self
1419 .introspection_sources
1420 .delete_by_keys(ks, self.op_id)
1421 .len();
1422 if n == introspection_source_indexes.len() {
1423 Ok(())
1424 } else {
1425 let txn_indexes = self
1426 .introspection_sources
1427 .items()
1428 .keys()
1429 .map(|k| (k.cluster_id, k.name.clone()))
1430 .collect();
1431 let mut unknown = introspection_source_indexes
1432 .difference(&txn_indexes)
1433 .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1434 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1435 }
1436 }
1437
1438 pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1445 let updated =
1446 self.items
1447 .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1448 if updated {
1449 Ok(())
1450 } else {
1451 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1452 }
1453 }
1454
1455 pub fn update_items(
1463 &mut self,
1464 items: BTreeMap<CatalogItemId, Item>,
1465 ) -> Result<(), CatalogError> {
1466 if items.is_empty() {
1467 return Ok(());
1468 }
1469
1470 let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1471 let kvs: Vec<_> = items
1472 .clone()
1473 .into_iter()
1474 .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1475 .collect();
1476 let n = self.items.update_by_keys(kvs, self.op_id)?;
1477 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1478 if n == update_ids.len() {
1479 Ok(())
1480 } else {
1481 let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1482 let mut unknown = update_ids.difference(&item_ids);
1483 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1484 }
1485 }
1486
    /// Replaces the role identified by `id` and applies `password` to its
    /// authentication entry.
    ///
    /// Returns [`SqlCatalogError::UnknownRole`] if the role does not exist.
    pub fn update_role(
        &mut self,
        id: RoleId,
        role: Role,
        password: PasswordAction,
    ) -> Result<(), CatalogError> {
        let key = RoleKey { id };
        if self.roles.get(&key).is_some() {
            let auth_key = RoleAuthKey { role_id: id };

            match password {
                PasswordAction::Set(new_password) => {
                    // Hash the new password before it is persisted.
                    // NOTE(review): `expect` assumes hashing cannot fail for
                    // any password/iteration combination — confirm.
                    let hash = mz_auth::hash::scram256_hash(
                        &new_password.password,
                        &new_password.scram_iterations,
                    )
                    .expect("password hash should be valid");
                    let value = RoleAuthValue {
                        password_hash: Some(hash),
                        updated_at: SYSTEM_TIME(),
                    };

                    // Upsert: update the auth entry if one exists, otherwise
                    // create it.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    } else {
                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::Clear => {
                    let value = RoleAuthValue {
                        password_hash: None,
                        updated_at: SYSTEM_TIME(),
                    };
                    // Only touch the auth entry if one already exists; clearing
                    // a role that never had a password is a no-op.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::NoChange => {}
            }

            // Finally replace the role value itself.
            self.roles
                .update_by_key(key, role.into_key_value().1, self.op_id)?;

            Ok(())
        } else {
            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
        }
    }
1544
1545 pub fn update_roles_without_auth(
1556 &mut self,
1557 roles: BTreeMap<RoleId, Role>,
1558 ) -> Result<(), CatalogError> {
1559 if roles.is_empty() {
1560 return Ok(());
1561 }
1562
1563 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1564 let kvs: Vec<_> = roles
1565 .into_iter()
1566 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1567 .collect();
1568 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1569 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1570
1571 if n == update_role_ids.len() {
1572 Ok(())
1573 } else {
1574 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1575 let mut unknown = update_role_ids.difference(&role_ids);
1576 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1577 }
1578 }
1579
1580 pub fn update_system_object_mappings(
1585 &mut self,
1586 mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1587 ) -> Result<(), CatalogError> {
1588 if mappings.is_empty() {
1589 return Ok(());
1590 }
1591
1592 let n = self.system_gid_mapping.update(
1593 |_k, v| {
1594 if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1595 let (_, new_value) = mapping.clone().into_key_value();
1596 Some(new_value)
1597 } else {
1598 None
1599 }
1600 },
1601 self.op_id,
1602 )?;
1603
1604 if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1605 != mappings.len()
1606 {
1607 let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1608 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1609 }
1610
1611 Ok(())
1612 }
1613
1614 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1621 let updated = self.clusters.update_by_key(
1622 ClusterKey { id },
1623 cluster.into_key_value().1,
1624 self.op_id,
1625 )?;
1626 if updated {
1627 Ok(())
1628 } else {
1629 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1630 }
1631 }
1632
1633 pub fn update_cluster_replica(
1640 &mut self,
1641 replica_id: ReplicaId,
1642 replica: ClusterReplica,
1643 ) -> Result<(), CatalogError> {
1644 let updated = self.cluster_replicas.update_by_key(
1645 ClusterReplicaKey { id: replica_id },
1646 replica.into_key_value().1,
1647 self.op_id,
1648 )?;
1649 if updated {
1650 Ok(())
1651 } else {
1652 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1653 }
1654 }
1655
1656 pub fn update_database(
1663 &mut self,
1664 id: DatabaseId,
1665 database: Database,
1666 ) -> Result<(), CatalogError> {
1667 let updated = self.databases.update_by_key(
1668 DatabaseKey { id },
1669 database.into_key_value().1,
1670 self.op_id,
1671 )?;
1672 if updated {
1673 Ok(())
1674 } else {
1675 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1676 }
1677 }
1678
1679 pub fn update_schema(
1686 &mut self,
1687 schema_id: SchemaId,
1688 schema: Schema,
1689 ) -> Result<(), CatalogError> {
1690 let updated = self.schemas.update_by_key(
1691 SchemaKey { id: schema_id },
1692 schema.into_key_value().1,
1693 self.op_id,
1694 )?;
1695 if updated {
1696 Ok(())
1697 } else {
1698 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1699 }
1700 }
1701
1702 pub fn update_network_policy(
1709 &mut self,
1710 id: NetworkPolicyId,
1711 network_policy: NetworkPolicy,
1712 ) -> Result<(), CatalogError> {
1713 let updated = self.network_policies.update_by_key(
1714 NetworkPolicyKey { id },
1715 network_policy.into_key_value().1,
1716 self.op_id,
1717 )?;
1718 if updated {
1719 Ok(())
1720 } else {
1721 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1722 }
1723 }
1724 pub fn remove_network_policies(
1731 &mut self,
1732 network_policies: &BTreeSet<NetworkPolicyId>,
1733 ) -> Result<(), CatalogError> {
1734 if network_policies.is_empty() {
1735 return Ok(());
1736 }
1737
1738 let to_remove = network_policies
1739 .iter()
1740 .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1741 .collect();
1742 let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1743 assert!(
1744 prev.iter().all(|(k, _)| k.id.is_user()),
1745 "cannot delete non-user network policy"
1746 );
1747
1748 prev.retain(|_k, v| v.is_none());
1749 if !prev.is_empty() {
1750 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1751 return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1752 }
1753
1754 Ok(())
1755 }
1756 pub fn set_default_privilege(
1760 &mut self,
1761 role_id: RoleId,
1762 database_id: Option<DatabaseId>,
1763 schema_id: Option<SchemaId>,
1764 object_type: ObjectType,
1765 grantee: RoleId,
1766 privileges: Option<AclMode>,
1767 ) -> Result<(), CatalogError> {
1768 self.default_privileges.set(
1769 DefaultPrivilegesKey {
1770 role_id,
1771 database_id,
1772 schema_id,
1773 object_type,
1774 grantee,
1775 },
1776 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1777 self.op_id,
1778 )?;
1779 Ok(())
1780 }
1781
1782 pub fn set_default_privileges(
1784 &mut self,
1785 default_privileges: Vec<DefaultPrivilege>,
1786 ) -> Result<(), CatalogError> {
1787 if default_privileges.is_empty() {
1788 return Ok(());
1789 }
1790
1791 let default_privileges = default_privileges
1792 .into_iter()
1793 .map(DurableType::into_key_value)
1794 .map(|(k, v)| (k, Some(v)))
1795 .collect();
1796 self.default_privileges
1797 .set_many(default_privileges, self.op_id)?;
1798 Ok(())
1799 }
1800
1801 pub fn set_system_privilege(
1805 &mut self,
1806 grantee: RoleId,
1807 grantor: RoleId,
1808 acl_mode: Option<AclMode>,
1809 ) -> Result<(), CatalogError> {
1810 self.system_privileges.set(
1811 SystemPrivilegesKey { grantee, grantor },
1812 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1813 self.op_id,
1814 )?;
1815 Ok(())
1816 }
1817
1818 pub fn set_system_privileges(
1820 &mut self,
1821 system_privileges: Vec<MzAclItem>,
1822 ) -> Result<(), CatalogError> {
1823 if system_privileges.is_empty() {
1824 return Ok(());
1825 }
1826
1827 let system_privileges = system_privileges
1828 .into_iter()
1829 .map(DurableType::into_key_value)
1830 .map(|(k, v)| (k, Some(v)))
1831 .collect();
1832 self.system_privileges
1833 .set_many(system_privileges, self.op_id)?;
1834 Ok(())
1835 }
1836
1837 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1839 self.settings.set(
1840 SettingKey { name },
1841 value.map(|value| SettingValue { value }),
1842 self.op_id,
1843 )?;
1844 Ok(())
1845 }
1846
1847 pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1848 self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1849 }
1850
1851 pub fn insert_introspection_source_indexes(
1853 &mut self,
1854 introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1855 temporary_oids: &HashSet<u32>,
1856 ) -> Result<(), CatalogError> {
1857 if introspection_source_indexes.is_empty() {
1858 return Ok(());
1859 }
1860
1861 let amount = usize_to_u64(introspection_source_indexes.len());
1862 let oids = self.allocate_oids(amount, temporary_oids)?;
1863 let introspection_source_indexes: Vec<_> = introspection_source_indexes
1864 .into_iter()
1865 .zip_eq(oids)
1866 .map(
1867 |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1868 cluster_id,
1869 name,
1870 item_id,
1871 index_id,
1872 oid,
1873 },
1874 )
1875 .collect();
1876
1877 for introspection_source_index in introspection_source_indexes {
1878 let (key, value) = introspection_source_index.into_key_value();
1879 self.introspection_sources.insert(key, value, self.op_id)?;
1880 }
1881
1882 Ok(())
1883 }
1884
1885 pub fn set_system_object_mappings(
1887 &mut self,
1888 mappings: Vec<SystemObjectMapping>,
1889 ) -> Result<(), CatalogError> {
1890 if mappings.is_empty() {
1891 return Ok(());
1892 }
1893
1894 let mappings = mappings
1895 .into_iter()
1896 .map(DurableType::into_key_value)
1897 .map(|(k, v)| (k, Some(v)))
1898 .collect();
1899 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1900 Ok(())
1901 }
1902
1903 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1905 if replicas.is_empty() {
1906 return Ok(());
1907 }
1908
1909 let replicas = replicas
1910 .into_iter()
1911 .map(DurableType::into_key_value)
1912 .map(|(k, v)| (k, Some(v)))
1913 .collect();
1914 self.cluster_replicas.set_many(replicas, self.op_id)?;
1915 Ok(())
1916 }
1917
1918 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1920 match value {
1921 Some(value) => {
1922 let config = Config { key, value };
1923 let (key, value) = config.into_key_value();
1924 self.configs.set(key, Some(value), self.op_id)?;
1925 }
1926 None => {
1927 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1928 }
1929 }
1930 Ok(())
1931 }
1932
1933 pub fn get_config(&self, key: String) -> Option<u64> {
1935 self.configs
1936 .get(&ConfigKey { key })
1937 .map(|entry| entry.value)
1938 }
1939
1940 pub fn get_setting(&self, name: String) -> Option<&str> {
1942 self.settings
1943 .get(&SettingKey { name })
1944 .map(|entry| &*entry.value)
1945 }
1946
1947 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1948 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1949 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1950 }
1951
1952 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1953 self.set_setting(
1954 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1955 Some(shard_id.to_string()),
1956 )
1957 }
1958
1959 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1960 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1961 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1962 }
1963
1964 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1965 self.set_setting(
1966 EXPRESSION_CACHE_SHARD_KEY.to_string(),
1967 Some(shard_id.to_string()),
1968 )
1969 }
1970
1971 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1977 self.set_config(
1978 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1979 Some(
1980 value
1981 .as_millis()
1982 .try_into()
1983 .expect("max wait fits into u64"),
1984 ),
1985 )
1986 }
1987
1988 pub fn set_0dt_deployment_ddl_check_interval(
1995 &mut self,
1996 value: Duration,
1997 ) -> Result<(), CatalogError> {
1998 self.set_config(
1999 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2000 Some(
2001 value
2002 .as_millis()
2003 .try_into()
2004 .expect("ddl check interval fits into u64"),
2005 ),
2006 )
2007 }
2008
2009 pub fn set_enable_0dt_deployment_panic_after_timeout(
2015 &mut self,
2016 value: bool,
2017 ) -> Result<(), CatalogError> {
2018 self.set_config(
2019 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2020 Some(u64::from(value)),
2021 )
2022 }
2023
2024 pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2030 self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2031 }
2032
2033 pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2040 self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2041 }
2042
2043 pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2050 self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2051 }
2052
2053 pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2055 self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2056 }
2057
2058 pub fn update_comment(
2059 &mut self,
2060 object_id: CommentObjectId,
2061 sub_component: Option<usize>,
2062 comment: Option<String>,
2063 ) -> Result<(), CatalogError> {
2064 let key = CommentKey {
2065 object_id,
2066 sub_component,
2067 };
2068 let value = comment.map(|c| CommentValue { comment: c });
2069 self.comments.set(key, value, self.op_id)?;
2070
2071 Ok(())
2072 }
2073
2074 pub fn drop_comments(
2075 &mut self,
2076 object_ids: &BTreeSet<CommentObjectId>,
2077 ) -> Result<(), CatalogError> {
2078 if object_ids.is_empty() {
2079 return Ok(());
2080 }
2081
2082 self.comments
2083 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2084 Ok(())
2085 }
2086
2087 pub fn update_source_references(
2088 &mut self,
2089 source_id: CatalogItemId,
2090 references: Vec<SourceReference>,
2091 updated_at: u64,
2092 ) -> Result<(), CatalogError> {
2093 let key = SourceReferencesKey { source_id };
2094 let value = SourceReferencesValue {
2095 references,
2096 updated_at,
2097 };
2098 self.source_references.set(key, Some(value), self.op_id)?;
2099 Ok(())
2100 }
2101
2102 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2104 let key = ServerConfigurationKey {
2105 name: name.to_string(),
2106 };
2107 let value = ServerConfigurationValue { value };
2108 self.system_configurations
2109 .set(key, Some(value), self.op_id)?;
2110 Ok(())
2111 }
2112
2113 pub fn remove_system_config(&mut self, name: &str) {
2115 let key = ServerConfigurationKey {
2116 name: name.to_string(),
2117 };
2118 self.system_configurations
2119 .set(key, None, self.op_id)
2120 .expect("cannot have uniqueness violation");
2121 }
2122
2123 pub fn clear_system_configs(&mut self) {
2125 self.system_configurations.delete(|_k, _v| true, self.op_id);
2126 }
2127
2128 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2129 match self.configs.insert(
2130 ConfigKey { key: key.clone() },
2131 ConfigValue { value },
2132 self.op_id,
2133 ) {
2134 Ok(_) => Ok(()),
2135 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2136 }
2137 }
2138
2139 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2140 self.clusters
2141 .items()
2142 .into_iter()
2143 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2144 }
2145
2146 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2147 self.cluster_replicas
2148 .items()
2149 .into_iter()
2150 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2151 }
2152
2153 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2154 self.roles
2155 .items()
2156 .into_iter()
2157 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2158 }
2159
2160 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2161 self.network_policies
2162 .items()
2163 .into_iter()
2164 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2165 }
2166
2167 pub fn get_system_object_mappings(
2168 &self,
2169 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2170 self.system_gid_mapping
2171 .items()
2172 .into_iter()
2173 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2174 }
2175
2176 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2177 self.schemas
2178 .items()
2179 .into_iter()
2180 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2181 }
2182
2183 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2184 self.system_configurations
2185 .items()
2186 .into_iter()
2187 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2188 }
2189
2190 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2191 let key = SchemaKey { id: *id };
2192 self.schemas
2193 .get(&key)
2194 .map(|v| DurableType::from_key_value(key, v.clone()))
2195 }
2196
2197 pub fn get_introspection_source_indexes(
2198 &self,
2199 cluster_id: ClusterId,
2200 ) -> BTreeMap<&str, (GlobalId, u32)> {
2201 self.introspection_sources
2202 .items()
2203 .into_iter()
2204 .filter(|(k, _v)| k.cluster_id == cluster_id)
2205 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2206 .collect()
2207 }
2208
2209 pub fn get_catalog_content_version(&self) -> Option<&str> {
2210 self.settings
2211 .get(&SettingKey {
2212 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2213 })
2214 .map(|value| &*value.value)
2215 }
2216
2217 pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2218 self.settings
2219 .get(&SettingKey {
2220 name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2221 })
2222 .map(|value| value.value.clone())
2223 }
2224
    /// Returns the state updates produced by the current op and advances the
    /// op counter so that subsequent mutations are grouped into a fresh op.
    #[must_use]
    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
        // Order matters: capture this op's updates before bumping `op_id`.
        let updates = self.get_op_updates();
        self.commit_op();
        updates
    }
2236
    /// Collects all pending changes made by the current op (`self.op_id`)
    /// across every collection and converts them into [`StateUpdate`]s, all
    /// stamped with the transaction's `upper`.
    fn get_op_updates(&self) -> Vec<StateUpdate> {
        // Extracts the updates of the current op from a regular table
        // transaction: only pending values whose timestamp equals `op` are
        // converted into `(kind, diff)` pairs.
        fn get_collection_op_updates<'a, T>(
            table_txn: &'a TableTransaction<T::Key, T::Value>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T::Value: Ord + Clone + Debug,
            T: DurableType,
        {
            table_txn
                .pending
                .iter()
                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
                .filter_map(move |(k, v)| {
                    if v.ts == op {
                        let key = k.clone();
                        let value = v.value.clone();
                        let diff = v.diff.clone().try_into().expect("invalid diff");
                        let update = DurableType::from_key_value(key, value);
                        let kind = kind_fn(update);
                        Some((kind, diff))
                    } else {
                        None
                    }
                })
        }

        // Same idea, but for collections stored as a flat list of
        // `(key, diff, timestamp)` entries whose value type is `()`.
        fn get_large_collection_op_updates<'a, T>(
            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T: DurableType<Value = ()>,
        {
            collection.iter().filter_map(move |(k, diff, ts)| {
                if *ts == op {
                    let key = k.clone();
                    let diff = diff.clone().try_into().expect("invalid diff");
                    let update = DurableType::from_key_value(key, ());
                    let kind = kind_fn(update);
                    Some((kind, diff))
                } else {
                    None
                }
            })
        }

        // Exhaustively destructure `self` so adding a new collection to
        // `Transaction` forces this method to be revisited. Fields bound to
        // `_` are intentionally excluded from the emitted updates.
        let Transaction {
            durable_catalog: _,
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            source_references,
            system_privileges,
            audit_log_updates,
            storage_collection_metadata,
            unfinalized_shards,
            id_allocator: _,
            configs: _,
            settings: _,
            txn_wal_shard: _,
            upper,
            op_id: _,
        } = &self;

        // Chain every collection's updates lazily, then stamp each with the
        // transaction upper and collect once at the end.
        let updates = std::iter::empty()
            .chain(get_collection_op_updates(
                roles,
                StateUpdateKind::Role,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                role_auth,
                StateUpdateKind::RoleAuth,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                databases,
                StateUpdateKind::Database,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                schemas,
                StateUpdateKind::Schema,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                default_privileges,
                StateUpdateKind::DefaultPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_privileges,
                StateUpdateKind::SystemPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_configurations,
                StateUpdateKind::SystemConfiguration,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                clusters,
                StateUpdateKind::Cluster,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                network_policies,
                StateUpdateKind::NetworkPolicy,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                introspection_sources,
                StateUpdateKind::IntrospectionSourceIndex,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                cluster_replicas,
                StateUpdateKind::ClusterReplica,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_gid_mapping,
                StateUpdateKind::SystemObjectMapping,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                items,
                StateUpdateKind::Item,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                comments,
                StateUpdateKind::Comment,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                source_references,
                StateUpdateKind::SourceReferences,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                storage_collection_metadata,
                StateUpdateKind::StorageCollectionMetadata,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                unfinalized_shards,
                StateUpdateKind::UnfinalizedShard,
                self.op_id,
            ))
            .chain(get_large_collection_op_updates(
                audit_log_updates,
                StateUpdateKind::AuditLog,
                self.op_id,
            ))
            .map(|(kind, diff)| StateUpdate {
                kind,
                ts: upper.clone(),
                diff,
            })
            .collect();

        updates
    }
2417
    /// Reports whether the underlying durable catalog state is a savepoint.
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2421
    /// Advances the op counter; later mutations are attributed to a new op.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2425
    /// Returns the current op id of this transaction.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2429
    /// Returns the upper (commit timestamp bound) of this transaction.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2433
    /// Consumes the transaction, splitting it into the batch of pending
    /// changes and the underlying durable catalog state handle.
    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
        // Audit log entries are converted to their proto form; the per-op
        // timestamp is dropped here, keeping only the key and diff.
        let audit_log_updates = self
            .audit_log_updates
            .into_iter()
            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
            .collect();

        // Drain the pending changes of every collection into the batch.
        let txn_batch = TransactionBatch {
            databases: self.databases.pending(),
            schemas: self.schemas.pending(),
            items: self.items.pending(),
            comments: self.comments.pending(),
            roles: self.roles.pending(),
            role_auth: self.role_auth.pending(),
            clusters: self.clusters.pending(),
            cluster_replicas: self.cluster_replicas.pending(),
            network_policies: self.network_policies.pending(),
            introspection_sources: self.introspection_sources.pending(),
            id_allocator: self.id_allocator.pending(),
            configs: self.configs.pending(),
            source_references: self.source_references.pending(),
            settings: self.settings.pending(),
            system_gid_mapping: self.system_gid_mapping.pending(),
            system_configurations: self.system_configurations.pending(),
            default_privileges: self.default_privileges.pending(),
            system_privileges: self.system_privileges.pending(),
            storage_collection_metadata: self.storage_collection_metadata.pending(),
            unfinalized_shards: self.unfinalized_shards.pending(),
            txn_wal_shard: self.txn_wal_shard.pending(),
            audit_log_updates,
            upper: self.upper,
        };
        (txn_batch, self.durable_catalog)
    }
2468
    /// Consolidates all pending updates and commits them to the durable
    /// catalog at `commit_ts`, returning the durable catalog state and the
    /// new upper.
    ///
    /// Panics if `commit_ts` is less than the transaction's upper.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn commit_internal(
        self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
        let (mut txn_batch, durable_catalog) = self.into_parts();
        // Destructure so that adding a new collection without consolidating
        // it below is a compile error.
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            source_references,
            settings,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper,
        } = &mut txn_batch;
        // Consolidate in place so cancelling insert/retract pairs are not
        // written to the durable catalog.
        differential_dataflow::consolidation::consolidate_updates(databases);
        differential_dataflow::consolidation::consolidate_updates(schemas);
        differential_dataflow::consolidation::consolidate_updates(items);
        differential_dataflow::consolidation::consolidate_updates(comments);
        differential_dataflow::consolidation::consolidate_updates(roles);
        differential_dataflow::consolidation::consolidate_updates(role_auth);
        differential_dataflow::consolidation::consolidate_updates(clusters);
        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
        differential_dataflow::consolidation::consolidate_updates(network_policies);
        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
        differential_dataflow::consolidation::consolidate_updates(id_allocator);
        differential_dataflow::consolidation::consolidate_updates(configs);
        differential_dataflow::consolidation::consolidate_updates(settings);
        differential_dataflow::consolidation::consolidate_updates(source_references);
        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
        differential_dataflow::consolidation::consolidate_updates(system_configurations);
        differential_dataflow::consolidation::consolidate_updates(default_privileges);
        differential_dataflow::consolidation::consolidate_updates(system_privileges);
        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);

        assert!(
            commit_ts >= *upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            upper
        );
        let upper = durable_catalog
            .commit_transaction(txn_batch, commit_ts)
            .await?;
        Ok((durable_catalog, upper))
    }
2547
    /// Commits the transaction to the durable catalog at `commit_ts` and
    /// syncs the durable catalog up to the resulting upper, discarding the
    /// synced updates.
    ///
    /// Panics if any in-transaction op updates were left unconsumed.
    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        let updates = durable_storage.sync_updates(upper).await?;
        // Outside of read-only mode, every update seen by this sync should
        // stem from this very commit.
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2585}
2586
2587use crate::durable::async_trait;
2588
2589use super::objects::{RoleAuthKey, RoleAuthValue};
2590
2591#[async_trait]
2592impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2593 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2594 self.storage_collection_metadata
2595 .items()
2596 .into_iter()
2597 .map(
2598 |(
2599 StorageCollectionMetadataKey { id },
2600 StorageCollectionMetadataValue { shard },
2601 )| { (*id, shard.clone()) },
2602 )
2603 .collect()
2604 }
2605
2606 fn insert_collection_metadata(
2607 &mut self,
2608 metadata: BTreeMap<GlobalId, ShardId>,
2609 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2610 for (id, shard) in metadata {
2611 self.storage_collection_metadata
2612 .insert(
2613 StorageCollectionMetadataKey { id },
2614 StorageCollectionMetadataValue {
2615 shard: shard.clone(),
2616 },
2617 self.op_id,
2618 )
2619 .map_err(|err| match err {
2620 DurableCatalogError::DuplicateKey => {
2621 StorageError::CollectionMetadataAlreadyExists(id)
2622 }
2623 DurableCatalogError::UniquenessViolation => {
2624 StorageError::PersistShardAlreadyInUse(shard)
2625 }
2626 err => StorageError::Generic(anyhow::anyhow!(err)),
2627 })?;
2628 }
2629 Ok(())
2630 }
2631
2632 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2633 let ks: Vec<_> = ids
2634 .into_iter()
2635 .map(|id| StorageCollectionMetadataKey { id })
2636 .collect();
2637 self.storage_collection_metadata
2638 .delete_by_keys(ks, self.op_id)
2639 .into_iter()
2640 .map(
2641 |(
2642 StorageCollectionMetadataKey { id },
2643 StorageCollectionMetadataValue { shard },
2644 )| (id, shard),
2645 )
2646 .collect()
2647 }
2648
2649 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2650 self.unfinalized_shards
2651 .items()
2652 .into_iter()
2653 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2654 .collect()
2655 }
2656
2657 fn insert_unfinalized_shards(
2658 &mut self,
2659 s: BTreeSet<ShardId>,
2660 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2661 for shard in s {
2662 match self
2663 .unfinalized_shards
2664 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2665 {
2666 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2668 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2669 };
2670 }
2671 Ok(())
2672 }
2673
2674 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2675 let ks: Vec<_> = shards
2676 .into_iter()
2677 .map(|shard| UnfinalizedShardKey { shard })
2678 .collect();
2679 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2680 }
2681
2682 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2683 self.txn_wal_shard
2684 .values()
2685 .iter()
2686 .next()
2687 .map(|TxnWalShardValue { shard }| *shard)
2688 }
2689
2690 fn write_txn_wal_shard(
2691 &mut self,
2692 shard: ShardId,
2693 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2694 self.txn_wal_shard
2695 .insert((), TxnWalShardValue { shard }, self.op_id)
2696 .map_err(|err| match err {
2697 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2698 err => StorageError::Generic(anyhow::anyhow!(err)),
2699 })
2700 }
2701}
2702
/// The set of durable catalog updates accumulated by a `Transaction`,
/// expressed as `(key, value, diff)` triples in their proto representations,
/// ready to be committed to the durable catalog.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper of the durable catalog that this batch is based on.
    pub(crate) upper: mz_repr::Timestamp,
}
2755
2756impl TransactionBatch {
2757 pub fn is_empty(&self) -> bool {
2758 let TransactionBatch {
2759 databases,
2760 schemas,
2761 items,
2762 comments,
2763 roles,
2764 role_auth,
2765 clusters,
2766 cluster_replicas,
2767 network_policies,
2768 introspection_sources,
2769 id_allocator,
2770 configs,
2771 settings,
2772 source_references,
2773 system_gid_mapping,
2774 system_configurations,
2775 default_privileges,
2776 system_privileges,
2777 storage_collection_metadata,
2778 unfinalized_shards,
2779 txn_wal_shard,
2780 audit_log_updates,
2781 upper: _,
2782 } = self;
2783 databases.is_empty()
2784 && schemas.is_empty()
2785 && items.is_empty()
2786 && comments.is_empty()
2787 && roles.is_empty()
2788 && role_auth.is_empty()
2789 && clusters.is_empty()
2790 && cluster_replicas.is_empty()
2791 && network_policies.is_empty()
2792 && introspection_sources.is_empty()
2793 && id_allocator.is_empty()
2794 && configs.is_empty()
2795 && settings.is_empty()
2796 && source_references.is_empty()
2797 && system_gid_mapping.is_empty()
2798 && system_configurations.is_empty()
2799 && default_privileges.is_empty()
2800 && system_privileges.is_empty()
2801 && storage_collection_metadata.is_empty()
2802 && unfinalized_shards.is_empty()
2803 && txn_wal_shard.is_empty()
2804 && audit_log_updates.is_empty()
2805 }
2806}
2807
/// A single pending change to a value in a `TableTransaction`, recorded at a
/// particular point within the transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value being added or retracted.
    value: V,
    /// The op id of the transaction operation that produced this update.
    ts: Timestamp,
    /// `+1` for an addition, `-1` for a retraction.
    diff: Diff,
}
2814
/// Describes whether a value type carries a name that is unique within its
/// table. When a unique name is available, uniqueness verification can use a
/// single map lookup per value instead of a quadratic pairwise scan (see
/// `TableTransaction::verify`).
trait UniqueName {
    /// Whether `unique_name` returns a meaningful unique name.
    const HAS_UNIQUE_NAME: bool;
    /// The value's unique name; the empty string when `HAS_UNIQUE_NAME` is
    /// `false`.
    fn unique_name(&self) -> &str;
}
2823
/// `UniqueName` implementations for the durable catalog value types.
mod unique_name {
    use crate::durable::objects::*;

    /// Implements `UniqueName` for value types whose `name` field is unique
    /// within their table.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements `UniqueName` for value types without a unique name;
    /// uniqueness verification falls back to a pairwise scan for these.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                        ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    #[cfg(test)]
    mod test {
        // `String` is used as the value type in this file's unit tests.
        impl_no_unique_name!(String,);
    }
}
2885
/// An in-memory, differential view over a single durable catalog table:
/// an initial snapshot plus the updates staged by the current transaction.
#[derive(Debug)]
struct TableTransaction<K, V> {
    /// The table's contents as of the start of the transaction.
    initial: BTreeMap<K, V>,
    /// Pending updates per key, in timestamp order.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    /// Returns whether two values conflict with each other, used to enforce
    /// a table-level uniqueness constraint; `None` disables the check.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2903
2904impl<K, V> TableTransaction<K, V>
2905where
2906 K: Ord + Eq + Clone + Debug,
2907 V: Ord + Clone + Debug + UniqueName,
2908{
2909 fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2916 where
2917 K: RustType<KP>,
2918 V: RustType<VP>,
2919 {
2920 let initial = initial
2921 .into_iter()
2922 .map(RustType::from_proto)
2923 .collect::<Result<_, _>>()?;
2924
2925 Ok(Self {
2926 initial,
2927 pending: BTreeMap::new(),
2928 uniqueness_violation: None,
2929 })
2930 }
2931
2932 fn new_with_uniqueness_fn<KP, VP>(
2935 initial: BTreeMap<KP, VP>,
2936 uniqueness_violation: fn(a: &V, b: &V) -> bool,
2937 ) -> Result<Self, TryFromProtoError>
2938 where
2939 K: RustType<KP>,
2940 V: RustType<VP>,
2941 {
2942 let initial = initial
2943 .into_iter()
2944 .map(RustType::from_proto)
2945 .collect::<Result<_, _>>()?;
2946
2947 Ok(Self {
2948 initial,
2949 pending: BTreeMap::new(),
2950 uniqueness_violation: Some(uniqueness_violation),
2951 })
2952 }
2953
    /// Consumes the transaction and returns its consolidated pending updates,
    /// converted to their proto representations and grouped by key.
    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        soft_assert_no_log!(self.verify().is_ok());
        self.pending
            .into_iter()
            .flat_map(|(k, v)| {
                // Drop the per-op timestamps and consolidate, so that e.g. an
                // insert followed by a delete of the same value cancels out.
                let mut v: Vec<_> = v
                    .into_iter()
                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
                    .collect();
                differential_dataflow::consolidation::consolidate(&mut v);
                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
            })
            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
            .collect()
    }
2977
    /// Verifies that the table's current contents (initial snapshot plus
    /// pending updates) satisfy the uniqueness constraint, if any, and
    /// soft-asserts that pending updates are sorted by timestamp.
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                // Fast path: values are keyed by unique name, so one map
                // lookup per value replaces the O(n^2) pairwise scan.
                // Duplicate names collapse to the last entry inserted, which
                // the `i != j` comparison below then detects.
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                // Slow path: compare every pair of distinct values.
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }
3018
    /// Like `verify`, but only checks the values at the given `keys` against
    /// every other value currently in the table, which is cheaper when only a
    /// few keys changed.
    fn verify_keys<'a>(
        &self,
        keys: impl IntoIterator<Item = &'a K>,
    ) -> Result<(), DurableCatalogError>
    where
        K: 'a,
    {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            // Resolve each key to its current value; keys without a current
            // value (e.g. deletions) need no check.
            let entries: Vec<_> = keys
                .into_iter()
                .filter_map(|key| self.get(key).map(|value| (key, value)))
                .collect();
            for (ki, vi) in self.items() {
                for (kj, vj) in &entries {
                    if ki != *kj && uniqueness_violation(vi, vj) {
                        return Err(DurableCatalogError::UniquenessViolation);
                    }
                }
            }
        }
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }
3047
    /// Invokes `f` on the current `(key, value)` of every key present in the
    /// table, i.e. the initial snapshot with all pending updates applied.
    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
        let mut seen = BTreeSet::new();
        // Keys with pending updates: resolve their current value, which may
        // be absent if the key was deleted within this transaction.
        for k in self.pending.keys() {
            seen.insert(k);
            let v = self.get(k);
            if let Some(v) = v {
                f(k, v);
            }
        }
        // Untouched keys keep their initial value.
        for (k, v) in self.initial.iter() {
            if !seen.contains(k) {
                f(k, v);
            }
        }
    }
3068
3069 fn get(&self, k: &K) -> Option<&V> {
3071 let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3072 let mut updates = Vec::with_capacity(pending.len() + 1);
3073 if let Some(initial) = self.initial.get(k) {
3074 updates.push((initial, Diff::ONE));
3075 }
3076 updates.extend(
3077 pending
3078 .into_iter()
3079 .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3080 );
3081
3082 differential_dataflow::consolidation::consolidate(&mut updates);
3083 assert!(updates.len() <= 1);
3084 updates.into_iter().next().map(|(v, _)| v)
3085 }
3086
3087 #[cfg(test)]
3092 fn items_cloned(&self) -> BTreeMap<K, V> {
3093 let mut items = BTreeMap::new();
3094 self.for_values(|k, v| {
3095 items.insert(k.clone(), v.clone());
3096 });
3097 items
3098 }
3099
3100 fn items(&self) -> BTreeMap<&K, &V> {
3103 let mut items = BTreeMap::new();
3104 self.for_values(|k, v| {
3105 items.insert(k, v);
3106 });
3107 items
3108 }
3109
3110 fn values(&self) -> BTreeSet<&V> {
3112 let mut items = BTreeSet::new();
3113 self.for_values(|_, v| {
3114 items.insert(v);
3115 });
3116 items
3117 }
3118
3119 fn len(&self) -> usize {
3121 let mut count = 0;
3122 self.for_values(|_, _| {
3123 count += 1;
3124 });
3125 count
3126 }
3127
    /// Invokes `f` on every current `(key, value)` pair, handing it a staging
    /// map to record new updates in; the staged updates are appended to
    /// `self.pending` afterwards. The staging indirection is needed because
    /// `f` cannot mutate `self.pending` while `for_values` iterates over it.
    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
        &mut self,
        mut f: F,
    ) {
        let mut pending = BTreeMap::new();
        self.for_values(|k, v| f(&mut pending, k, v));
        for (k, updates) in pending {
            self.pending.entry(k).or_default().extend(updates);
        }
    }
3141
    /// Inserts the key-value pair at timestamp `ts`.
    ///
    /// Errors (staging nothing) if the key already exists or if the value
    /// would violate the table's uniqueness constraint. If both apply, the
    /// violation detected last wins.
    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
        let mut violation = None;
        self.for_values(|for_k, for_v| {
            if &k == for_k {
                violation = Some(DurableCatalogError::DuplicateKey);
            }
            if let Some(uniqueness_violation) = self.uniqueness_violation {
                if uniqueness_violation(for_v, &v) {
                    violation = Some(DurableCatalogError::UniquenessViolation);
                }
            }
        });
        if let Some(violation) = violation {
            return Err(violation);
        }
        // Stage the addition of the new value.
        self.pending.entry(k).or_default().push(TransactionUpdate {
            value: v,
            ts,
            diff: Diff::ONE,
        });
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }
3168
    /// Updates the value of every key for which `f` returns `Some(new_value)`
    /// at timestamp `ts`, returning the number of changed entries.
    ///
    /// If any resulting value would violate the uniqueness constraint, all
    /// staged changes are rolled back and an error is returned.
    fn update<F: Fn(&K, &V) -> Option<V>>(
        &mut self,
        f: F,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let mut changed = Diff::ZERO;
        let mut keys = BTreeSet::new();
        // Snapshot the pending updates so we can restore them wholesale if
        // verification fails below.
        let pending = self.pending.clone();
        self.for_values_mut(|p, k, v| {
            if let Some(next) = f(k, v) {
                changed += Diff::ONE;
                keys.insert(k.clone());
                // Stage a retraction of the old value and an addition of the
                // new value at the same timestamp.
                let updates = p.entry(k.clone()).or_default();
                updates.push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                updates.push(TransactionUpdate {
                    value: next,
                    ts,
                    diff: Diff::ONE,
                });
            }
        });
        if let Err(err) = self.verify_keys(&keys) {
            // Roll back by restoring the snapshot taken above.
            self.pending = pending;
            Err(err)
        } else {
            Ok(changed)
        }
    }
3211
3212 fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3217 if let Some(cur_v) = self.get(&k) {
3218 if v != *cur_v {
3219 self.set(k, Some(v), ts)?;
3220 }
3221 Ok(true)
3222 } else {
3223 Ok(false)
3224 }
3225 }
3226
    /// Sets the values of all given keys that currently exist in the table,
    /// at timestamp `ts`; absent keys are ignored.
    ///
    /// Returns the number of keys that were found — including those whose
    /// value was already equal to the requested one, which stage no update.
    /// Errors if any new value would violate uniqueness.
    fn update_by_keys(
        &mut self,
        kvs: impl IntoIterator<Item = (K, V)>,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let kvs: Vec<_> = kvs
            .into_iter()
            .filter_map(|(k, v)| match self.get(&k) {
                // Track whether this update is a no-op so it can be skipped
                // below while still being counted in `changed`.
                Some(cur_v) => Some((*cur_v == v, k, v)),
                None => None,
            })
            .collect();
        let changed = kvs.len();
        let changed =
            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
        let kvs = kvs
            .into_iter()
            .filter(|(no_op, _, _)| !no_op)
            .map(|(_, k, v)| (k, Some(v)))
            .collect();
        self.set_many(kvs, ts)?;
        Ok(changed)
    }
3256
    /// Sets the value of `k` to `v` — deleting `k` when `v` is `None` — at
    /// timestamp `ts`, returning the previous value of `k`.
    ///
    /// On a uniqueness violation the staged updates are rolled back and an
    /// error is returned.
    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
        let prev = self.get(&k).cloned();
        let entry = self.pending.entry(k.clone()).or_default();
        // Remember how many updates were already staged for this key so we
        // can roll back on error.
        let restore_len = entry.len();

        match (v, prev.clone()) {
            // Replace: retract the old value, add the new one.
            (Some(v), Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            // Insert.
            (Some(v), None) => {
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            // Delete.
            (None, Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
            // Deleting an absent key is a no-op.
            (None, None) => {}
        }

        if let Err(err) = self.verify_keys([&k]) {
            // Roll back: drop exactly the updates staged above.
            let pending = self.pending.get_mut(&k).expect("inserted above");
            pending.truncate(restore_len);
            Err(err)
        } else {
            Ok(prev)
        }
    }
3309
    /// Sets the value of every key in `kvs` — deleting keys mapped to `None`
    /// — at timestamp `ts`, returning the previous value of each key.
    ///
    /// On a uniqueness violation all staged updates are rolled back and an
    /// error is returned.
    fn set_many(
        &mut self,
        kvs: BTreeMap<K, Option<V>>,
        ts: Timestamp,
    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
        if kvs.is_empty() {
            return Ok(BTreeMap::new());
        }

        let mut prevs = BTreeMap::new();
        // Per key, the number of updates already staged before this call,
        // used to roll back on error.
        let mut restores = BTreeMap::new();

        for (k, v) in kvs {
            let prev = self.get(&k).cloned();
            let entry = self.pending.entry(k.clone()).or_default();
            restores.insert(k.clone(), entry.len());

            match (v, prev.clone()) {
                // Replace: retract the old value, add the new one.
                (Some(v), Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                // Insert.
                (Some(v), None) => {
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                // Delete.
                (None, Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                }
                // Deleting an absent key is a no-op.
                (None, None) => {}
            }

            prevs.insert(k, prev);
        }

        if let Err(err) = self.verify_keys(prevs.keys()) {
            // Roll back: drop exactly the updates staged by this call.
            for (k, restore_len) in restores {
                let pending = self.pending.get_mut(&k).expect("inserted above");
                pending.truncate(restore_len);
            }
            Err(err)
        } else {
            Ok(prevs)
        }
    }
3377
    /// Deletes every key-value pair for which `f` returns `true`, at
    /// timestamp `ts`, returning the deleted entries.
    ///
    /// Deletions cannot introduce uniqueness violations, so this is
    /// infallible.
    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
        let mut deleted = Vec::new();
        self.for_values_mut(|p, k, v| {
            if f(k, v) {
                deleted.push((k.clone(), v.clone()));
                // Stage a retraction of the current value.
                p.entry(k.clone()).or_default().push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
        });
        soft_assert_no_log!(self.verify().is_ok());
        deleted
    }
3398
3399 fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3403 self.set(k, None, ts)
3404 .expect("deleting an entry cannot violate uniqueness")
3405 }
3406
3407 fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3411 let kvs = ks.into_iter().map(|k| (k, None)).collect();
3412 let prevs = self
3413 .set_many(kvs, ts)
3414 .expect("deleting entries cannot violate uniqueness");
3415 prevs
3416 .into_iter()
3417 .filter_map(|(k, v)| v.map(|v| (k, v)))
3418 .collect()
3419 }
3420}
3421
3422#[cfg(test)]
3423#[allow(clippy::unwrap_used)]
3424mod tests {
3425 use super::*;
3426
3427 use mz_ore::now::SYSTEM_TIME;
3428 use mz_ore::{assert_none, assert_ok};
3429 use mz_persist_client::cache::PersistClientCache;
3430 use mz_persist_types::PersistLocation;
3431 use semver::Version;
3432
3433 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3434 use crate::memory;
3435
    // Exercises duplicate-key and uniqueness-violation rejection on insert.
    #[mz_ore::test]
    fn test_table_transaction_simple() {
        // Two values conflict iff they are equal, i.e. all values must be
        // distinct.
        fn uniqueness_violation(a: &String, b: &String) -> bool {
            a == b
        }
        let mut table = TableTransaction::new_with_uniqueness_fn(
            BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
            uniqueness_violation,
        )
        .unwrap();

        // New keys with distinct values are accepted.
        assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
        assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
        // Existing key (key 1 already present) is rejected.
        assert!(
            table
                .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
                .is_err()
        );
        // New key with a duplicate value ("c" already present) is rejected.
        assert!(
            table
                .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
                .is_err()
        );
    }
3462
3463 #[mz_ore::test]
3464 fn test_table_transaction() {
3465 fn uniqueness_violation(a: &String, b: &String) -> bool {
3466 a == b
3467 }
3468 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3469
3470 fn commit(
3471 table: &mut BTreeMap<Vec<u8>, String>,
3472 mut pending: Vec<(Vec<u8>, String, Diff)>,
3473 ) {
3474 pending.sort_by(|a, b| a.2.cmp(&b.2));
3476 for (k, v, diff) in pending {
3477 if diff == Diff::MINUS_ONE {
3478 let prev = table.remove(&k);
3479 assert_eq!(prev, Some(v));
3480 } else if diff == Diff::ONE {
3481 let prev = table.insert(k, v);
3482 assert_eq!(prev, None);
3483 } else {
3484 panic!("unexpected diff: {diff}");
3485 }
3486 }
3487 }
3488
3489 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3490 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3491 let mut table_txn =
3492 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3493 assert_eq!(table_txn.items_cloned(), table);
3494 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3495 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3496 assert_eq!(
3497 table_txn.items_cloned(),
3498 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3499 );
3500 assert_eq!(
3501 table_txn
3502 .update(|_k, _v| Some("v3".to_string()), 2)
3503 .unwrap(),
3504 Diff::ONE
3505 );
3506
3507 table_txn
3509 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3510 .unwrap_err();
3511
3512 table_txn
3513 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3514 .unwrap();
3515 assert_eq!(
3516 table_txn.items_cloned(),
3517 BTreeMap::from([
3518 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3519 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3520 ])
3521 );
3522 let err = table_txn
3523 .update(|_k, _v| Some("v1".to_string()), 5)
3524 .unwrap_err();
3525 assert!(
3526 matches!(err, DurableCatalogError::UniquenessViolation),
3527 "unexpected err: {err:?}"
3528 );
3529 let pending = table_txn.pending();
3530 assert_eq!(
3531 pending,
3532 vec![
3533 (
3534 1i64.to_le_bytes().to_vec(),
3535 "v1".to_string(),
3536 Diff::MINUS_ONE
3537 ),
3538 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3539 (
3540 2i64.to_le_bytes().to_vec(),
3541 "v2".to_string(),
3542 Diff::MINUS_ONE
3543 ),
3544 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3545 ]
3546 );
3547 commit(&mut table, pending);
3548 assert_eq!(
3549 table,
3550 BTreeMap::from([
3551 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3552 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3553 ])
3554 );
3555
3556 let mut table_txn =
3557 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3558 assert_eq!(
3560 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3561 1
3562 );
3563 table_txn
3564 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3565 .unwrap();
3566 table_txn
3568 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3569 .unwrap_err();
3570 table_txn
3572 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3573 .unwrap_err();
3574 assert_eq!(
3575 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3576 1
3577 );
3578 table_txn
3580 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3581 .unwrap();
3582 table_txn
3583 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3584 .unwrap();
3585 let pending = table_txn.pending();
3586 assert_eq!(
3587 pending,
3588 vec![
3589 (
3590 1i64.to_le_bytes().to_vec(),
3591 "v3".to_string(),
3592 Diff::MINUS_ONE
3593 ),
3594 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3595 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3596 ]
3597 );
3598 commit(&mut table, pending);
3599 assert_eq!(
3600 table,
3601 BTreeMap::from([
3602 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3603 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3604 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3605 ])
3606 );
3607
3608 let mut table_txn =
3609 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3610 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3611 table_txn
3612 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3613 .unwrap();
3614
3615 commit(&mut table, table_txn.pending());
3616 assert_eq!(
3617 table,
3618 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3619 );
3620
3621 let mut table_txn =
3622 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3623 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3624 table_txn
3625 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3626 .unwrap();
3627 commit(&mut table, table_txn.pending());
3628 assert_eq!(
3629 table,
3630 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3631 );
3632
3633 let mut table_txn =
3635 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3636 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3637 table_txn
3638 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3639 .unwrap();
3640 table_txn
3641 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3642 .unwrap_err();
3643 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3644 table_txn
3645 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3646 .unwrap();
3647 commit(&mut table, table_txn.pending());
3648 assert_eq!(
3649 table.clone().into_iter().collect::<Vec<_>>(),
3650 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3651 );
3652
3653 let mut table_txn =
3655 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3656 table_txn
3658 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3659 .unwrap_err();
3660 table_txn
3661 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3662 .unwrap();
3663 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3664 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3665 let pending = table_txn.pending();
3666 assert_eq!(
3667 pending,
3668 vec![
3669 (
3670 1i64.to_le_bytes().to_vec(),
3671 "v5".to_string(),
3672 Diff::MINUS_ONE
3673 ),
3674 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3675 ]
3676 );
3677 commit(&mut table, pending);
3678 assert_eq!(
3679 table,
3680 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3681 );
3682
3683 let mut table_txn =
3685 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3686 table_txn
3687 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3688 .unwrap();
3689 let pending = table_txn.pending::<Vec<u8>, String>();
3690 assert!(pending.is_empty());
3691
3692 let mut table_txn =
3694 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3695 table_txn
3697 .set_many(
3698 BTreeMap::from([
3699 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3700 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3701 ]),
3702 0,
3703 )
3704 .unwrap_err();
3705 table_txn
3706 .set_many(
3707 BTreeMap::from([
3708 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3709 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3710 ]),
3711 1,
3712 )
3713 .unwrap();
3714 table_txn
3715 .set_many(
3716 BTreeMap::from([
3717 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3718 (3i64.to_le_bytes().to_vec(), None),
3719 ]),
3720 2,
3721 )
3722 .unwrap();
3723 let pending = table_txn.pending();
3724 assert_eq!(
3725 pending,
3726 vec![
3727 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3728 (
3729 3i64.to_le_bytes().to_vec(),
3730 "v6".to_string(),
3731 Diff::MINUS_ONE
3732 ),
3733 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3734 ]
3735 );
3736 commit(&mut table, pending);
3737 assert_eq!(
3738 table,
3739 BTreeMap::from([
3740 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3741 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3742 ])
3743 );
3744
3745 let mut table_txn =
3747 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3748 table_txn
3749 .set_many(
3750 BTreeMap::from([
3751 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3752 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3753 ]),
3754 0,
3755 )
3756 .unwrap();
3757 let pending = table_txn.pending::<Vec<u8>, String>();
3758 assert!(pending.is_empty());
3759 commit(&mut table, pending);
3760 assert_eq!(
3761 table,
3762 BTreeMap::from([
3763 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3764 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3765 ])
3766 );
3767
3768 let mut table_txn =
3770 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3771 table_txn
3773 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3774 .unwrap_err();
3775 assert!(
3776 table_txn
3777 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3778 .unwrap()
3779 );
3780 assert!(
3781 !table_txn
3782 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3783 .unwrap()
3784 );
3785 let pending = table_txn.pending();
3786 assert_eq!(
3787 pending,
3788 vec![
3789 (
3790 1i64.to_le_bytes().to_vec(),
3791 "v6".to_string(),
3792 Diff::MINUS_ONE
3793 ),
3794 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3795 ]
3796 );
3797 commit(&mut table, pending);
3798 assert_eq!(
3799 table,
3800 BTreeMap::from([
3801 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3802 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3803 ])
3804 );
3805
3806 let mut table_txn =
3808 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3809 assert!(
3810 table_txn
3811 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3812 .unwrap()
3813 );
3814 let pending = table_txn.pending::<Vec<u8>, String>();
3815 assert!(pending.is_empty());
3816 commit(&mut table, pending);
3817 assert_eq!(
3818 table,
3819 BTreeMap::from([
3820 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3821 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3822 ])
3823 );
3824
3825 let mut table_txn =
3827 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3828 table_txn
3830 .update_by_keys(
3831 [
3832 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3833 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3834 ],
3835 0,
3836 )
3837 .unwrap_err();
3838 let n = table_txn
3839 .update_by_keys(
3840 [
3841 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3842 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3843 ],
3844 1,
3845 )
3846 .unwrap();
3847 assert_eq!(n, Diff::ONE);
3848 let n = table_txn
3849 .update_by_keys(
3850 [
3851 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3852 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3853 ],
3854 2,
3855 )
3856 .unwrap();
3857 assert_eq!(n, Diff::ZERO);
3858 let pending = table_txn.pending();
3859 assert_eq!(
3860 pending,
3861 vec![
3862 (
3863 1i64.to_le_bytes().to_vec(),
3864 "v8".to_string(),
3865 Diff::MINUS_ONE
3866 ),
3867 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3868 ]
3869 );
3870 commit(&mut table, pending);
3871 assert_eq!(
3872 table,
3873 BTreeMap::from([
3874 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3875 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3876 ])
3877 );
3878
3879 let mut table_txn =
3881 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3882 let n = table_txn
3883 .update_by_keys(
3884 [
3885 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3886 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3887 ],
3888 0,
3889 )
3890 .unwrap();
3891 assert_eq!(n, Diff::from(2));
3892 let pending = table_txn.pending::<Vec<u8>, String>();
3893 assert!(pending.is_empty());
3894 commit(&mut table, pending);
3895 assert_eq!(
3896 table,
3897 BTreeMap::from([
3898 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3899 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3900 ])
3901 );
3902
3903 let mut table_txn =
3905 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3906 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3907 assert_eq!(prev, Some("v9".to_string()));
3908 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3909 assert_none!(prev);
3910 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3911 assert_none!(prev);
3912 let pending = table_txn.pending();
3913 assert_eq!(
3914 pending,
3915 vec![(
3916 1i64.to_le_bytes().to_vec(),
3917 "v9".to_string(),
3918 Diff::MINUS_ONE
3919 ),]
3920 );
3921 commit(&mut table, pending);
3922 assert_eq!(
3923 table,
3924 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3925 );
3926
3927 let mut table_txn =
3929 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3930 let prevs = table_txn.delete_by_keys(
3931 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3932 0,
3933 );
3934 assert_eq!(
3935 prevs,
3936 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3937 );
3938 let prevs = table_txn.delete_by_keys(
3939 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3940 1,
3941 );
3942 assert_eq!(prevs, vec![]);
3943 let prevs = table_txn.delete_by_keys(
3944 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3945 2,
3946 );
3947 assert_eq!(prevs, vec![]);
3948 let pending = table_txn.pending();
3949 assert_eq!(
3950 pending,
3951 vec![(
3952 42i64.to_le_bytes().to_vec(),
3953 "v7".to_string(),
3954 Diff::MINUS_ONE
3955 ),]
3956 );
3957 commit(&mut table, pending);
3958 assert_eq!(table, BTreeMap::new());
3959 }
3960
3961 #[mz_ore::test(tokio::test)]
3962 #[cfg_attr(miri, ignore)] async fn test_savepoint() {
3964 const VERSION: Version = Version::new(26, 0, 0);
3965 let mut persist_cache = PersistClientCache::new_no_metrics();
3966 persist_cache.cfg.build_version = VERSION;
3967 let persist_client = persist_cache
3968 .open(PersistLocation::new_in_mem())
3969 .await
3970 .unwrap();
3971 let state_builder = TestCatalogStateBuilder::new(persist_client)
3972 .with_default_deploy_generation()
3973 .with_version(VERSION);
3974
3975 let _ = state_builder
3977 .clone()
3978 .unwrap_build()
3979 .await
3980 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
3981 .await
3982 .unwrap()
3983 .0;
3984 let mut savepoint_state = state_builder
3985 .unwrap_build()
3986 .await
3987 .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
3988 .await
3989 .unwrap()
3990 .0;
3991
3992 let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
3993 assert!(!initial_snapshot.is_empty());
3994
3995 let db_name = "db";
3996 let db_owner = RoleId::User(42);
3997 let db_privileges = Vec::new();
3998 let mut txn = savepoint_state.transaction().await.unwrap();
3999 let (db_id, db_oid) = txn
4000 .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4001 .unwrap();
4002 let commit_ts = txn.upper();
4003 txn.commit_internal(commit_ts).await.unwrap();
4004 let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4005 let update = updates.into_element();
4006
4007 assert_eq!(update.diff, StateDiff::Addition);
4008
4009 let db = match update.kind {
4010 memory::objects::StateUpdateKind::Database(db) => db,
4011 update => panic!("unexpected update: {update:?}"),
4012 };
4013
4014 assert_eq!(db_id, db.id);
4015 assert_eq!(db_oid, db.oid);
4016 assert_eq!(db_name, db.name);
4017 assert_eq!(db_owner, db.owner_id);
4018 assert_eq!(db_privileges, db.privileges);
4019 }
4020
4021 #[mz_ore::test]
4022 fn test_allocate_introspection_source_index_id() {
4023 let cluster_variant: u8 = 0b0000_0001;
4024 let cluster_id_inner: u64 =
4025 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4026 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4027
4028 let cluster_id = ClusterId::System(cluster_id_inner);
4029 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4030
4031 let introspection_source_index_id: u64 =
4032 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4033
4034 {
4036 let mut cluster_variant_mask = 0xFF << 56;
4037 cluster_variant_mask &= introspection_source_index_id;
4038 cluster_variant_mask >>= 56;
4039 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4040 }
4041
4042 {
4044 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4045 cluster_id_inner_mask &= introspection_source_index_id;
4046 cluster_id_inner_mask >>= 8;
4047 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4048 }
4049
4050 {
4052 let mut log_variant_mask = 0xFF;
4053 log_variant_mask &= introspection_source_index_id;
4054 assert_eq!(
4055 log_variant_mask,
4056 u64::from(timely_messages_received_log_variant)
4057 );
4058 }
4059
4060 let (catalog_item_id, global_id) =
4061 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4062
4063 assert_eq!(
4064 catalog_item_id,
4065 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4066 );
4067 assert_eq!(
4068 global_id,
4069 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4070 );
4071 }
4072}