1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34 CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35 RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51 AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52 ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53 ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54 Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55 DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56 IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57 ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58 ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59 SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60 StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61 SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66 EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67 SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
68 SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
69 USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
70 USER_ROLE_ID_ALLOC_KEY,
71};
72use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
73
/// Raw timestamp representation used for audit-log update ordering and the
/// per-transaction `op_id` counter.
type Timestamp = u64;
75
/// An in-memory transaction over the durable catalog state.
///
/// Every catalog collection is staged in its own [`TableTransaction`]; audit
/// log events are accumulated separately in `audit_log_updates`. The
/// commit/apply logic that writes these back to `durable_catalog` is outside
/// this chunk of the file.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Staged audit log events as (event, diff, op id at which it was staged).
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    // Timestamp the snapshot this transaction was built from was read at —
    // presumably the durable catalog's upper frontier; confirm at commit site.
    upper: mz_repr::Timestamp,
    // Per-transaction operation id passed to every staged mutation; starts at
    // 0 in `new` (incremented outside this chunk — confirm).
    op_id: Timestamp,
}
115
116impl<'a> Transaction<'a> {
    /// Builds a [`Transaction`] from a point-in-time `Snapshot` of the
    /// durable catalog, read at `upper`.
    ///
    /// Every collection is wrapped in a [`TableTransaction`]; collections with
    /// naming constraints are constructed with a uniqueness predicate that
    /// returns `true` when two values conflict.
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            // Database names are globally unique.
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            // Schema names are unique within a database.
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            // Items with the same schema and name conflict when both are
            // non-types, or when one is a type and the other's item type
            // conflicts with types.
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            // Role names are globally unique.
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            // Cluster names are globally unique.
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            // Network policy names are globally unique.
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            // Replica names are unique within a cluster.
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }
199
200 pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
201 let key = ItemKey { id: *id };
202 self.items
203 .get(&key)
204 .map(|v| DurableType::from_key_value(key, v.clone()))
205 }
206
207 pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
208 self.items
209 .items()
210 .into_iter()
211 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
212 .sorted_by_key(|Item { id, .. }| *id)
213 }
214
215 pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
216 self.insert_audit_log_events([event]);
217 }
218
219 pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
220 let events = events
221 .into_iter()
222 .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
223 self.audit_log_updates.extend(events);
224 }
225
226 pub fn insert_user_database(
227 &mut self,
228 database_name: &str,
229 owner_id: RoleId,
230 privileges: Vec<MzAclItem>,
231 temporary_oids: &HashSet<u32>,
232 ) -> Result<(DatabaseId, u32), CatalogError> {
233 let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
234 let id = DatabaseId::User(id);
235 let oid = self.allocate_oid(temporary_oids)?;
236 self.insert_database(id, database_name, owner_id, privileges, oid)?;
237 Ok((id, oid))
238 }
239
240 pub(crate) fn insert_database(
241 &mut self,
242 id: DatabaseId,
243 database_name: &str,
244 owner_id: RoleId,
245 privileges: Vec<MzAclItem>,
246 oid: u32,
247 ) -> Result<u32, CatalogError> {
248 match self.databases.insert(
249 DatabaseKey { id },
250 DatabaseValue {
251 name: database_name.to_string(),
252 owner_id,
253 privileges,
254 oid,
255 },
256 self.op_id,
257 ) {
258 Ok(_) => Ok(oid),
259 Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
260 }
261 }
262
263 pub fn insert_user_schema(
264 &mut self,
265 database_id: DatabaseId,
266 schema_name: &str,
267 owner_id: RoleId,
268 privileges: Vec<MzAclItem>,
269 temporary_oids: &HashSet<u32>,
270 ) -> Result<(SchemaId, u32), CatalogError> {
271 let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
272 let id = SchemaId::User(id);
273 let oid = self.allocate_oid(temporary_oids)?;
274 self.insert_schema(
275 id,
276 Some(database_id),
277 schema_name.to_string(),
278 owner_id,
279 privileges,
280 oid,
281 )?;
282 Ok((id, oid))
283 }
284
285 pub fn insert_system_schema(
286 &mut self,
287 schema_id: u64,
288 schema_name: &str,
289 owner_id: RoleId,
290 privileges: Vec<MzAclItem>,
291 oid: u32,
292 ) -> Result<(), CatalogError> {
293 let id = SchemaId::System(schema_id);
294 self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
295 }
296
297 pub(crate) fn insert_schema(
298 &mut self,
299 schema_id: SchemaId,
300 database_id: Option<DatabaseId>,
301 schema_name: String,
302 owner_id: RoleId,
303 privileges: Vec<MzAclItem>,
304 oid: u32,
305 ) -> Result<(), CatalogError> {
306 match self.schemas.insert(
307 SchemaKey { id: schema_id },
308 SchemaValue {
309 database_id,
310 name: schema_name.clone(),
311 owner_id,
312 privileges,
313 oid,
314 },
315 self.op_id,
316 ) {
317 Ok(_) => Ok(()),
318 Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
319 }
320 }
321
322 pub fn insert_builtin_role(
323 &mut self,
324 id: RoleId,
325 name: String,
326 attributes: RoleAttributesRaw,
327 membership: RoleMembership,
328 vars: RoleVars,
329 oid: u32,
330 ) -> Result<RoleId, CatalogError> {
331 soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
332 self.insert_role(id, name, attributes, membership, vars, oid)?;
333 Ok(id)
334 }
335
336 pub fn insert_user_role(
337 &mut self,
338 name: String,
339 attributes: RoleAttributesRaw,
340 membership: RoleMembership,
341 vars: RoleVars,
342 temporary_oids: &HashSet<u32>,
343 ) -> Result<(RoleId, u32), CatalogError> {
344 let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
345 let id = RoleId::User(id);
346 let oid = self.allocate_oid(temporary_oids)?;
347 self.insert_role(id, name, attributes, membership, vars, oid)?;
348 Ok((id, oid))
349 }
350
    /// Adds a role to the catalog. If the role's attributes carry a password,
    /// a `scram256_hash` of it is stored in the `role_auth` collection
    /// alongside the role itself.
    ///
    /// Returns `RoleAlreadyExists` if either the auth entry or the role entry
    /// collides with an existing key.
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                &attributes
                    .scram_iterations
                    // An iteration count should always accompany a password;
                    // complain loudly if it is missing, then fall back to the
                    // 600,000-iteration default below.
                    .or_else(|| {
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    // A colliding auth row is reported as a role collision.
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }
406
    /// Adds a user cluster (and its introspection source indexes) to the
    /// catalog; thin public wrapper over `insert_cluster`.
    ///
    /// NOTE(review): `owner_id` precedes `privileges` here, while
    /// `insert_system_cluster` takes them in the opposite order — consider
    /// aligning the signatures.
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }
428
429 pub fn insert_system_cluster(
431 &mut self,
432 cluster_name: &str,
433 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
434 privileges: Vec<MzAclItem>,
435 owner_id: RoleId,
436 config: ClusterConfig,
437 temporary_oids: &HashSet<u32>,
438 ) -> Result<(), CatalogError> {
439 let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
440 let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441 self.insert_cluster(
442 cluster_id,
443 cluster_name,
444 introspection_source_indexes,
445 owner_id,
446 privileges,
447 config,
448 temporary_oids,
449 )
450 }
451
452 fn insert_cluster(
453 &mut self,
454 cluster_id: ClusterId,
455 cluster_name: &str,
456 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
457 owner_id: RoleId,
458 privileges: Vec<MzAclItem>,
459 config: ClusterConfig,
460 temporary_oids: &HashSet<u32>,
461 ) -> Result<(), CatalogError> {
462 if let Err(_) = self.clusters.insert(
463 ClusterKey { id: cluster_id },
464 ClusterValue {
465 name: cluster_name.to_string(),
466 owner_id,
467 privileges,
468 config,
469 },
470 self.op_id,
471 ) {
472 return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
473 };
474
475 let amount = usize_to_u64(introspection_source_indexes.len());
476 let oids = self.allocate_oids(amount, temporary_oids)?;
477 let introspection_source_indexes: Vec<_> = introspection_source_indexes
478 .into_iter()
479 .zip_eq(oids)
480 .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
481 .collect();
482 for (builtin, item_id, index_id, oid) in introspection_source_indexes {
483 let introspection_source_index = IntrospectionSourceIndex {
484 cluster_id,
485 name: builtin.name.to_string(),
486 item_id,
487 index_id,
488 oid,
489 };
490 let (key, value) = introspection_source_index.into_key_value();
491 self.introspection_sources
492 .insert(key, value, self.op_id)
493 .expect("no uniqueness violation");
494 }
495
496 Ok(())
497 }
498
499 pub fn rename_cluster(
500 &mut self,
501 cluster_id: ClusterId,
502 cluster_name: &str,
503 cluster_to_name: &str,
504 ) -> Result<(), CatalogError> {
505 let key = ClusterKey { id: cluster_id };
506
507 match self.clusters.update(
508 |k, v| {
509 if *k == key {
510 let mut value = v.clone();
511 value.name = cluster_to_name.to_string();
512 Some(value)
513 } else {
514 None
515 }
516 },
517 self.op_id,
518 )? {
519 Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
520 Diff::ONE => Ok(()),
521 n => panic!(
522 "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
523 ),
524 }
525 }
526
527 pub fn rename_cluster_replica(
528 &mut self,
529 replica_id: ReplicaId,
530 replica_name: &QualifiedReplica,
531 replica_to_name: &str,
532 ) -> Result<(), CatalogError> {
533 let key = ClusterReplicaKey { id: replica_id };
534
535 match self.cluster_replicas.update(
536 |k, v| {
537 if *k == key {
538 let mut value = v.clone();
539 value.name = replica_to_name.to_string();
540 Some(value)
541 } else {
542 None
543 }
544 },
545 self.op_id,
546 )? {
547 Diff::ZERO => {
548 Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
549 }
550 Diff::ONE => Ok(()),
551 n => panic!(
552 "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
553 ),
554 }
555 }
556
557 pub fn insert_cluster_replica(
558 &mut self,
559 cluster_id: ClusterId,
560 replica_name: &str,
561 config: ReplicaConfig,
562 owner_id: RoleId,
563 ) -> Result<ReplicaId, CatalogError> {
564 let replica_id = match cluster_id {
565 ClusterId::System(_) => self.allocate_system_replica_id()?,
566 ClusterId::User(_) => self.allocate_user_replica_id()?,
567 };
568 self.insert_cluster_replica_with_id(
569 cluster_id,
570 replica_id,
571 replica_name,
572 config,
573 owner_id,
574 )?;
575 Ok(replica_id)
576 }
577
578 pub(crate) fn insert_cluster_replica_with_id(
579 &mut self,
580 cluster_id: ClusterId,
581 replica_id: ReplicaId,
582 replica_name: &str,
583 config: ReplicaConfig,
584 owner_id: RoleId,
585 ) -> Result<(), CatalogError> {
586 if let Err(_) = self.cluster_replicas.insert(
587 ClusterReplicaKey { id: replica_id },
588 ClusterReplicaValue {
589 cluster_id,
590 name: replica_name.into(),
591 config,
592 owner_id,
593 },
594 self.op_id,
595 ) {
596 let cluster = self
597 .clusters
598 .get(&ClusterKey { id: cluster_id })
599 .expect("cluster exists");
600 return Err(SqlCatalogError::DuplicateReplica(
601 replica_name.to_string(),
602 cluster.name.to_string(),
603 )
604 .into());
605 };
606 Ok(())
607 }
608
609 pub fn insert_user_network_policy(
610 &mut self,
611 name: String,
612 rules: Vec<NetworkPolicyRule>,
613 privileges: Vec<MzAclItem>,
614 owner_id: RoleId,
615 temporary_oids: &HashSet<u32>,
616 ) -> Result<NetworkPolicyId, CatalogError> {
617 let oid = self.allocate_oid(temporary_oids)?;
618 let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
619 let id = NetworkPolicyId::User(id);
620 self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621 }
622
623 pub fn insert_network_policy(
624 &mut self,
625 id: NetworkPolicyId,
626 name: String,
627 rules: Vec<NetworkPolicyRule>,
628 privileges: Vec<MzAclItem>,
629 owner_id: RoleId,
630 oid: u32,
631 ) -> Result<NetworkPolicyId, CatalogError> {
632 match self.network_policies.insert(
633 NetworkPolicyKey { id },
634 NetworkPolicyValue {
635 name: name.clone(),
636 rules,
637 privileges,
638 owner_id,
639 oid,
640 },
641 self.op_id,
642 ) {
643 Ok(_) => Ok(id),
644 Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
645 }
646 }
647
648 pub fn update_introspection_source_index_gids(
653 &mut self,
654 mappings: impl Iterator<
655 Item = (
656 ClusterId,
657 impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
658 ),
659 >,
660 ) -> Result<(), CatalogError> {
661 for (cluster_id, updates) in mappings {
662 for (name, item_id, index_id, oid) in updates {
663 let introspection_source_index = IntrospectionSourceIndex {
664 cluster_id,
665 name,
666 item_id,
667 index_id,
668 oid,
669 };
670 let (key, value) = introspection_source_index.into_key_value();
671
672 let prev = self
673 .introspection_sources
674 .set(key, Some(value), self.op_id)?;
675 if prev.is_none() {
676 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
677 "{index_id}"
678 ))
679 .into());
680 }
681 }
682 }
683 Ok(())
684 }
685
686 pub fn insert_user_item(
687 &mut self,
688 id: CatalogItemId,
689 global_id: GlobalId,
690 schema_id: SchemaId,
691 item_name: &str,
692 create_sql: String,
693 owner_id: RoleId,
694 privileges: Vec<MzAclItem>,
695 temporary_oids: &HashSet<u32>,
696 versions: BTreeMap<RelationVersion, GlobalId>,
697 ) -> Result<u32, CatalogError> {
698 let oid = self.allocate_oid(temporary_oids)?;
699 self.insert_item(
700 id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
701 )?;
702 Ok(oid)
703 }
704
705 pub fn insert_item(
706 &mut self,
707 id: CatalogItemId,
708 oid: u32,
709 global_id: GlobalId,
710 schema_id: SchemaId,
711 item_name: &str,
712 create_sql: String,
713 owner_id: RoleId,
714 privileges: Vec<MzAclItem>,
715 extra_versions: BTreeMap<RelationVersion, GlobalId>,
716 ) -> Result<(), CatalogError> {
717 match self.items.insert(
718 ItemKey { id },
719 ItemValue {
720 schema_id,
721 name: item_name.to_string(),
722 create_sql,
723 owner_id,
724 privileges,
725 oid,
726 global_id,
727 extra_versions,
728 },
729 self.op_id,
730 ) {
731 Ok(_) => Ok(()),
732 Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
733 }
734 }
735
736 pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737 Ok(self.get_and_increment_id_by(key, 1)?.into_element())
738 }
739
    /// Allocates `amount` consecutive IDs from the allocator named `key`,
    /// returning them and advancing the allocator's `next_id` past them.
    ///
    /// Returns `IdExhaustion` if the allocation would overflow `u64`.
    ///
    /// # Panics
    ///
    /// Panics if no allocator named `key` exists, or if `key` is the system
    /// item allocator after bootstrap has completed.
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        // We must have replaced exactly the value read above; anything else
        // would indicate the allocator changed underneath us.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }
772
773 pub fn allocate_system_item_ids(
774 &mut self,
775 amount: u64,
776 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
777 assert!(
778 !self.durable_catalog.is_bootstrap_complete(),
779 "we can only allocate system item IDs during bootstrap"
780 );
781 Ok(self
782 .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
783 .into_iter()
784 .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
786 .collect())
787 }
788
    /// Deterministically derives the (catalog, global) ID pair for a
    /// cluster's introspection source index from the cluster ID and log
    /// variant, rather than consuming an ID allocator.
    ///
    /// The 64-bit ID is packed as:
    /// - bits 56..64: cluster kind (1 = system, 2 = user)
    /// - bits 8..56:  the cluster's inner ID
    /// - bits 0..8:   the log variant code from the table below
    ///
    /// # Panics
    ///
    /// Panics if the cluster's inner ID does not fit in 48 bits.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The top 16 bits must be free: 8 for the cluster kind, 8 spill past
        // the 48-bit cluster-ID field.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // Stable numeric code per log variant. Do not renumber existing
        // entries — these codes are embedded in durable IDs. (27 is skipped;
        // presumably a retired variant — confirm before reusing.)
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
878
879 pub fn allocate_user_item_ids(
880 &mut self,
881 amount: u64,
882 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
883 Ok(self
884 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
885 .into_iter()
886 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
888 .collect())
889 }
890
891 pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
892 let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
893 Ok(ReplicaId::User(id))
894 }
895
896 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
897 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
898 Ok(ReplicaId::System(id))
899 }
900
    /// Allocates the next audit log event ID.
    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
    }
904
    /// Allocates the next storage usage event ID.
    ///
    /// NOTE(review): despite the plural name, this allocates and returns a
    /// single ID — consider renaming.
    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
    }
908
    /// Allocates `amount` OIDs that are neither assigned to any catalog
    /// object (database, schema, role, item, introspection source) nor listed
    /// in `temporary_oids`, and advances the OID allocator past the last OID
    /// examined.
    ///
    /// Candidate OIDs live in the user range (`>= FIRST_USER_OID`) and wrap
    /// back to the start of that range on `u32` overflow, so freed OIDs are
    /// eventually reused. Returns `OidExhaustion` if the request cannot be
    /// satisfied.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        // Wrapper that keeps an OID inside the user range, wrapping back into
        // it on u32 overflow.
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        // More OIDs than fit in u32 can never be satisfied.
        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // Collect every OID currently in use so the scan below can skip them.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        // Scan forward (with wraparound) collecting free OIDs. Arriving back
        // at `start_oid` before collecting enough means the space is full.
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        // We must have replaced exactly the allocator value read above.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1017
1018 pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1021 self.allocate_oids(1, temporary_oids)
1022 .map(|oids| oids.into_element())
1023 }
1024
1025 pub(crate) fn insert_id_allocator(
1026 &mut self,
1027 name: String,
1028 next_id: u64,
1029 ) -> Result<(), CatalogError> {
1030 match self.id_allocator.insert(
1031 IdAllocKey { name: name.clone() },
1032 IdAllocValue { next_id },
1033 self.op_id,
1034 ) {
1035 Ok(_) => Ok(()),
1036 Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1037 }
1038 }
1039
1040 pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1047 let prev = self
1048 .databases
1049 .set(DatabaseKey { id: *id }, None, self.op_id)?;
1050 if prev.is_some() {
1051 Ok(())
1052 } else {
1053 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1054 }
1055 }
1056
1057 pub fn remove_databases(
1064 &mut self,
1065 databases: &BTreeSet<DatabaseId>,
1066 ) -> Result<(), CatalogError> {
1067 if databases.is_empty() {
1068 return Ok(());
1069 }
1070
1071 let to_remove = databases
1072 .iter()
1073 .map(|id| (DatabaseKey { id: *id }, None))
1074 .collect();
1075 let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1076 prev.retain(|_k, val| val.is_none());
1077
1078 if !prev.is_empty() {
1079 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1080 return Err(SqlCatalogError::UnknownDatabase(err).into());
1081 }
1082
1083 Ok(())
1084 }
1085
1086 pub fn remove_schema(
1093 &mut self,
1094 database_id: &Option<DatabaseId>,
1095 schema_id: &SchemaId,
1096 ) -> Result<(), CatalogError> {
1097 let prev = self
1098 .schemas
1099 .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1100 if prev.is_some() {
1101 Ok(())
1102 } else {
1103 let database_name = match database_id {
1104 Some(id) => format!("{id}."),
1105 None => "".to_string(),
1106 };
1107 Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1108 }
1109 }
1110
1111 pub fn remove_schemas(
1118 &mut self,
1119 schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1120 ) -> Result<(), CatalogError> {
1121 if schemas.is_empty() {
1122 return Ok(());
1123 }
1124
1125 let to_remove = schemas
1126 .iter()
1127 .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1128 .collect();
1129 let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1130 prev.retain(|_k, v| v.is_none());
1131
1132 if !prev.is_empty() {
1133 let err = prev
1134 .keys()
1135 .map(|k| {
1136 let db_spec = schemas.get(&k.id).expect("should_exist");
1137 let db_name = match db_spec {
1138 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1139 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1140 };
1141 format!("{}.{}", db_name, k.id)
1142 })
1143 .join(", ");
1144
1145 return Err(SqlCatalogError::UnknownSchema(err).into());
1146 }
1147
1148 Ok(())
1149 }
1150
1151 pub fn remove_source_references(
1152 &mut self,
1153 source_id: CatalogItemId,
1154 ) -> Result<(), CatalogError> {
1155 let deleted = self
1156 .source_references
1157 .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1158 .is_some();
1159 if deleted {
1160 Ok(())
1161 } else {
1162 Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1163 }
1164 }
1165
1166 pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1173 assert!(
1174 roles.iter().all(|id| id.is_user()),
1175 "cannot delete non-user roles"
1176 );
1177 self.remove_roles(roles)
1178 }
1179
1180 pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1187 if roles.is_empty() {
1188 return Ok(());
1189 }
1190
1191 let to_remove_keys = roles
1192 .iter()
1193 .map(|role_id| RoleKey { id: *role_id })
1194 .collect::<Vec<_>>();
1195
1196 let to_remove_roles = to_remove_keys
1197 .iter()
1198 .map(|role_key| (role_key.clone(), None))
1199 .collect();
1200
1201 let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1202
1203 let to_remove_role_auth = to_remove_keys
1204 .iter()
1205 .map(|role_key| {
1206 (
1207 RoleAuthKey {
1208 role_id: role_key.id,
1209 },
1210 None,
1211 )
1212 })
1213 .collect();
1214
1215 let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1216
1217 prev.retain(|_k, v| v.is_none());
1218 if !prev.is_empty() {
1219 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1220 return Err(SqlCatalogError::UnknownRole(err).into());
1221 }
1222
1223 role_auth_prev.retain(|_k, v| v.is_none());
1224 Ok(())
1228 }
1229
1230 pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1237 if clusters.is_empty() {
1238 return Ok(());
1239 }
1240
1241 let to_remove = clusters
1242 .iter()
1243 .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1244 .collect();
1245 let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1246
1247 prev.retain(|_k, v| v.is_none());
1248 if !prev.is_empty() {
1249 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1250 return Err(SqlCatalogError::UnknownCluster(err).into());
1251 }
1252
1253 self.cluster_replicas
1259 .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1260 self.introspection_sources
1261 .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1262
1263 Ok(())
1264 }
1265
1266 pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1273 let deleted = self
1274 .cluster_replicas
1275 .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1276 .is_some();
1277 if deleted {
1278 Ok(())
1279 } else {
1280 Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1281 }
1282 }
1283
1284 pub fn remove_cluster_replicas(
1291 &mut self,
1292 replicas: &BTreeSet<ReplicaId>,
1293 ) -> Result<(), CatalogError> {
1294 if replicas.is_empty() {
1295 return Ok(());
1296 }
1297
1298 let to_remove = replicas
1299 .iter()
1300 .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1301 .collect();
1302 let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1303
1304 prev.retain(|_k, v| v.is_none());
1305 if !prev.is_empty() {
1306 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1307 return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1308 }
1309
1310 Ok(())
1311 }
1312
1313 pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1320 let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1321 if prev.is_some() {
1322 Ok(())
1323 } else {
1324 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1325 }
1326 }
1327
1328 pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1335 if ids.is_empty() {
1336 return Ok(());
1337 }
1338
1339 let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1340 let n = self.items.delete_by_keys(ks, self.op_id).len();
1341 if n == ids.len() {
1342 Ok(())
1343 } else {
1344 let item_ids = self.items.items().keys().map(|k| k.id).collect();
1345 let mut unknown = ids.difference(&item_ids);
1346 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1347 }
1348 }
1349
1350 pub fn remove_system_object_mappings(
1357 &mut self,
1358 descriptions: BTreeSet<SystemObjectDescription>,
1359 ) -> Result<(), CatalogError> {
1360 if descriptions.is_empty() {
1361 return Ok(());
1362 }
1363
1364 let ks: Vec<_> = descriptions
1365 .clone()
1366 .into_iter()
1367 .map(|desc| GidMappingKey {
1368 schema_name: desc.schema_name,
1369 object_type: desc.object_type,
1370 object_name: desc.object_name,
1371 })
1372 .collect();
1373 let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1374
1375 if n == descriptions.len() {
1376 Ok(())
1377 } else {
1378 let item_descriptions = self
1379 .system_gid_mapping
1380 .items()
1381 .keys()
1382 .map(|k| SystemObjectDescription {
1383 schema_name: k.schema_name.clone(),
1384 object_type: k.object_type.clone(),
1385 object_name: k.object_name.clone(),
1386 })
1387 .collect();
1388 let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1389 format!(
1390 "{} {}.{}",
1391 desc.object_type, desc.schema_name, desc.object_name
1392 )
1393 });
1394 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1395 }
1396 }
1397
1398 pub fn remove_introspection_source_indexes(
1405 &mut self,
1406 introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1407 ) -> Result<(), CatalogError> {
1408 if introspection_source_indexes.is_empty() {
1409 return Ok(());
1410 }
1411
1412 let ks: Vec<_> = introspection_source_indexes
1413 .clone()
1414 .into_iter()
1415 .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1416 .collect();
1417 let n = self
1418 .introspection_sources
1419 .delete_by_keys(ks, self.op_id)
1420 .len();
1421 if n == introspection_source_indexes.len() {
1422 Ok(())
1423 } else {
1424 let txn_indexes = self
1425 .introspection_sources
1426 .items()
1427 .keys()
1428 .map(|k| (k.cluster_id, k.name.clone()))
1429 .collect();
1430 let mut unknown = introspection_source_indexes
1431 .difference(&txn_indexes)
1432 .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1433 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1434 }
1435 }
1436
1437 pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1444 let updated =
1445 self.items
1446 .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1447 if updated {
1448 Ok(())
1449 } else {
1450 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1451 }
1452 }
1453
1454 pub fn update_items(
1462 &mut self,
1463 items: BTreeMap<CatalogItemId, Item>,
1464 ) -> Result<(), CatalogError> {
1465 if items.is_empty() {
1466 return Ok(());
1467 }
1468
1469 let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1470 let kvs: Vec<_> = items
1471 .clone()
1472 .into_iter()
1473 .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1474 .collect();
1475 let n = self.items.update_by_keys(kvs, self.op_id)?;
1476 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1477 if n == update_ids.len() {
1478 Ok(())
1479 } else {
1480 let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1481 let mut unknown = update_ids.difference(&item_ids);
1482 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1483 }
1484 }
1485
    /// Replaces the role stored under `id` and applies `password` to its
    /// stored authentication. Errors if the role does not exist.
    pub fn update_role(
        &mut self,
        id: RoleId,
        role: Role,
        password: PasswordAction,
    ) -> Result<(), CatalogError> {
        let key = RoleKey { id };
        if self.roles.get(&key).is_some() {
            let auth_key = RoleAuthKey { role_id: id };

            match password {
                PasswordAction::Set(new_password) => {
                    // Hash the password before it is durably stored.
                    let hash = mz_auth::hash::scram256_hash(
                        &new_password.password,
                        &new_password.scram_iterations,
                    )
                    .expect("password hash should be valid");
                    let value = RoleAuthValue {
                        password_hash: Some(hash),
                        updated_at: SYSTEM_TIME(),
                    };

                    // Upsert: a role created without a password has no
                    // `role_auth` row yet, so insert rather than update.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    } else {
                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::Clear => {
                    let value = RoleAuthValue {
                        password_hash: None,
                        updated_at: SYSTEM_TIME(),
                    };
                    // Only overwrite an existing auth row; clearing the
                    // password of a role that never had one is a no-op.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::NoChange => {}
            }

            // Write the new role value after the auth handling above.
            self.roles
                .update_by_key(key, role.into_key_value().1, self.op_id)?;

            Ok(())
        } else {
            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
        }
    }
1543
1544 pub fn update_roles_without_auth(
1555 &mut self,
1556 roles: BTreeMap<RoleId, Role>,
1557 ) -> Result<(), CatalogError> {
1558 if roles.is_empty() {
1559 return Ok(());
1560 }
1561
1562 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1563 let kvs: Vec<_> = roles
1564 .into_iter()
1565 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1566 .collect();
1567 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1568 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1569
1570 if n == update_role_ids.len() {
1571 Ok(())
1572 } else {
1573 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1574 let mut unknown = update_role_ids.difference(&role_ids);
1575 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1576 }
1577 }
1578
1579 pub fn update_system_object_mappings(
1584 &mut self,
1585 mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1586 ) -> Result<(), CatalogError> {
1587 if mappings.is_empty() {
1588 return Ok(());
1589 }
1590
1591 let n = self.system_gid_mapping.update(
1592 |_k, v| {
1593 if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1594 let (_, new_value) = mapping.clone().into_key_value();
1595 Some(new_value)
1596 } else {
1597 None
1598 }
1599 },
1600 self.op_id,
1601 )?;
1602
1603 if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1604 != mappings.len()
1605 {
1606 let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1607 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1608 }
1609
1610 Ok(())
1611 }
1612
1613 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1620 let updated = self.clusters.update_by_key(
1621 ClusterKey { id },
1622 cluster.into_key_value().1,
1623 self.op_id,
1624 )?;
1625 if updated {
1626 Ok(())
1627 } else {
1628 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1629 }
1630 }
1631
1632 pub fn update_cluster_replica(
1639 &mut self,
1640 replica_id: ReplicaId,
1641 replica: ClusterReplica,
1642 ) -> Result<(), CatalogError> {
1643 let updated = self.cluster_replicas.update_by_key(
1644 ClusterReplicaKey { id: replica_id },
1645 replica.into_key_value().1,
1646 self.op_id,
1647 )?;
1648 if updated {
1649 Ok(())
1650 } else {
1651 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1652 }
1653 }
1654
1655 pub fn update_database(
1662 &mut self,
1663 id: DatabaseId,
1664 database: Database,
1665 ) -> Result<(), CatalogError> {
1666 let updated = self.databases.update_by_key(
1667 DatabaseKey { id },
1668 database.into_key_value().1,
1669 self.op_id,
1670 )?;
1671 if updated {
1672 Ok(())
1673 } else {
1674 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1675 }
1676 }
1677
1678 pub fn update_schema(
1685 &mut self,
1686 schema_id: SchemaId,
1687 schema: Schema,
1688 ) -> Result<(), CatalogError> {
1689 let updated = self.schemas.update_by_key(
1690 SchemaKey { id: schema_id },
1691 schema.into_key_value().1,
1692 self.op_id,
1693 )?;
1694 if updated {
1695 Ok(())
1696 } else {
1697 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1698 }
1699 }
1700
1701 pub fn update_network_policy(
1708 &mut self,
1709 id: NetworkPolicyId,
1710 network_policy: NetworkPolicy,
1711 ) -> Result<(), CatalogError> {
1712 let updated = self.network_policies.update_by_key(
1713 NetworkPolicyKey { id },
1714 network_policy.into_key_value().1,
1715 self.op_id,
1716 )?;
1717 if updated {
1718 Ok(())
1719 } else {
1720 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1721 }
1722 }
1723 pub fn remove_network_policies(
1730 &mut self,
1731 network_policies: &BTreeSet<NetworkPolicyId>,
1732 ) -> Result<(), CatalogError> {
1733 if network_policies.is_empty() {
1734 return Ok(());
1735 }
1736
1737 let to_remove = network_policies
1738 .iter()
1739 .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1740 .collect();
1741 let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1742 assert!(
1743 prev.iter().all(|(k, _)| k.id.is_user()),
1744 "cannot delete non-user network policy"
1745 );
1746
1747 prev.retain(|_k, v| v.is_none());
1748 if !prev.is_empty() {
1749 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1750 return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1751 }
1752
1753 Ok(())
1754 }
1755 pub fn set_default_privilege(
1759 &mut self,
1760 role_id: RoleId,
1761 database_id: Option<DatabaseId>,
1762 schema_id: Option<SchemaId>,
1763 object_type: ObjectType,
1764 grantee: RoleId,
1765 privileges: Option<AclMode>,
1766 ) -> Result<(), CatalogError> {
1767 self.default_privileges.set(
1768 DefaultPrivilegesKey {
1769 role_id,
1770 database_id,
1771 schema_id,
1772 object_type,
1773 grantee,
1774 },
1775 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1776 self.op_id,
1777 )?;
1778 Ok(())
1779 }
1780
1781 pub fn set_default_privileges(
1783 &mut self,
1784 default_privileges: Vec<DefaultPrivilege>,
1785 ) -> Result<(), CatalogError> {
1786 if default_privileges.is_empty() {
1787 return Ok(());
1788 }
1789
1790 let default_privileges = default_privileges
1791 .into_iter()
1792 .map(DurableType::into_key_value)
1793 .map(|(k, v)| (k, Some(v)))
1794 .collect();
1795 self.default_privileges
1796 .set_many(default_privileges, self.op_id)?;
1797 Ok(())
1798 }
1799
1800 pub fn set_system_privilege(
1804 &mut self,
1805 grantee: RoleId,
1806 grantor: RoleId,
1807 acl_mode: Option<AclMode>,
1808 ) -> Result<(), CatalogError> {
1809 self.system_privileges.set(
1810 SystemPrivilegesKey { grantee, grantor },
1811 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1812 self.op_id,
1813 )?;
1814 Ok(())
1815 }
1816
1817 pub fn set_system_privileges(
1819 &mut self,
1820 system_privileges: Vec<MzAclItem>,
1821 ) -> Result<(), CatalogError> {
1822 if system_privileges.is_empty() {
1823 return Ok(());
1824 }
1825
1826 let system_privileges = system_privileges
1827 .into_iter()
1828 .map(DurableType::into_key_value)
1829 .map(|(k, v)| (k, Some(v)))
1830 .collect();
1831 self.system_privileges
1832 .set_many(system_privileges, self.op_id)?;
1833 Ok(())
1834 }
1835
1836 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1838 self.settings.set(
1839 SettingKey { name },
1840 value.map(|value| SettingValue { value }),
1841 self.op_id,
1842 )?;
1843 Ok(())
1844 }
1845
    /// Stores `version` under the catalog content version setting key.
    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
    }
1849
1850 pub fn insert_introspection_source_indexes(
1852 &mut self,
1853 introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1854 temporary_oids: &HashSet<u32>,
1855 ) -> Result<(), CatalogError> {
1856 if introspection_source_indexes.is_empty() {
1857 return Ok(());
1858 }
1859
1860 let amount = usize_to_u64(introspection_source_indexes.len());
1861 let oids = self.allocate_oids(amount, temporary_oids)?;
1862 let introspection_source_indexes: Vec<_> = introspection_source_indexes
1863 .into_iter()
1864 .zip_eq(oids)
1865 .map(
1866 |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1867 cluster_id,
1868 name,
1869 item_id,
1870 index_id,
1871 oid,
1872 },
1873 )
1874 .collect();
1875
1876 for introspection_source_index in introspection_source_indexes {
1877 let (key, value) = introspection_source_index.into_key_value();
1878 self.introspection_sources.insert(key, value, self.op_id)?;
1879 }
1880
1881 Ok(())
1882 }
1883
1884 pub fn set_system_object_mappings(
1886 &mut self,
1887 mappings: Vec<SystemObjectMapping>,
1888 ) -> Result<(), CatalogError> {
1889 if mappings.is_empty() {
1890 return Ok(());
1891 }
1892
1893 let mappings = mappings
1894 .into_iter()
1895 .map(DurableType::into_key_value)
1896 .map(|(k, v)| (k, Some(v)))
1897 .collect();
1898 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1899 Ok(())
1900 }
1901
1902 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1904 if replicas.is_empty() {
1905 return Ok(());
1906 }
1907
1908 let replicas = replicas
1909 .into_iter()
1910 .map(DurableType::into_key_value)
1911 .map(|(k, v)| (k, Some(v)))
1912 .collect();
1913 self.cluster_replicas.set_many(replicas, self.op_id)?;
1914 Ok(())
1915 }
1916
1917 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1919 match value {
1920 Some(value) => {
1921 let config = Config { key, value };
1922 let (key, value) = config.into_key_value();
1923 self.configs.set(key, Some(value), self.op_id)?;
1924 }
1925 None => {
1926 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1927 }
1928 }
1929 Ok(())
1930 }
1931
1932 pub fn get_config(&self, key: String) -> Option<u64> {
1934 self.configs
1935 .get(&ConfigKey { key })
1936 .map(|entry| entry.value)
1937 }
1938
1939 pub fn get_setting(&self, name: String) -> Option<&str> {
1941 self.settings
1942 .get(&SettingKey { name })
1943 .map(|entry| &*entry.value)
1944 }
1945
1946 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1947 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1948 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1949 }
1950
1951 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1952 self.set_setting(
1953 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1954 Some(shard_id.to_string()),
1955 )
1956 }
1957
1958 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1959 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1960 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1961 }
1962
1963 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1964 self.set_setting(
1965 EXPRESSION_CACHE_SHARD_KEY.to_string(),
1966 Some(shard_id.to_string()),
1967 )
1968 }
1969
1970 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1976 self.set_config(
1977 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1978 Some(
1979 value
1980 .as_millis()
1981 .try_into()
1982 .expect("max wait fits into u64"),
1983 ),
1984 )
1985 }
1986
1987 pub fn set_0dt_deployment_ddl_check_interval(
1994 &mut self,
1995 value: Duration,
1996 ) -> Result<(), CatalogError> {
1997 self.set_config(
1998 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
1999 Some(
2000 value
2001 .as_millis()
2002 .try_into()
2003 .expect("ddl check interval fits into u64"),
2004 ),
2005 )
2006 }
2007
2008 pub fn set_enable_0dt_deployment_panic_after_timeout(
2014 &mut self,
2015 value: bool,
2016 ) -> Result<(), CatalogError> {
2017 self.set_config(
2018 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2019 Some(u64::from(value)),
2020 )
2021 }
2022
    /// Removes the 0dt deployment max wait config entry, restoring the
    /// default.
    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
    }
2031
    /// Removes the 0dt deployment DDL check interval config entry, restoring
    /// the default.
    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
    }
2041
    /// Removes the panic-after-timeout config entry, restoring the default.
    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
    }
2051
    /// Records (as config value 1) that the system configuration has been
    /// synced at least once.
    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
    }
2056
2057 pub fn update_comment(
2058 &mut self,
2059 object_id: CommentObjectId,
2060 sub_component: Option<usize>,
2061 comment: Option<String>,
2062 ) -> Result<(), CatalogError> {
2063 let key = CommentKey {
2064 object_id,
2065 sub_component,
2066 };
2067 let value = comment.map(|c| CommentValue { comment: c });
2068 self.comments.set(key, value, self.op_id)?;
2069
2070 Ok(())
2071 }
2072
2073 pub fn drop_comments(
2074 &mut self,
2075 object_ids: &BTreeSet<CommentObjectId>,
2076 ) -> Result<(), CatalogError> {
2077 if object_ids.is_empty() {
2078 return Ok(());
2079 }
2080
2081 self.comments
2082 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2083 Ok(())
2084 }
2085
2086 pub fn update_source_references(
2087 &mut self,
2088 source_id: CatalogItemId,
2089 references: Vec<SourceReference>,
2090 updated_at: u64,
2091 ) -> Result<(), CatalogError> {
2092 let key = SourceReferencesKey { source_id };
2093 let value = SourceReferencesValue {
2094 references,
2095 updated_at,
2096 };
2097 self.source_references.set(key, Some(value), self.op_id)?;
2098 Ok(())
2099 }
2100
2101 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2103 let key = ServerConfigurationKey {
2104 name: name.to_string(),
2105 };
2106 let value = ServerConfigurationValue { value };
2107 self.system_configurations
2108 .set(key, Some(value), self.op_id)?;
2109 Ok(())
2110 }
2111
    /// Removes the server configuration parameter `name`; a no-op if it is
    /// not set.
    pub fn remove_system_config(&mut self, name: &str) {
        let key = ServerConfigurationKey {
            name: name.to_string(),
        };
        // Setting to `None` deletes the row; a deletion cannot collide with
        // an existing key, so the uniqueness check cannot fail.
        self.system_configurations
            .set(key, None, self.op_id)
            .expect("cannot have uniqueness violation");
    }
2121
    /// Removes every server configuration parameter.
    pub fn clear_system_configs(&mut self) {
        // The always-true predicate deletes every entry.
        self.system_configurations.delete(|_k, _v| true, self.op_id);
    }
2126
2127 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2128 match self.configs.insert(
2129 ConfigKey { key: key.clone() },
2130 ConfigValue { value },
2131 self.op_id,
2132 ) {
2133 Ok(_) => Ok(()),
2134 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2135 }
2136 }
2137
2138 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2139 self.clusters
2140 .items()
2141 .into_iter()
2142 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2143 }
2144
2145 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2146 self.cluster_replicas
2147 .items()
2148 .into_iter()
2149 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2150 }
2151
2152 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2153 self.roles
2154 .items()
2155 .into_iter()
2156 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2157 }
2158
2159 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2160 self.network_policies
2161 .items()
2162 .into_iter()
2163 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2164 }
2165
2166 pub fn get_system_object_mappings(
2167 &self,
2168 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2169 self.system_gid_mapping
2170 .items()
2171 .into_iter()
2172 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2173 }
2174
2175 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2176 self.schemas
2177 .items()
2178 .into_iter()
2179 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2180 }
2181
2182 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2183 self.system_configurations
2184 .items()
2185 .into_iter()
2186 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2187 }
2188
2189 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2190 let key = SchemaKey { id: *id };
2191 self.schemas
2192 .get(&key)
2193 .map(|v| DurableType::from_key_value(key, v.clone()))
2194 }
2195
2196 pub fn get_introspection_source_indexes(
2197 &self,
2198 cluster_id: ClusterId,
2199 ) -> BTreeMap<&str, (GlobalId, u32)> {
2200 self.introspection_sources
2201 .items()
2202 .into_iter()
2203 .filter(|(k, _v)| k.cluster_id == cluster_id)
2204 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2205 .collect()
2206 }
2207
2208 pub fn get_catalog_content_version(&self) -> Option<&str> {
2209 self.settings
2210 .get(&SettingKey {
2211 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2212 })
2213 .map(|value| &*value.value)
2214 }
2215
2216 pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2217 self.settings
2218 .get(&SettingKey {
2219 name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2220 })
2221 .map(|value| value.value.clone())
2222 }
2223
    /// Returns the state updates produced by the current op and then commits
    /// the op, so subsequent mutations are grouped into a new op.
    #[must_use]
    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
        // Collect the updates before bumping `op_id`; afterwards they would
        // no longer match the current op.
        let updates = self.get_op_updates();
        self.commit_op();
        updates
    }
2235
    /// Returns the state updates written by the current (uncommitted) op,
    /// i.e. every pending entry whose timestamp equals `self.op_id`.
    fn get_op_updates(&self) -> Vec<StateUpdate> {
        // Collects (kind, diff) pairs from one table-backed collection,
        // keeping only entries written at op `op`.
        fn get_collection_op_updates<'a, T>(
            table_txn: &'a TableTransaction<T::Key, T::Value>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T::Value: Ord + Clone + Debug,
            T: DurableType,
        {
            table_txn
                .pending
                .iter()
                // Flatten (key, [pending values]) into (key, value) pairs.
                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
                .filter_map(move |(k, v)| {
                    // Only surface entries written by the op of interest.
                    if v.ts == op {
                        let key = k.clone();
                        let value = v.value.clone();
                        let diff = v.diff.clone().try_into().expect("invalid diff");
                        let update = DurableType::from_key_value(key, value);
                        let kind = kind_fn(update);
                        Some((kind, diff))
                    } else {
                        None
                    }
                })
        }

        // Same as above, but for append-only collections stored as flat
        // (key, diff, ts) triples with a unit value (e.g. the audit log).
        fn get_large_collection_op_updates<'a, T>(
            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T: DurableType<Value = ()>,
        {
            collection.iter().filter_map(move |(k, diff, ts)| {
                if *ts == op {
                    let key = k.clone();
                    let diff = diff.clone().try_into().expect("invalid diff");
                    let update = DurableType::from_key_value(key, ());
                    let kind = kind_fn(update);
                    Some((kind, diff))
                } else {
                    None
                }
            })
        }

        // Exhaustively destructure `self` so that adding a new collection to
        // `Transaction` forces this function to be revisited.
        let Transaction {
            durable_catalog: _,
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            source_references,
            system_privileges,
            audit_log_updates,
            storage_collection_metadata,
            unfinalized_shards,
            id_allocator: _,
            configs: _,
            settings: _,
            txn_wal_shard: _,
            upper,
            op_id: _,
        } = &self;

        // Chain every collection's updates for the current op into one list,
        // stamping each with the transaction's upper as its timestamp.
        let updates = std::iter::empty()
            .chain(get_collection_op_updates(
                roles,
                StateUpdateKind::Role,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                role_auth,
                StateUpdateKind::RoleAuth,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                databases,
                StateUpdateKind::Database,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                schemas,
                StateUpdateKind::Schema,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                default_privileges,
                StateUpdateKind::DefaultPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_privileges,
                StateUpdateKind::SystemPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_configurations,
                StateUpdateKind::SystemConfiguration,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                clusters,
                StateUpdateKind::Cluster,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                network_policies,
                StateUpdateKind::NetworkPolicy,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                introspection_sources,
                StateUpdateKind::IntrospectionSourceIndex,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                cluster_replicas,
                StateUpdateKind::ClusterReplica,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_gid_mapping,
                StateUpdateKind::SystemObjectMapping,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                items,
                StateUpdateKind::Item,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                comments,
                StateUpdateKind::Comment,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                source_references,
                StateUpdateKind::SourceReferences,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                storage_collection_metadata,
                StateUpdateKind::StorageCollectionMetadata,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                unfinalized_shards,
                StateUpdateKind::UnfinalizedShard,
                self.op_id,
            ))
            .chain(get_large_collection_op_updates(
                audit_log_updates,
                StateUpdateKind::AuditLog,
                self.op_id,
            ))
            .map(|(kind, diff)| StateUpdate {
                kind,
                ts: upper.clone(),
                diff,
            })
            .collect();

        updates
    }
2416
    /// Reports whether the underlying durable catalog is in savepoint mode.
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2420
    /// Advances the op counter; mutations recorded after this call belong to
    /// a new op.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2424
    /// Returns the current op id.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2428
    /// Returns the upper (write frontier) this transaction was started at.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2432
    /// Consumes the transaction, returning the batch of all pending changes
    /// together with the underlying durable catalog handle needed to commit
    /// them.
    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
        // Audit log updates carry an op id that the batch does not need;
        // convert each entry to a (proto key, (), diff) triple.
        let audit_log_updates = self
            .audit_log_updates
            .into_iter()
            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
            .collect();

        let txn_batch = TransactionBatch {
            databases: self.databases.pending(),
            schemas: self.schemas.pending(),
            items: self.items.pending(),
            comments: self.comments.pending(),
            roles: self.roles.pending(),
            role_auth: self.role_auth.pending(),
            clusters: self.clusters.pending(),
            cluster_replicas: self.cluster_replicas.pending(),
            network_policies: self.network_policies.pending(),
            introspection_sources: self.introspection_sources.pending(),
            id_allocator: self.id_allocator.pending(),
            configs: self.configs.pending(),
            source_references: self.source_references.pending(),
            settings: self.settings.pending(),
            system_gid_mapping: self.system_gid_mapping.pending(),
            system_configurations: self.system_configurations.pending(),
            default_privileges: self.default_privileges.pending(),
            system_privileges: self.system_privileges.pending(),
            storage_collection_metadata: self.storage_collection_metadata.pending(),
            unfinalized_shards: self.unfinalized_shards.pending(),
            txn_wal_shard: self.txn_wal_shard.pending(),
            audit_log_updates,
            upper: self.upper,
        };
        (txn_batch, self.durable_catalog)
    }
2467
    /// Consolidates the staged updates and commits them to durable storage at
    /// `commit_ts`. Unlike [`Transaction::commit`], this does not sync the
    /// resulting updates back afterwards.
    ///
    /// Returns the durable catalog state and the new upper on success.
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts` is less than the transaction's upper.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn commit_internal(
        self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
        let (mut txn_batch, durable_catalog) = self.into_parts();
        // Exhaustively destructure the batch so that adding a field produces
        // a compile error here, forcing it to be consolidated below.
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            source_references,
            settings,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper,
        } = &mut txn_batch;
        // Consolidate in place so that matching insertions and retractions
        // cancel out before being written durably.
        differential_dataflow::consolidation::consolidate_updates(databases);
        differential_dataflow::consolidation::consolidate_updates(schemas);
        differential_dataflow::consolidation::consolidate_updates(items);
        differential_dataflow::consolidation::consolidate_updates(comments);
        differential_dataflow::consolidation::consolidate_updates(roles);
        differential_dataflow::consolidation::consolidate_updates(role_auth);
        differential_dataflow::consolidation::consolidate_updates(clusters);
        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
        differential_dataflow::consolidation::consolidate_updates(network_policies);
        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
        differential_dataflow::consolidation::consolidate_updates(id_allocator);
        differential_dataflow::consolidation::consolidate_updates(configs);
        differential_dataflow::consolidation::consolidate_updates(settings);
        differential_dataflow::consolidation::consolidate_updates(source_references);
        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
        differential_dataflow::consolidation::consolidate_updates(system_configurations);
        differential_dataflow::consolidation::consolidate_updates(default_privileges);
        differential_dataflow::consolidation::consolidate_updates(system_privileges);
        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);

        assert!(
            commit_ts >= *upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            upper
        );
        let upper = durable_catalog
            .commit_transaction(txn_batch, commit_ts)
            .await?;
        Ok((durable_catalog, upper))
    }
2546
    /// Commits and consumes the transaction at `commit_ts`, then syncs the
    /// durable catalog up to the resulting upper and consumes the resulting
    /// updates.
    ///
    /// # Panics
    ///
    /// Panics if there are unconsumed transaction updates.
    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        let updates = durable_storage.sync_updates(upper).await?;
        // In read/write mode every synced update should stem from this very
        // commit; anything older would have been unconsumed pre-existing
        // state. Read-only catalogs are exempt from this check.
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2584}
2585
2586use crate::durable::async_trait;
2587
2588use super::objects::{RoleAuthKey, RoleAuthValue};
2589
2590#[async_trait]
2591impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2592 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2593 self.storage_collection_metadata
2594 .items()
2595 .into_iter()
2596 .map(
2597 |(
2598 StorageCollectionMetadataKey { id },
2599 StorageCollectionMetadataValue { shard },
2600 )| { (*id, shard.clone()) },
2601 )
2602 .collect()
2603 }
2604
2605 fn insert_collection_metadata(
2606 &mut self,
2607 metadata: BTreeMap<GlobalId, ShardId>,
2608 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2609 for (id, shard) in metadata {
2610 self.storage_collection_metadata
2611 .insert(
2612 StorageCollectionMetadataKey { id },
2613 StorageCollectionMetadataValue {
2614 shard: shard.clone(),
2615 },
2616 self.op_id,
2617 )
2618 .map_err(|err| match err {
2619 DurableCatalogError::DuplicateKey => {
2620 StorageError::CollectionMetadataAlreadyExists(id)
2621 }
2622 DurableCatalogError::UniquenessViolation => {
2623 StorageError::PersistShardAlreadyInUse(shard)
2624 }
2625 err => StorageError::Generic(anyhow::anyhow!(err)),
2626 })?;
2627 }
2628 Ok(())
2629 }
2630
2631 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2632 let ks: Vec<_> = ids
2633 .into_iter()
2634 .map(|id| StorageCollectionMetadataKey { id })
2635 .collect();
2636 self.storage_collection_metadata
2637 .delete_by_keys(ks, self.op_id)
2638 .into_iter()
2639 .map(
2640 |(
2641 StorageCollectionMetadataKey { id },
2642 StorageCollectionMetadataValue { shard },
2643 )| (id, shard),
2644 )
2645 .collect()
2646 }
2647
2648 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2649 self.unfinalized_shards
2650 .items()
2651 .into_iter()
2652 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2653 .collect()
2654 }
2655
2656 fn insert_unfinalized_shards(
2657 &mut self,
2658 s: BTreeSet<ShardId>,
2659 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2660 for shard in s {
2661 match self
2662 .unfinalized_shards
2663 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2664 {
2665 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2667 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2668 };
2669 }
2670 Ok(())
2671 }
2672
2673 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2674 let ks: Vec<_> = shards
2675 .into_iter()
2676 .map(|shard| UnfinalizedShardKey { shard })
2677 .collect();
2678 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2679 }
2680
2681 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2682 self.txn_wal_shard
2683 .values()
2684 .iter()
2685 .next()
2686 .map(|TxnWalShardValue { shard }| *shard)
2687 }
2688
2689 fn write_txn_wal_shard(
2690 &mut self,
2691 shard: ShardId,
2692 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2693 self.txn_wal_shard
2694 .insert((), TxnWalShardValue { shard }, self.op_id)
2695 .map_err(|err| match err {
2696 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2697 err => StorageError::Generic(anyhow::anyhow!(err)),
2698 })
2699 }
2700}
2701
/// The full set of updates staged by a [`Transaction`], already converted to
/// the proto representations that are written to durable storage.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    // Audit log events: the key carries the entire payload, so the value is
    // unit.
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    // The upper of the transaction this batch came from; `commit_internal`
    // asserts that the commit timestamp is at least this value.
    pub(crate) upper: mz_repr::Timestamp,
}
2754
2755impl TransactionBatch {
2756 pub fn is_empty(&self) -> bool {
2757 let TransactionBatch {
2758 databases,
2759 schemas,
2760 items,
2761 comments,
2762 roles,
2763 role_auth,
2764 clusters,
2765 cluster_replicas,
2766 network_policies,
2767 introspection_sources,
2768 id_allocator,
2769 configs,
2770 settings,
2771 source_references,
2772 system_gid_mapping,
2773 system_configurations,
2774 default_privileges,
2775 system_privileges,
2776 storage_collection_metadata,
2777 unfinalized_shards,
2778 txn_wal_shard,
2779 audit_log_updates,
2780 upper: _,
2781 } = self;
2782 databases.is_empty()
2783 && schemas.is_empty()
2784 && items.is_empty()
2785 && comments.is_empty()
2786 && roles.is_empty()
2787 && role_auth.is_empty()
2788 && clusters.is_empty()
2789 && cluster_replicas.is_empty()
2790 && network_policies.is_empty()
2791 && introspection_sources.is_empty()
2792 && id_allocator.is_empty()
2793 && configs.is_empty()
2794 && settings.is_empty()
2795 && source_references.is_empty()
2796 && system_gid_mapping.is_empty()
2797 && system_configurations.is_empty()
2798 && default_privileges.is_empty()
2799 && system_privileges.is_empty()
2800 && storage_collection_metadata.is_empty()
2801 && unfinalized_shards.is_empty()
2802 && txn_wal_shard.is_empty()
2803 && audit_log_updates.is_empty()
2804 }
2805}
2806
/// A single staged change to one value in a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    // The value being added or retracted.
    value: V,
    // The logical timestamp (transaction op id) of the update.
    ts: Timestamp,
    // +1 for an insertion, -1 for a retraction.
    diff: Diff,
}
2813
/// Describes whether a table value type carries a `name` field that can be
/// used to index values when checking uniqueness constraints (see
/// `TableTransaction::verify`).
trait UniqueName {
    /// Whether `unique_name` returns a meaningful name for this type.
    const HAS_UNIQUE_NAME: bool;
    /// The value's name, or `""` when `HAS_UNIQUE_NAME` is `false`.
    fn unique_name(&self) -> &str;
}
2822
/// Blanket `UniqueName` implementations for all durable object value types.
mod unique_name {
    use crate::durable::objects::*;

    /// Implements `UniqueName` for types that expose a unique `name` field.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements `UniqueName` for types without a unique name; their
    /// `unique_name` returns the empty string.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                        ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    // `String` is only used as a value type in tests.
    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
2884
/// An in-memory transactional layer over a single durable catalog table,
/// tracking the initial snapshot plus any updates staged by the transaction.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The table's contents when the transaction started.
    initial: BTreeMap<K, V>,
    // Staged updates per key, kept in timestamp order (see `verify`).
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional predicate returning `true` when two values conflict; `None`
    // means values carry no uniqueness constraint.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2902
impl<K, V> TableTransaction<K, V>
where
    K: Ord + Eq + Clone + Debug,
    V: Ord + Clone + Debug + UniqueName,
{
    /// Creates a new `TableTransaction` from the proto-typed snapshot
    /// `initial`, with no uniqueness constraint on values.
    ///
    /// Returns an error if any entry fails proto conversion into `(K, V)`.
    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: None,
        })
    }

    /// Like [`TableTransaction::new`], but values are additionally subject to
    /// `uniqueness_violation`: two distinct entries for which it returns
    /// `true` constitute a constraint violation.
    fn new_with_uniqueness_fn<KP, VP>(
        initial: BTreeMap<KP, VP>,
        uniqueness_violation: fn(a: &V, b: &V) -> bool,
    ) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: Some(uniqueness_violation),
        })
    }

    /// Consumes the transaction and returns its staged updates, consolidated
    /// per key and converted to proto form.
    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        soft_assert_no_log!(self.verify().is_ok());
        // Consolidate the updates of each key so that changes which cancel
        // out within the transaction produce no output.
        self.pending
            .into_iter()
            .flat_map(|(k, v)| {
                let mut v: Vec<_> = v
                    .into_iter()
                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
                    .collect();
                differential_dataflow::consolidation::consolidate(&mut v);
                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
            })
            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
            .collect()
    }

    /// Checks the uniqueness constraint (if any) against the current contents
    /// (initial snapshot plus pending updates), and soft-asserts that pending
    /// updates are ordered by timestamp.
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                // Fast path: index values by unique name so each value is
                // compared against at most one candidate.
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                // Slow path: compare every pair of distinct values.
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }

    /// Like [`TableTransaction::verify`], but only checks the values at
    /// `keys` against all other values, which is cheaper when only a few
    /// keys changed.
    fn verify_keys<'a>(
        &self,
        keys: impl IntoIterator<Item = &'a K>,
    ) -> Result<(), DurableCatalogError>
    where
        K: 'a,
    {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            // Resolve each key to its current value; keys without a current
            // value cannot participate in a violation.
            let entries: Vec<_> = keys
                .into_iter()
                .filter_map(|key| self.get(key).map(|value| (key, value)))
                .collect();
            for (ki, vi) in self.items() {
                for (kj, vj) in &entries {
                    if ki != *kj && uniqueness_violation(vi, vj) {
                        return Err(DurableCatalogError::UniquenessViolation);
                    }
                }
            }
        }
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Invokes `f` on every current (key, value) pair: keys touched by
    /// pending updates first (at their current value, if any), then untouched
    /// keys from the initial snapshot.
    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
        let mut seen = BTreeSet::new();
        for k in self.pending.keys() {
            seen.insert(k);
            let v = self.get(k);
            // Deleted keys have no current value and are skipped.
            if let Some(v) = v {
                f(k, v);
            }
        }
        for (k, v) in self.initial.iter() {
            if !seen.contains(k) {
                f(k, v);
            }
        }
    }

    /// Returns the current value of `k`, computed by consolidating the
    /// initial value (if any) with all pending updates for the key.
    fn get(&self, k: &K) -> Option<&V> {
        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
        let mut updates = Vec::with_capacity(pending.len() + 1);
        if let Some(initial) = self.initial.get(k) {
            updates.push((initial, Diff::ONE));
        }
        updates.extend(
            pending
                .into_iter()
                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
        );

        differential_dataflow::consolidation::consolidate(&mut updates);
        // A key maps to at most one value at any time.
        assert!(updates.len() <= 1);
        updates.into_iter().next().map(|(v, _)| v)
    }

    /// Returns a cloned map of the current table contents. Test-only.
    #[cfg(test)]
    fn items_cloned(&self) -> BTreeMap<K, V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.clone(), v.clone());
        });
        items
    }

    /// Returns the current table contents by reference.
    fn items(&self) -> BTreeMap<&K, &V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k, v);
        });
        items
    }

    /// Returns the set of current values by reference.
    fn values(&self) -> BTreeSet<&V> {
        let mut items = BTreeSet::new();
        self.for_values(|_, v| {
            items.insert(v);
        });
        items
    }

    /// Returns the number of current entries.
    fn len(&self) -> usize {
        let mut count = 0;
        self.for_values(|_, _| {
            count += 1;
        });
        count
    }

    /// Invokes `f` on every current (key, value) pair, giving it a scratch
    /// pending map to stage new updates into; the staged updates are merged
    /// into `self.pending` afterwards. (A scratch map is needed because
    /// `self.pending` is borrowed during iteration.)
    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
        &mut self,
        mut f: F,
    ) {
        let mut pending = BTreeMap::new();
        self.for_values(|k, v| f(&mut pending, k, v));
        for (k, updates) in pending {
            self.pending.entry(k).or_default().extend(updates);
        }
    }

    /// Stages an insertion of `(k, v)` at timestamp `ts`.
    ///
    /// Errors with `DuplicateKey` if `k` already has a value, or
    /// `UniquenessViolation` if `v` conflicts with an existing value.
    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
        let mut violation = None;
        self.for_values(|for_k, for_v| {
            if &k == for_k {
                violation = Some(DurableCatalogError::DuplicateKey);
            }
            if let Some(uniqueness_violation) = self.uniqueness_violation {
                if uniqueness_violation(for_v, &v) {
                    violation = Some(DurableCatalogError::UniquenessViolation);
                }
            }
        });
        if let Some(violation) = violation {
            return Err(violation);
        }
        self.pending.entry(k).or_default().push(TransactionUpdate {
            value: v,
            ts,
            diff: Diff::ONE,
        });
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Applies `f` to every current entry; where it returns `Some(next)`,
    /// stages a retraction of the old value and an insertion of `next` at
    /// `ts`. Returns the number of changed entries.
    ///
    /// If the result would violate uniqueness, all staged changes from this
    /// call are rolled back and an error is returned.
    fn update<F: Fn(&K, &V) -> Option<V>>(
        &mut self,
        f: F,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let mut changed = Diff::ZERO;
        let mut keys = BTreeSet::new();
        // Snapshot `pending` so we can restore it on constraint violation.
        let pending = self.pending.clone();
        self.for_values_mut(|p, k, v| {
            if let Some(next) = f(k, v) {
                changed += Diff::ONE;
                keys.insert(k.clone());
                let updates = p.entry(k.clone()).or_default();
                updates.push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                updates.push(TransactionUpdate {
                    value: next,
                    ts,
                    diff: Diff::ONE,
                });
            }
        });
        if let Err(err) = self.verify_keys(&keys) {
            // Roll back: restore the pre-call pending state.
            self.pending = pending;
            Err(err)
        } else {
            Ok(changed)
        }
    }

    /// Sets `k` to `v` at `ts` if `k` currently has a value, returning
    /// whether the key existed. Setting an identical value is a no-op.
    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
        if let Some(cur_v) = self.get(&k) {
            if v != *cur_v {
                self.set(k, Some(v), ts)?;
            }
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Sets each existing key in `kvs` to its new value at `ts`; keys with
    /// no current value are skipped. Returns the number of keys that
    /// existed (including those whose value was already equal).
    fn update_by_keys(
        &mut self,
        kvs: impl IntoIterator<Item = (K, V)>,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let kvs: Vec<_> = kvs
            .into_iter()
            .filter_map(|(k, v)| match self.get(&k) {
                // Remember whether the write would be a no-op.
                Some(cur_v) => Some((*cur_v == v, k, v)),
                None => None,
            })
            .collect();
        let changed = kvs.len();
        let changed =
            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
        // Only stage writes that actually change the value.
        let kvs = kvs
            .into_iter()
            .filter(|(no_op, _, _)| !no_op)
            .map(|(_, k, v)| (k, Some(v)))
            .collect();
        self.set_many(kvs, ts)?;
        Ok(changed)
    }

    /// Sets `k` to `v` at `ts` (`Some` upserts, `None` deletes), returning
    /// the previous value. On a uniqueness violation the staged updates are
    /// rolled back and an error is returned.
    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
        let prev = self.get(&k).cloned();
        let entry = self.pending.entry(k.clone()).or_default();
        // Remember the length so a violation can be rolled back by truncation.
        let restore_len = entry.len();

        match (v, prev.clone()) {
            (Some(v), Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            (Some(v), None) => {
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            (None, Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
            (None, None) => {}
        }

        if let Err(err) = self.verify_keys([&k]) {
            let pending = self.pending.get_mut(&k).expect("inserted above");
            // Drop only the updates staged by this call.
            pending.truncate(restore_len);
            Err(err)
        } else {
            Ok(prev)
        }
    }

    /// Like [`TableTransaction::set`], but applies a batch of writes at `ts`
    /// and returns the previous value of every key. On a uniqueness
    /// violation all writes from this call are rolled back.
    fn set_many(
        &mut self,
        kvs: BTreeMap<K, Option<V>>,
        ts: Timestamp,
    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
        if kvs.is_empty() {
            return Ok(BTreeMap::new());
        }

        let mut prevs = BTreeMap::new();
        // Per-key pending lengths, for rollback by truncation.
        let mut restores = BTreeMap::new();

        for (k, v) in kvs {
            let prev = self.get(&k).cloned();
            let entry = self.pending.entry(k.clone()).or_default();
            restores.insert(k.clone(), entry.len());

            match (v, prev.clone()) {
                (Some(v), Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                (Some(v), None) => {
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                (None, Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                }
                (None, None) => {}
            }

            prevs.insert(k, prev);
        }

        if let Err(err) = self.verify_keys(prevs.keys()) {
            for (k, restore_len) in restores {
                let pending = self.pending.get_mut(&k).expect("inserted above");
                pending.truncate(restore_len);
            }
            Err(err)
        } else {
            Ok(prevs)
        }
    }

    /// Stages a retraction at `ts` for every entry matching predicate `f`,
    /// returning the deleted `(key, value)` pairs.
    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
        let mut deleted = Vec::new();
        self.for_values_mut(|p, k, v| {
            if f(k, v) {
                deleted.push((k.clone(), v.clone()));
                p.entry(k.clone()).or_default().push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
        });
        soft_assert_no_log!(self.verify().is_ok());
        deleted
    }

    /// Deletes the entry at `k` (if any) at `ts`, returning its value.
    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
        self.set(k, None, ts)
            .expect("deleting an entry cannot violate uniqueness")
    }

    /// Deletes the entries at `ks` at `ts`, returning the `(key, value)`
    /// pairs that actually existed.
    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
        let kvs = ks.into_iter().map(|k| (k, None)).collect();
        let prevs = self
            .set_many(kvs, ts)
            .expect("deleting entries cannot violate uniqueness");
        prevs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }
}
3420
3421#[cfg(test)]
3422#[allow(clippy::unwrap_used)]
3423mod tests {
3424 use super::*;
3425
3426 use mz_ore::now::SYSTEM_TIME;
3427 use mz_ore::{assert_none, assert_ok};
3428 use mz_persist_client::cache::PersistClientCache;
3429 use mz_persist_types::PersistLocation;
3430 use semver::Version;
3431
3432 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3433 use crate::memory;
3434
3435 #[mz_ore::test]
3436 fn test_table_transaction_simple() {
3437 fn uniqueness_violation(a: &String, b: &String) -> bool {
3438 a == b
3439 }
3440 let mut table = TableTransaction::new_with_uniqueness_fn(
3441 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3442 uniqueness_violation,
3443 )
3444 .unwrap();
3445
3446 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3449 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3450 assert!(
3451 table
3452 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3453 .is_err()
3454 );
3455 assert!(
3456 table
3457 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3458 .is_err()
3459 );
3460 }
3461
3462 #[mz_ore::test]
3463 fn test_table_transaction() {
3464 fn uniqueness_violation(a: &String, b: &String) -> bool {
3465 a == b
3466 }
3467 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3468
3469 fn commit(
3470 table: &mut BTreeMap<Vec<u8>, String>,
3471 mut pending: Vec<(Vec<u8>, String, Diff)>,
3472 ) {
3473 pending.sort_by(|a, b| a.2.cmp(&b.2));
3475 for (k, v, diff) in pending {
3476 if diff == Diff::MINUS_ONE {
3477 let prev = table.remove(&k);
3478 assert_eq!(prev, Some(v));
3479 } else if diff == Diff::ONE {
3480 let prev = table.insert(k, v);
3481 assert_eq!(prev, None);
3482 } else {
3483 panic!("unexpected diff: {diff}");
3484 }
3485 }
3486 }
3487
3488 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3489 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3490 let mut table_txn =
3491 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3492 assert_eq!(table_txn.items_cloned(), table);
3493 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3494 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3495 assert_eq!(
3496 table_txn.items_cloned(),
3497 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3498 );
3499 assert_eq!(
3500 table_txn
3501 .update(|_k, _v| Some("v3".to_string()), 2)
3502 .unwrap(),
3503 Diff::ONE
3504 );
3505
3506 table_txn
3508 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3509 .unwrap_err();
3510
3511 table_txn
3512 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3513 .unwrap();
3514 assert_eq!(
3515 table_txn.items_cloned(),
3516 BTreeMap::from([
3517 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3518 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3519 ])
3520 );
3521 let err = table_txn
3522 .update(|_k, _v| Some("v1".to_string()), 5)
3523 .unwrap_err();
3524 assert!(
3525 matches!(err, DurableCatalogError::UniquenessViolation),
3526 "unexpected err: {err:?}"
3527 );
3528 let pending = table_txn.pending();
3529 assert_eq!(
3530 pending,
3531 vec![
3532 (
3533 1i64.to_le_bytes().to_vec(),
3534 "v1".to_string(),
3535 Diff::MINUS_ONE
3536 ),
3537 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3538 (
3539 2i64.to_le_bytes().to_vec(),
3540 "v2".to_string(),
3541 Diff::MINUS_ONE
3542 ),
3543 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3544 ]
3545 );
3546 commit(&mut table, pending);
3547 assert_eq!(
3548 table,
3549 BTreeMap::from([
3550 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3551 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3552 ])
3553 );
3554
3555 let mut table_txn =
3556 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3557 assert_eq!(
3559 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3560 1
3561 );
3562 table_txn
3563 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3564 .unwrap();
3565 table_txn
3567 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3568 .unwrap_err();
3569 table_txn
3571 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3572 .unwrap_err();
3573 assert_eq!(
3574 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3575 1
3576 );
3577 table_txn
3579 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3580 .unwrap();
3581 table_txn
3582 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3583 .unwrap();
3584 let pending = table_txn.pending();
3585 assert_eq!(
3586 pending,
3587 vec![
3588 (
3589 1i64.to_le_bytes().to_vec(),
3590 "v3".to_string(),
3591 Diff::MINUS_ONE
3592 ),
3593 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3594 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3595 ]
3596 );
3597 commit(&mut table, pending);
3598 assert_eq!(
3599 table,
3600 BTreeMap::from([
3601 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3602 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3603 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3604 ])
3605 );
3606
3607 let mut table_txn =
3608 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3609 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3610 table_txn
3611 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3612 .unwrap();
3613
3614 commit(&mut table, table_txn.pending());
3615 assert_eq!(
3616 table,
3617 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3618 );
3619
3620 let mut table_txn =
3621 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3622 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3623 table_txn
3624 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3625 .unwrap();
3626 commit(&mut table, table_txn.pending());
3627 assert_eq!(
3628 table,
3629 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3630 );
3631
3632 let mut table_txn =
3634 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3635 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3636 table_txn
3637 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3638 .unwrap();
3639 table_txn
3640 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3641 .unwrap_err();
3642 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3643 table_txn
3644 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3645 .unwrap();
3646 commit(&mut table, table_txn.pending());
3647 assert_eq!(
3648 table.clone().into_iter().collect::<Vec<_>>(),
3649 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3650 );
3651
3652 let mut table_txn =
3654 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3655 table_txn
3657 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3658 .unwrap_err();
3659 table_txn
3660 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3661 .unwrap();
3662 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3663 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3664 let pending = table_txn.pending();
3665 assert_eq!(
3666 pending,
3667 vec![
3668 (
3669 1i64.to_le_bytes().to_vec(),
3670 "v5".to_string(),
3671 Diff::MINUS_ONE
3672 ),
3673 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3674 ]
3675 );
3676 commit(&mut table, pending);
3677 assert_eq!(
3678 table,
3679 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3680 );
3681
3682 let mut table_txn =
3684 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3685 table_txn
3686 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3687 .unwrap();
3688 let pending = table_txn.pending::<Vec<u8>, String>();
3689 assert!(pending.is_empty());
3690
3691 let mut table_txn =
3693 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3694 table_txn
3696 .set_many(
3697 BTreeMap::from([
3698 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3699 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3700 ]),
3701 0,
3702 )
3703 .unwrap_err();
3704 table_txn
3705 .set_many(
3706 BTreeMap::from([
3707 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3708 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3709 ]),
3710 1,
3711 )
3712 .unwrap();
3713 table_txn
3714 .set_many(
3715 BTreeMap::from([
3716 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3717 (3i64.to_le_bytes().to_vec(), None),
3718 ]),
3719 2,
3720 )
3721 .unwrap();
3722 let pending = table_txn.pending();
3723 assert_eq!(
3724 pending,
3725 vec![
3726 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3727 (
3728 3i64.to_le_bytes().to_vec(),
3729 "v6".to_string(),
3730 Diff::MINUS_ONE
3731 ),
3732 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3733 ]
3734 );
3735 commit(&mut table, pending);
3736 assert_eq!(
3737 table,
3738 BTreeMap::from([
3739 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3740 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3741 ])
3742 );
3743
3744 let mut table_txn =
3746 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3747 table_txn
3748 .set_many(
3749 BTreeMap::from([
3750 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3751 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3752 ]),
3753 0,
3754 )
3755 .unwrap();
3756 let pending = table_txn.pending::<Vec<u8>, String>();
3757 assert!(pending.is_empty());
3758 commit(&mut table, pending);
3759 assert_eq!(
3760 table,
3761 BTreeMap::from([
3762 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3763 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3764 ])
3765 );
3766
3767 let mut table_txn =
3769 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3770 table_txn
3772 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3773 .unwrap_err();
3774 assert!(
3775 table_txn
3776 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3777 .unwrap()
3778 );
3779 assert!(
3780 !table_txn
3781 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3782 .unwrap()
3783 );
3784 let pending = table_txn.pending();
3785 assert_eq!(
3786 pending,
3787 vec![
3788 (
3789 1i64.to_le_bytes().to_vec(),
3790 "v6".to_string(),
3791 Diff::MINUS_ONE
3792 ),
3793 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3794 ]
3795 );
3796 commit(&mut table, pending);
3797 assert_eq!(
3798 table,
3799 BTreeMap::from([
3800 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3801 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3802 ])
3803 );
3804
3805 let mut table_txn =
3807 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3808 assert!(
3809 table_txn
3810 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3811 .unwrap()
3812 );
3813 let pending = table_txn.pending::<Vec<u8>, String>();
3814 assert!(pending.is_empty());
3815 commit(&mut table, pending);
3816 assert_eq!(
3817 table,
3818 BTreeMap::from([
3819 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3820 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3821 ])
3822 );
3823
3824 let mut table_txn =
3826 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3827 table_txn
3829 .update_by_keys(
3830 [
3831 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3832 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3833 ],
3834 0,
3835 )
3836 .unwrap_err();
3837 let n = table_txn
3838 .update_by_keys(
3839 [
3840 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3841 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3842 ],
3843 1,
3844 )
3845 .unwrap();
3846 assert_eq!(n, Diff::ONE);
3847 let n = table_txn
3848 .update_by_keys(
3849 [
3850 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3851 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3852 ],
3853 2,
3854 )
3855 .unwrap();
3856 assert_eq!(n, Diff::ZERO);
3857 let pending = table_txn.pending();
3858 assert_eq!(
3859 pending,
3860 vec![
3861 (
3862 1i64.to_le_bytes().to_vec(),
3863 "v8".to_string(),
3864 Diff::MINUS_ONE
3865 ),
3866 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3867 ]
3868 );
3869 commit(&mut table, pending);
3870 assert_eq!(
3871 table,
3872 BTreeMap::from([
3873 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3874 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3875 ])
3876 );
3877
3878 let mut table_txn =
3880 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3881 let n = table_txn
3882 .update_by_keys(
3883 [
3884 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3885 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3886 ],
3887 0,
3888 )
3889 .unwrap();
3890 assert_eq!(n, Diff::from(2));
3891 let pending = table_txn.pending::<Vec<u8>, String>();
3892 assert!(pending.is_empty());
3893 commit(&mut table, pending);
3894 assert_eq!(
3895 table,
3896 BTreeMap::from([
3897 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3898 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3899 ])
3900 );
3901
3902 let mut table_txn =
3904 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3905 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3906 assert_eq!(prev, Some("v9".to_string()));
3907 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3908 assert_none!(prev);
3909 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3910 assert_none!(prev);
3911 let pending = table_txn.pending();
3912 assert_eq!(
3913 pending,
3914 vec![(
3915 1i64.to_le_bytes().to_vec(),
3916 "v9".to_string(),
3917 Diff::MINUS_ONE
3918 ),]
3919 );
3920 commit(&mut table, pending);
3921 assert_eq!(
3922 table,
3923 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3924 );
3925
3926 let mut table_txn =
3928 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3929 let prevs = table_txn.delete_by_keys(
3930 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3931 0,
3932 );
3933 assert_eq!(
3934 prevs,
3935 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3936 );
3937 let prevs = table_txn.delete_by_keys(
3938 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3939 1,
3940 );
3941 assert_eq!(prevs, vec![]);
3942 let prevs = table_txn.delete_by_keys(
3943 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3944 2,
3945 );
3946 assert_eq!(prevs, vec![]);
3947 let pending = table_txn.pending();
3948 assert_eq!(
3949 pending,
3950 vec![(
3951 42i64.to_le_bytes().to_vec(),
3952 "v7".to_string(),
3953 Diff::MINUS_ONE
3954 ),]
3955 );
3956 commit(&mut table, pending);
3957 assert_eq!(table, BTreeMap::new());
3958 }
3959
    // Verifies that a savepoint catalog opened over an already-initialized durable
    // state observes the bootstrap snapshot, and that a transaction committed
    // through the savepoint surfaces as exactly one in-memory update.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_savepoint() {
        // Pin the build version so the catalog open path is deterministic.
        const VERSION: Version = Version::new(26, 0, 0);
        let mut persist_cache = PersistClientCache::new_no_metrics();
        persist_cache.cfg.build_version = VERSION;
        let persist_client = persist_cache
            .open(PersistLocation::new_in_mem())
            .await
            .unwrap();
        let state_builder = TestCatalogStateBuilder::new(persist_client)
            .with_default_deploy_generation()
            .with_version(VERSION);

        // Fully open (and thereby initialize) the catalog once, then discard the
        // handle; the savepoint below is opened over this persisted state.
        let _ = state_builder
            .clone()
            .unwrap_build()
            .await
            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;
        // Open a savepoint over the same state. NOTE(review): presumably a
        // lightweight fork whose commits are not durably visible to others —
        // confirm against `open_savepoint`'s contract.
        let mut savepoint_state = state_builder
            .unwrap_build()
            .await
            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;

        // The savepoint must see the non-empty bootstrap contents.
        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
        assert!(!initial_snapshot.is_empty());

        // Insert a user database through a savepoint transaction and commit it.
        let db_name = "db";
        let db_owner = RoleId::User(42);
        let db_privileges = Vec::new();
        let mut txn = savepoint_state.transaction().await.unwrap();
        let (db_id, db_oid) = txn
            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
            .unwrap();
        let commit_ts = txn.upper();
        txn.commit_internal(commit_ts).await.unwrap();
        // Syncing after the commit must yield exactly one update
        // (`into_element` panics otherwise).
        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
        let update = updates.into_element();

        assert_eq!(update.diff, StateDiff::Addition);

        let db = match update.kind {
            memory::objects::StateUpdateKind::Database(db) => db,
            update => panic!("unexpected update: {update:?}"),
        };

        // The materialized database must round-trip every value we inserted.
        assert_eq!(db_id, db.id);
        assert_eq!(db_oid, db.oid);
        assert_eq!(db_name, db.name);
        assert_eq!(db_owner, db.owner_id);
        assert_eq!(db_privileges, db.privileges);
    }
4019
4020 #[mz_ore::test]
4021 fn test_allocate_introspection_source_index_id() {
4022 let cluster_variant: u8 = 0b0000_0001;
4023 let cluster_id_inner: u64 =
4024 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4025 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4026
4027 let cluster_id = ClusterId::System(cluster_id_inner);
4028 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4029
4030 let introspection_source_index_id: u64 =
4031 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4032
4033 {
4035 let mut cluster_variant_mask = 0xFF << 56;
4036 cluster_variant_mask &= introspection_source_index_id;
4037 cluster_variant_mask >>= 56;
4038 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4039 }
4040
4041 {
4043 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4044 cluster_id_inner_mask &= introspection_source_index_id;
4045 cluster_id_inner_mask >>= 8;
4046 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4047 }
4048
4049 {
4051 let mut log_variant_mask = 0xFF;
4052 log_variant_mask &= introspection_source_index_id;
4053 assert_eq!(
4054 log_variant_mask,
4055 u64::from(timely_messages_received_log_variant)
4056 );
4057 }
4058
4059 let (catalog_item_id, global_id) =
4060 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4061
4062 assert_eq!(
4063 catalog_item_id,
4064 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4065 );
4066 assert_eq!(
4067 global_id,
4068 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4069 );
4070 }
4071}