use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::num::NonZeroU32;
use std::time::Duration;

use anyhow::anyhow;
use derivative::Derivative;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::{u64_to_usize, usize_to_u64};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::now::SYSTEM_TIME;
use mz_ore::vec::VecExt;
use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
use mz_persist_types::ShardId;
use mz_pgrepr::oid::FIRST_USER_OID;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
use mz_sql::catalog::{
    CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
    RoleAttributesRaw, RoleMembership, RoleVars,
};
use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
use mz_sql::plan::NetworkPolicyRule;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::controller::StorageError;
use tracing::warn;

use crate::builtin::BuiltinLog;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::objects::serialization::proto;
use crate::durable::objects::{
    AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
    ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
    ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
    Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
    DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
    IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
    ReplicaConfig, Role, RoleAuthKey, RoleAuthValue, RoleKey, RoleValue, Schema, SchemaKey,
    SchemaValue, ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue,
    SourceReference, SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
    StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
    SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
};
use crate::durable::{
    AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
    EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
    SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
    SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
    USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
    USER_ROLE_ID_ALLOC_KEY,
};
use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};

type Timestamp = u64;

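/// A transaction over the durable catalog state. Each catalog collection is wrapped in a
/// `TableTransaction` that stages changes in memory; nothing is durably written until the
/// transaction is committed.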
#[derive(Derivative)]
#[derivative(Debug, PartialEq)]
pub struct Transaction<'a> {
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
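    /// The upper of `durable_catalog` at the time this transaction was created.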
    upper: mz_repr::Timestamp,
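    /// ID of the current operation within this transaction; starts at 0 and is bumped by
    /// `commit_op` so that updates from different ops can be told apart.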
    op_id: Timestamp,
}

impl<'a> Transaction<'a> {
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }

    pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
        let key = ItemKey { id: *id };
        self.items
            .get(&key)
            .map(|v| DurableType::from_key_value(key, v.clone()))
    }

    pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
        self.items
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
            .sorted_by_key(|Item { id, .. }| *id)
    }

    pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
        self.insert_audit_log_events([event]);
    }

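    /// Appends `events` to the audit log updates staged by this transaction; each event is
    /// recorded with a `Diff::ONE` at the current op.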
    pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
        let events = events
            .into_iter()
            .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
        self.audit_log_updates.extend(events);
    }

    pub fn insert_user_database(
        &mut self,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(DatabaseId, u32), CatalogError> {
        let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
        let id = DatabaseId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_database(id, database_name, owner_id, privileges, oid)?;
        Ok((id, oid))
    }

    pub(crate) fn insert_database(
        &mut self,
        id: DatabaseId,
        database_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<u32, CatalogError> {
        match self.databases.insert(
            DatabaseKey { id },
            DatabaseValue {
                name: database_name.to_string(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(oid),
            Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
        }
    }

    pub fn insert_user_schema(
        &mut self,
        database_id: DatabaseId,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(SchemaId, u32), CatalogError> {
        let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
        let id = SchemaId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_schema(
            id,
            Some(database_id),
            schema_name.to_string(),
            owner_id,
            privileges,
            oid,
        )?;
        Ok((id, oid))
    }

    pub fn insert_system_schema(
        &mut self,
        schema_id: u64,
        schema_name: &str,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        let id = SchemaId::System(schema_id);
        self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
    }

    pub(crate) fn insert_schema(
        &mut self,
        schema_id: SchemaId,
        database_id: Option<DatabaseId>,
        schema_name: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        oid: u32,
    ) -> Result<(), CatalogError> {
        match self.schemas.insert(
            SchemaKey { id: schema_id },
            SchemaValue {
                database_id,
                name: schema_name.clone(),
                owner_id,
                privileges,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
        }
    }

    pub fn insert_builtin_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<RoleId, CatalogError> {
        soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok(id)
    }

    pub fn insert_user_role(
        &mut self,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(RoleId, u32), CatalogError> {
        let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
        let id = RoleId::User(id);
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_role(id, name, attributes, membership, vars, oid)?;
        Ok((id, oid))
    }

    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }

    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    pub fn insert_system_cluster(
        &mut self,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
        let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }

    fn insert_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if self
            .clusters
            .insert(
                ClusterKey { id: cluster_id },
                ClusterValue {
                    name: cluster_name.to_string(),
                    owner_id,
                    privileges,
                    config,
                },
                self.op_id,
            )
            .is_err()
        {
            return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
        }

        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip_eq(oids)
            .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
            .collect();
        for (builtin, item_id, index_id, oid) in introspection_source_indexes {
            let introspection_source_index = IntrospectionSourceIndex {
                cluster_id,
                name: builtin.name.to_string(),
                item_id,
                index_id,
                oid,
            };
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources
                .insert(key, value, self.op_id)
                .expect("no uniqueness violation");
        }

        Ok(())
    }

    pub fn rename_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        cluster_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterKey { id: cluster_id };

        match self.clusters.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = cluster_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
            ),
        }
    }

    pub fn rename_cluster_replica(
        &mut self,
        replica_id: ReplicaId,
        replica_name: &QualifiedReplica,
        replica_to_name: &str,
    ) -> Result<(), CatalogError> {
        let key = ClusterReplicaKey { id: replica_id };

        match self.cluster_replicas.update(
            |k, v| {
                if *k == key {
                    let mut value = v.clone();
                    value.name = replica_to_name.to_string();
                    Some(value)
                } else {
                    None
                }
            },
            self.op_id,
        )? {
            Diff::ZERO => {
                Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
            }
            Diff::ONE => Ok(()),
            n => panic!(
                "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
            ),
        }
    }

    pub fn insert_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<ReplicaId, CatalogError> {
        let replica_id = match cluster_id {
            ClusterId::System(_) => self.allocate_system_replica_id()?,
            ClusterId::User(_) => self.allocate_user_replica_id()?,
        };
        self.insert_cluster_replica_with_id(
            cluster_id,
            replica_id,
            replica_name,
            config,
            owner_id,
        )?;
        Ok(replica_id)
    }

    pub(crate) fn insert_cluster_replica_with_id(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: &str,
        config: ReplicaConfig,
        owner_id: RoleId,
    ) -> Result<(), CatalogError> {
        if self
            .cluster_replicas
            .insert(
                ClusterReplicaKey { id: replica_id },
                ClusterReplicaValue {
                    cluster_id,
                    name: replica_name.into(),
                    config,
                    owner_id,
                },
                self.op_id,
            )
            .is_err()
        {
            let cluster = self
                .clusters
                .get(&ClusterKey { id: cluster_id })
                .expect("cluster exists");
            return Err(SqlCatalogError::DuplicateReplica(
                replica_name.to_string(),
                cluster.name.to_string(),
            )
            .into());
        }
        Ok(())
    }

    pub fn insert_user_network_policy(
        &mut self,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        temporary_oids: &HashSet<u32>,
    ) -> Result<NetworkPolicyId, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
        let id = NetworkPolicyId::User(id);
        self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
    }

    pub fn insert_network_policy(
        &mut self,
        id: NetworkPolicyId,
        name: String,
        rules: Vec<NetworkPolicyRule>,
        privileges: Vec<MzAclItem>,
        owner_id: RoleId,
        oid: u32,
    ) -> Result<NetworkPolicyId, CatalogError> {
        match self.network_policies.insert(
            NetworkPolicyKey { id },
            NetworkPolicyValue {
                name: name.clone(),
                rules,
                privileges,
                owner_id,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(id),
            Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
        }
    }

    pub fn update_introspection_source_index_gids(
        &mut self,
        mappings: impl Iterator<
            Item = (
                ClusterId,
                impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
            ),
        >,
    ) -> Result<(), CatalogError> {
        for (cluster_id, updates) in mappings {
            for (name, item_id, index_id, oid) in updates {
                let introspection_source_index = IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                };
                let (key, value) = introspection_source_index.into_key_value();

                let prev = self
                    .introspection_sources
                    .set(key, Some(value), self.op_id)?;
                if prev.is_none() {
                    return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
                        "{index_id}"
                    ))
                    .into());
                }
            }
        }
        Ok(())
    }

    pub fn insert_user_item(
        &mut self,
        id: CatalogItemId,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        temporary_oids: &HashSet<u32>,
        versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<u32, CatalogError> {
        let oid = self.allocate_oid(temporary_oids)?;
        self.insert_item(
            id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
        )?;
        Ok(oid)
    }

    pub fn insert_item(
        &mut self,
        id: CatalogItemId,
        oid: u32,
        global_id: GlobalId,
        schema_id: SchemaId,
        item_name: &str,
        create_sql: String,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        extra_versions: BTreeMap<RelationVersion, GlobalId>,
    ) -> Result<(), CatalogError> {
        match self.items.insert(
            ItemKey { id },
            ItemValue {
                schema_id,
                name: item_name.to_string(),
                create_sql,
                owner_id,
                privileges,
                oid,
                global_id,
                extra_versions,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
        }
    }

    pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
        Ok(self.get_and_increment_id_by(key, 1)?.into_element())
    }

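    /// Reserves `amount` consecutive IDs from the allocator named `key`, returning the reserved
    /// IDs and advancing the allocator's `next_id` past them. For example, if `next_id` is
    /// currently 7, requesting 3 IDs yields `[7, 8, 9]` and leaves `next_id` at 10.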
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }

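    /// Allocates `amount` system item IDs. A system item's [`CatalogItemId`] and [`GlobalId`]
    /// share the same inner value, which is why a single allocation produces both. Only valid
    /// during bootstrap.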
    pub fn allocate_system_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        assert!(
            !self.durable_catalog.is_bootstrap_complete(),
            "we can only allocate system item IDs during bootstrap"
        );
        Ok(self
            .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
            .collect())
    }

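    /// Deterministically derives the item and global IDs for an introspection source index from
    /// the cluster ID and log variant, instead of consuming an ID allocator. The 64-bit ID is
    /// packed as:
    ///
    /// ```text
    /// | 8 bits: cluster variant | 48 bits: cluster ID | 8 bits: log variant |
    /// ```
    ///
    /// For example, the Timely `Operates` log (variant 1) on system cluster 1 (variant 1) maps
    /// to `0x0100_0000_0000_0101`.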
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }

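    /// Allocates `amount` user item IDs. Like system items, a user item's [`CatalogItemId`] and
    /// initial [`GlobalId`] share the same inner value.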
    pub fn allocate_user_item_ids(
        &mut self,
        amount: u64,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
        Ok(self
            .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
            .into_iter()
            .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
            .collect())
    }

    pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::User(id))
    }

    pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
        let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
        Ok(ReplicaId::System(id))
    }

    pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
    }

    pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
        self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
    }

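    /// Allocates `amount` OIDs, starting the search at the durably stored `next_id` and scanning
    /// forward (wrapping within the user OID range) past any OID already claimed by a database,
    /// schema, role, item, introspection source index, or temporary object. Errors with
    /// `OidExhaustion` if every OID in the range is taken.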
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
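        // OIDs are 32-bit and recyclable: when the counter passes `u32::MAX` it wraps back into
        // the user range rather than failing, so `UserOid` encapsulates the wrap-around add.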
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }

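    /// Convenience wrapper around `allocate_oids` that allocates exactly one OID.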
    pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
        self.allocate_oids(1, temporary_oids)
            .map(|oids| oids.into_element())
    }

    pub(crate) fn insert_id_allocator(
        &mut self,
        name: String,
        next_id: u64,
    ) -> Result<(), CatalogError> {
        match self.id_allocator.insert(
            IdAllocKey { name: name.clone() },
            IdAllocValue { next_id },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
        }
    }

    pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
        let prev = self
            .databases
            .set(DatabaseKey { id: *id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
        }
    }

    pub fn remove_databases(
        &mut self,
        databases: &BTreeSet<DatabaseId>,
    ) -> Result<(), CatalogError> {
        if databases.is_empty() {
            return Ok(());
        }

        let to_remove = databases
            .iter()
            .map(|id| (DatabaseKey { id: *id }, None))
            .collect();
        let mut prev = self.databases.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, val| val.is_none());

        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownDatabase(err).into());
        }

        Ok(())
    }

    pub fn remove_schema(
        &mut self,
        database_id: &Option<DatabaseId>,
        schema_id: &SchemaId,
    ) -> Result<(), CatalogError> {
        let prev = self
            .schemas
            .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            let database_name = match database_id {
                Some(id) => format!("{id}."),
                None => "".to_string(),
            };
            Err(SqlCatalogError::UnknownSchema(format!("{}{}", database_name, schema_id)).into())
        }
    }

    pub fn remove_schemas(
        &mut self,
        schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
    ) -> Result<(), CatalogError> {
        if schemas.is_empty() {
            return Ok(());
        }

        let to_remove = schemas
            .iter()
            .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
            .collect();
        let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
        prev.retain(|_k, v| v.is_none());

        if !prev.is_empty() {
            let err = prev
                .keys()
                .map(|k| {
                    let db_spec = schemas.get(&k.id).expect("should exist");
                    let db_name = match db_spec {
                        ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                        ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    };
                    format!("{}{}", db_name, k.id)
                })
                .join(", ");

            return Err(SqlCatalogError::UnknownSchema(err).into());
        }

        Ok(())
    }

    pub fn remove_source_references(
        &mut self,
        source_id: CatalogItemId,
    ) -> Result<(), CatalogError> {
        let deleted = self
            .source_references
            .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
        }
    }

    pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        assert!(
            roles.iter().all(|id| id.is_user()),
            "cannot delete non-user roles"
        );
        self.remove_roles(roles)
    }

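    /// Removes the given roles and any credentials stored for them in `role_auth`. Returns an
    /// error if any of the roles do not exist; missing `role_auth` entries are not an error,
    /// since not every role has stored credentials.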
    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        role_auth_prev.retain(|_k, v| v.is_none());
        Ok(())
    }

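    /// Removes the given clusters, cascading to their replicas and introspection source indexes.
    /// Returns an error if any of the clusters do not exist.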
    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
        if clusters.is_empty() {
            return Ok(());
        }

        let to_remove = clusters
            .iter()
            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
            .collect();
        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownCluster(err).into());
        }

        self.cluster_replicas
            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
        self.introspection_sources
            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);

        Ok(())
    }

    pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
        let deleted = self
            .cluster_replicas
            .delete_by_key(ClusterReplicaKey { id }, self.op_id)
            .is_some();
        if deleted {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
        }
    }

    pub fn remove_cluster_replicas(
        &mut self,
        replicas: &BTreeSet<ReplicaId>,
    ) -> Result<(), CatalogError> {
        if replicas.is_empty() {
            return Ok(());
        }

        let to_remove = replicas
            .iter()
            .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
            .collect();
        let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;

        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownClusterReplica(err).into());
        }

        Ok(())
    }

    pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
        let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
        if prev.is_some() {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
        }
    }

    pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
        if ids.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
        let n = self.items.delete_by_keys(ks, self.op_id).len();
        if n == ids.len() {
            Ok(())
        } else {
            let item_ids = self.items.items().keys().map(|k| k.id).collect();
            let mut unknown = ids.difference(&item_ids);
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn remove_system_object_mappings(
        &mut self,
        descriptions: BTreeSet<SystemObjectDescription>,
    ) -> Result<(), CatalogError> {
        if descriptions.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = descriptions
            .clone()
            .into_iter()
            .map(|desc| GidMappingKey {
                schema_name: desc.schema_name,
                object_type: desc.object_type,
                object_name: desc.object_name,
            })
            .collect();
        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();

        if n == descriptions.len() {
            Ok(())
        } else {
            let item_descriptions = self
                .system_gid_mapping
                .items()
                .keys()
                .map(|k| SystemObjectDescription {
                    schema_name: k.schema_name.clone(),
                    object_type: k.object_type.clone(),
                    object_name: k.object_name.clone(),
                })
                .collect();
            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
                format!(
                    "{} {}.{}",
                    desc.object_type, desc.schema_name, desc.object_name
                )
            });
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn remove_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = introspection_source_indexes
            .clone()
            .into_iter()
            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
            .collect();
        let n = self
            .introspection_sources
            .delete_by_keys(ks, self.op_id)
            .len();
        if n == introspection_source_indexes.len() {
            Ok(())
        } else {
            let txn_indexes = self
                .introspection_sources
                .items()
                .keys()
                .map(|k| (k.cluster_id, k.name.clone()))
                .collect();
            let mut unknown = introspection_source_indexes
                .difference(&txn_indexes)
                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

    pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
        let updated =
            self.items
                .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
        if updated {
            Ok(())
        } else {
            Err(SqlCatalogError::UnknownItem(id.to_string()).into())
        }
    }

    pub fn update_items(
        &mut self,
        items: BTreeMap<CatalogItemId, Item>,
    ) -> Result<(), CatalogError> {
        if items.is_empty() {
            return Ok(());
        }

        let update_ids: BTreeSet<_> = items.keys().cloned().collect();
        let kvs: Vec<_> = items
            .clone()
            .into_iter()
            .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
            .collect();
        let n = self.items.update_by_keys(kvs, self.op_id)?;
        let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
        if n == update_ids.len() {
            Ok(())
        } else {
            let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
            let mut unknown = update_ids.difference(&item_ids);
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }

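    /// Updates `role` and applies the given [`PasswordAction`] to its stored credentials: `Set`
    /// hashes and upserts the password, `Clear` nulls out any existing hash, and `NoChange`
    /// leaves the `role_auth` entry untouched. Returns an error if the role does not exist.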
1486 pub fn update_role(
1494 &mut self,
1495 id: RoleId,
1496 role: Role,
1497 password: PasswordAction,
1498 ) -> Result<(), CatalogError> {
1499 let key = RoleKey { id };
1500 if self.roles.get(&key).is_some() {
1501 let auth_key = RoleAuthKey { role_id: id };
1502
1503 match password {
1504 PasswordAction::Set(new_password) => {
1505 let hash = mz_auth::hash::scram256_hash(
1506 &new_password.password,
1507 &new_password.scram_iterations,
1508 )
1509 .expect("password hash should be valid");
1510 let value = RoleAuthValue {
1511 password_hash: Some(hash),
1512 updated_at: SYSTEM_TIME(),
1513 };
1514
1515 if self.role_auth.get(&auth_key).is_some() {
1516 self.role_auth
1517 .update_by_key(auth_key.clone(), value, self.op_id)?;
1518 } else {
1519 self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1520 }
1521 }
1522 PasswordAction::Clear => {
1523 let value = RoleAuthValue {
1524 password_hash: None,
1525 updated_at: SYSTEM_TIME(),
1526 };
1527 if self.role_auth.get(&auth_key).is_some() {
1528 self.role_auth
1529 .update_by_key(auth_key.clone(), value, self.op_id)?;
1530 }
1531 }
1532 PasswordAction::NoChange => {}
1533 }
1534
1535 self.roles
1536 .update_by_key(key, role.into_key_value().1, self.op_id)?;
1537
1538 Ok(())
1539 } else {
1540 Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1541 }
1542 }
1543
1544 pub fn update_roles_without_auth(
1555 &mut self,
1556 roles: BTreeMap<RoleId, Role>,
1557 ) -> Result<(), CatalogError> {
1558 if roles.is_empty() {
1559 return Ok(());
1560 }
1561
1562 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1563 let kvs: Vec<_> = roles
1564 .into_iter()
1565 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1566 .collect();
1567 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1568 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1569
1570 if n == update_role_ids.len() {
1571 Ok(())
1572 } else {
1573 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1574 let mut unknown = update_role_ids.difference(&role_ids);
1575 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1576 }
1577 }
1578
1579 pub fn update_system_object_mappings(
1584 &mut self,
1585 mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1586 ) -> Result<(), CatalogError> {
1587 if mappings.is_empty() {
1588 return Ok(());
1589 }
1590
1591 let n = self.system_gid_mapping.update(
1592 |_k, v| {
1593 if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1594 let (_, new_value) = mapping.clone().into_key_value();
1595 Some(new_value)
1596 } else {
1597 None
1598 }
1599 },
1600 self.op_id,
1601 )?;
1602
1603 if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1604 != mappings.len()
1605 {
1606 let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1607 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1608 }
1609
1610 Ok(())
1611 }
1612
1613 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1620 let updated = self.clusters.update_by_key(
1621 ClusterKey { id },
1622 cluster.into_key_value().1,
1623 self.op_id,
1624 )?;
1625 if updated {
1626 Ok(())
1627 } else {
1628 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1629 }
1630 }
1631
1632 pub fn update_cluster_replica(
1639 &mut self,
1640 replica_id: ReplicaId,
1641 replica: ClusterReplica,
1642 ) -> Result<(), CatalogError> {
1643 let updated = self.cluster_replicas.update_by_key(
1644 ClusterReplicaKey { id: replica_id },
1645 replica.into_key_value().1,
1646 self.op_id,
1647 )?;
1648 if updated {
1649 Ok(())
1650 } else {
1651 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1652 }
1653 }
1654
1655 pub fn update_database(
1662 &mut self,
1663 id: DatabaseId,
1664 database: Database,
1665 ) -> Result<(), CatalogError> {
1666 let updated = self.databases.update_by_key(
1667 DatabaseKey { id },
1668 database.into_key_value().1,
1669 self.op_id,
1670 )?;
1671 if updated {
1672 Ok(())
1673 } else {
1674 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1675 }
1676 }
1677
1678 pub fn update_schema(
1685 &mut self,
1686 schema_id: SchemaId,
1687 schema: Schema,
1688 ) -> Result<(), CatalogError> {
1689 let updated = self.schemas.update_by_key(
1690 SchemaKey { id: schema_id },
1691 schema.into_key_value().1,
1692 self.op_id,
1693 )?;
1694 if updated {
1695 Ok(())
1696 } else {
1697 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1698 }
1699 }
1700
1701 pub fn update_network_policy(
1708 &mut self,
1709 id: NetworkPolicyId,
1710 network_policy: NetworkPolicy,
1711 ) -> Result<(), CatalogError> {
1712 let updated = self.network_policies.update_by_key(
1713 NetworkPolicyKey { id },
1714 network_policy.into_key_value().1,
1715 self.op_id,
1716 )?;
1717 if updated {
1718 Ok(())
1719 } else {
1720 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1721 }
1722 }
1723 pub fn remove_network_policies(
1730 &mut self,
1731 network_policies: &BTreeSet<NetworkPolicyId>,
1732 ) -> Result<(), CatalogError> {
1733 if network_policies.is_empty() {
1734 return Ok(());
1735 }
1736
1737 let to_remove = network_policies
1738 .iter()
1739 .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1740 .collect();
1741 let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1742 assert!(
1743 prev.iter().all(|(k, _)| k.id.is_user()),
1744 "cannot delete non-user network policy"
1745 );
1746
1747 prev.retain(|_k, v| v.is_none());
1748 if !prev.is_empty() {
1749 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1750 return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1751 }
1752
1753 Ok(())
1754 }
1755 pub fn set_default_privilege(
1759 &mut self,
1760 role_id: RoleId,
1761 database_id: Option<DatabaseId>,
1762 schema_id: Option<SchemaId>,
1763 object_type: ObjectType,
1764 grantee: RoleId,
1765 privileges: Option<AclMode>,
1766 ) -> Result<(), CatalogError> {
1767 self.default_privileges.set(
1768 DefaultPrivilegesKey {
1769 role_id,
1770 database_id,
1771 schema_id,
1772 object_type,
1773 grantee,
1774 },
1775 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1776 self.op_id,
1777 )?;
1778 Ok(())
1779 }
1780
1781 pub fn set_default_privileges(
1783 &mut self,
1784 default_privileges: Vec<DefaultPrivilege>,
1785 ) -> Result<(), CatalogError> {
1786 if default_privileges.is_empty() {
1787 return Ok(());
1788 }
1789
1790 let default_privileges = default_privileges
1791 .into_iter()
1792 .map(DurableType::into_key_value)
1793 .map(|(k, v)| (k, Some(v)))
1794 .collect();
1795 self.default_privileges
1796 .set_many(default_privileges, self.op_id)?;
1797 Ok(())
1798 }
1799
1800 pub fn set_system_privilege(
1804 &mut self,
1805 grantee: RoleId,
1806 grantor: RoleId,
1807 acl_mode: Option<AclMode>,
1808 ) -> Result<(), CatalogError> {
1809 self.system_privileges.set(
1810 SystemPrivilegesKey { grantee, grantor },
1811 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1812 self.op_id,
1813 )?;
1814 Ok(())
1815 }
1816
1817 pub fn set_system_privileges(
1819 &mut self,
1820 system_privileges: Vec<MzAclItem>,
1821 ) -> Result<(), CatalogError> {
1822 if system_privileges.is_empty() {
1823 return Ok(());
1824 }
1825
1826 let system_privileges = system_privileges
1827 .into_iter()
1828 .map(DurableType::into_key_value)
1829 .map(|(k, v)| (k, Some(v)))
1830 .collect();
1831 self.system_privileges
1832 .set_many(system_privileges, self.op_id)?;
1833 Ok(())
1834 }
1835
1836 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1838 self.settings.set(
1839 SettingKey { name },
1840 value.map(|value| SettingValue { value }),
1841 self.op_id,
1842 )?;
1843 Ok(())
1844 }
1845
1846 pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1847 self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1848 }
1849
1850 pub fn insert_introspection_source_indexes(
1852 &mut self,
1853 introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1854 temporary_oids: &HashSet<u32>,
1855 ) -> Result<(), CatalogError> {
1856 if introspection_source_indexes.is_empty() {
1857 return Ok(());
1858 }
1859
1860 let amount = usize_to_u64(introspection_source_indexes.len());
1861 let oids = self.allocate_oids(amount, temporary_oids)?;
1862 let introspection_source_indexes: Vec<_> = introspection_source_indexes
1863 .into_iter()
1864 .zip_eq(oids)
1865 .map(
1866 |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1867 cluster_id,
1868 name,
1869 item_id,
1870 index_id,
1871 oid,
1872 },
1873 )
1874 .collect();
1875
1876 for introspection_source_index in introspection_source_indexes {
1877 let (key, value) = introspection_source_index.into_key_value();
1878 self.introspection_sources.insert(key, value, self.op_id)?;
1879 }
1880
1881 Ok(())
1882 }
1883
1884 pub fn set_system_object_mappings(
1886 &mut self,
1887 mappings: Vec<SystemObjectMapping>,
1888 ) -> Result<(), CatalogError> {
1889 if mappings.is_empty() {
1890 return Ok(());
1891 }
1892
1893 let mappings = mappings
1894 .into_iter()
1895 .map(DurableType::into_key_value)
1896 .map(|(k, v)| (k, Some(v)))
1897 .collect();
1898 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1899 Ok(())
1900 }
1901
1902 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1904 if replicas.is_empty() {
1905 return Ok(());
1906 }
1907
1908 let replicas = replicas
1909 .into_iter()
1910 .map(DurableType::into_key_value)
1911 .map(|(k, v)| (k, Some(v)))
1912 .collect();
1913 self.cluster_replicas.set_many(replicas, self.op_id)?;
1914 Ok(())
1915 }
1916
1917 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1919 match value {
1920 Some(value) => {
1921 let config = Config { key, value };
1922 let (key, value) = config.into_key_value();
1923 self.configs.set(key, Some(value), self.op_id)?;
1924 }
1925 None => {
1926 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1927 }
1928 }
1929 Ok(())
1930 }
1931
1932 pub fn get_config(&self, key: String) -> Option<u64> {
1934 self.configs
1935 .get(&ConfigKey { key })
1936 .map(|entry| entry.value)
1937 }
1938
1939 pub fn get_setting(&self, name: String) -> Option<&str> {
1941 self.settings
1942 .get(&SettingKey { name })
1943 .map(|entry| &*entry.value)
1944 }
1945
1946 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1947 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1948 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1949 }
1950
1951 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1952 self.set_setting(
1953 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1954 Some(shard_id.to_string()),
1955 )
1956 }
1957
1958 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1959 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1960 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1961 }
1962
1963 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1964 self.set_setting(
1965 EXPRESSION_CACHE_SHARD_KEY.to_string(),
1966 Some(shard_id.to_string()),
1967 )
1968 }
1969
1970 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1976 self.set_config(
1977 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1978 Some(
1979 value
1980 .as_millis()
1981 .try_into()
1982 .expect("max wait fits into u64"),
1983 ),
1984 )
1985 }
1986
1987 pub fn set_0dt_deployment_ddl_check_interval(
1994 &mut self,
1995 value: Duration,
1996 ) -> Result<(), CatalogError> {
1997 self.set_config(
1998 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
1999 Some(
2000 value
2001 .as_millis()
2002 .try_into()
2003 .expect("ddl check interval fits into u64"),
2004 ),
2005 )
2006 }
2007
2008 pub fn set_enable_0dt_deployment_panic_after_timeout(
2014 &mut self,
2015 value: bool,
2016 ) -> Result<(), CatalogError> {
2017 self.set_config(
2018 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2019 Some(u64::from(value)),
2020 )
2021 }
2022
2023 pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2029 self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2030 }
2031
2032 pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2039 self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2040 }
2041
2042 pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2049 self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2050 }
2051
2052 pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2054 self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2055 }
2056
2057 pub fn update_comment(
2058 &mut self,
2059 object_id: CommentObjectId,
2060 sub_component: Option<usize>,
2061 comment: Option<String>,
2062 ) -> Result<(), CatalogError> {
2063 let key = CommentKey {
2064 object_id,
2065 sub_component,
2066 };
2067 let value = comment.map(|c| CommentValue { comment: c });
2068 self.comments.set(key, value, self.op_id)?;
2069
2070 Ok(())
2071 }
2072
2073 pub fn drop_comments(
2074 &mut self,
2075 object_ids: &BTreeSet<CommentObjectId>,
2076 ) -> Result<(), CatalogError> {
2077 if object_ids.is_empty() {
2078 return Ok(());
2079 }
2080
2081 self.comments
2082 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2083 Ok(())
2084 }
2085
2086 pub fn update_source_references(
2087 &mut self,
2088 source_id: CatalogItemId,
2089 references: Vec<SourceReference>,
2090 updated_at: u64,
2091 ) -> Result<(), CatalogError> {
2092 let key = SourceReferencesKey { source_id };
2093 let value = SourceReferencesValue {
2094 references,
2095 updated_at,
2096 };
2097 self.source_references.set(key, Some(value), self.op_id)?;
2098 Ok(())
2099 }
2100
2101 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2103 let key = ServerConfigurationKey {
2104 name: name.to_string(),
2105 };
2106 let value = ServerConfigurationValue { value };
2107 self.system_configurations
2108 .set(key, Some(value), self.op_id)?;
2109 Ok(())
2110 }
2111
2112 pub fn remove_system_config(&mut self, name: &str) {
2114 let key = ServerConfigurationKey {
2115 name: name.to_string(),
2116 };
2117 self.system_configurations
2118 .set(key, None, self.op_id)
2119 .expect("cannot have uniqueness violation");
2120 }
2121
2122 pub fn clear_system_configs(&mut self) {
2124 self.system_configurations.delete(|_k, _v| true, self.op_id);
2125 }
2126
2127 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2128 match self.configs.insert(
2129 ConfigKey { key: key.clone() },
2130 ConfigValue { value },
2131 self.op_id,
2132 ) {
2133 Ok(_) => Ok(()),
2134 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2135 }
2136 }
2137
2138 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2139 self.clusters
2140 .items()
2141 .into_iter()
2142 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2143 }
2144
2145 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2146 self.cluster_replicas
2147 .items()
2148 .into_iter()
2149 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2150 }
2151
2152 pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2153 self.databases
2154 .items()
2155 .into_iter()
2156 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2157 }
2158
2159 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2160 self.roles
2161 .items()
2162 .into_iter()
2163 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2164 }
2165
2166 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2167 self.network_policies
2168 .items()
2169 .into_iter()
2170 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2171 }
2172
2173 pub fn get_system_object_mappings(
2174 &self,
2175 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2176 self.system_gid_mapping
2177 .items()
2178 .into_iter()
2179 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2180 }
2181
2182 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2183 self.schemas
2184 .items()
2185 .into_iter()
2186 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2187 }
2188
2189 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2190 self.system_configurations
2191 .items()
2192 .into_iter()
2193 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2194 }
2195
2196 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2197 let key = SchemaKey { id: *id };
2198 self.schemas
2199 .get(&key)
2200 .map(|v| DurableType::from_key_value(key, v.clone()))
2201 }
2202
2203 pub fn get_introspection_source_indexes(
2204 &self,
2205 cluster_id: ClusterId,
2206 ) -> BTreeMap<&str, (GlobalId, u32)> {
2207 self.introspection_sources
2208 .items()
2209 .into_iter()
2210 .filter(|(k, _v)| k.cluster_id == cluster_id)
2211 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2212 .collect()
2213 }
2214
2215 pub fn get_catalog_content_version(&self) -> Option<&str> {
2216 self.settings
2217 .get(&SettingKey {
2218 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2219 })
2220 .map(|value| &*value.value)
2221 }
2222
2223 pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2224 self.settings
2225 .get(&SettingKey {
2226 name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2227 })
2228 .map(|value| value.value.clone())
2229 }
2230
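/// Returns the updates produced by the current operation and advances the
/// transaction to the next operation, so the same updates are never
/// returned twice.
///
/// Callers are expected to apply the returned updates to the in-memory
/// catalog state; `commit` asserts that no operation updates remain.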
2231 #[must_use]
2237 pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2238 let updates = self.get_op_updates();
2239 self.commit_op();
2240 updates
2241 }
2242
2243 fn get_op_updates(&self) -> Vec<StateUpdate> {
2244 fn get_collection_op_updates<'a, T>(
2245 table_txn: &'a TableTransaction<T::Key, T::Value>,
2246 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2247 op: Timestamp,
2248 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2249 where
2250 T::Key: Ord + Eq + Clone + Debug,
2251 T::Value: Ord + Clone + Debug,
2252 T: DurableType,
2253 {
2254 table_txn
2255 .pending
2256 .iter()
2257 .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2258 .filter_map(move |(k, v)| {
2259 if v.ts == op {
2260 let key = k.clone();
2261 let value = v.value.clone();
2262 let diff = v.diff.clone().try_into().expect("invalid diff");
2263 let update = DurableType::from_key_value(key, value);
2264 let kind = kind_fn(update);
2265 Some((kind, diff))
2266 } else {
2267 None
2268 }
2269 })
2270 }
2271
2272 fn get_large_collection_op_updates<'a, T>(
2273 collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2274 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2275 op: Timestamp,
2276 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2277 where
2278 T::Key: Ord + Eq + Clone + Debug,
2279 T: DurableType<Value = ()>,
2280 {
2281 collection.iter().filter_map(move |(k, diff, ts)| {
2282 if *ts == op {
2283 let key = k.clone();
2284 let diff = diff.clone().try_into().expect("invalid diff");
2285 let update = DurableType::from_key_value(key, ());
2286 let kind = kind_fn(update);
2287 Some((kind, diff))
2288 } else {
2289 None
2290 }
2291 })
2292 }
2293
2294 let Transaction {
2295 durable_catalog: _,
2296 databases,
2297 schemas,
2298 items,
2299 comments,
2300 roles,
2301 role_auth,
2302 clusters,
2303 network_policies,
2304 cluster_replicas,
2305 introspection_sources,
2306 system_gid_mapping,
2307 system_configurations,
2308 default_privileges,
2309 source_references,
2310 system_privileges,
2311 audit_log_updates,
2312 storage_collection_metadata,
2313 unfinalized_shards,
2314 id_allocator: _,
2316 configs: _,
2317 settings: _,
2318 txn_wal_shard: _,
2319 upper,
2320 op_id: _,
2321 } = &self;
2322
2323 let updates = std::iter::empty()
2324 .chain(get_collection_op_updates(
2325 roles,
2326 StateUpdateKind::Role,
2327 self.op_id,
2328 ))
2329 .chain(get_collection_op_updates(
2330 role_auth,
2331 StateUpdateKind::RoleAuth,
2332 self.op_id,
2333 ))
2334 .chain(get_collection_op_updates(
2335 databases,
2336 StateUpdateKind::Database,
2337 self.op_id,
2338 ))
2339 .chain(get_collection_op_updates(
2340 schemas,
2341 StateUpdateKind::Schema,
2342 self.op_id,
2343 ))
2344 .chain(get_collection_op_updates(
2345 default_privileges,
2346 StateUpdateKind::DefaultPrivilege,
2347 self.op_id,
2348 ))
2349 .chain(get_collection_op_updates(
2350 system_privileges,
2351 StateUpdateKind::SystemPrivilege,
2352 self.op_id,
2353 ))
2354 .chain(get_collection_op_updates(
2355 system_configurations,
2356 StateUpdateKind::SystemConfiguration,
2357 self.op_id,
2358 ))
2359 .chain(get_collection_op_updates(
2360 clusters,
2361 StateUpdateKind::Cluster,
2362 self.op_id,
2363 ))
2364 .chain(get_collection_op_updates(
2365 network_policies,
2366 StateUpdateKind::NetworkPolicy,
2367 self.op_id,
2368 ))
2369 .chain(get_collection_op_updates(
2370 introspection_sources,
2371 StateUpdateKind::IntrospectionSourceIndex,
2372 self.op_id,
2373 ))
2374 .chain(get_collection_op_updates(
2375 cluster_replicas,
2376 StateUpdateKind::ClusterReplica,
2377 self.op_id,
2378 ))
2379 .chain(get_collection_op_updates(
2380 system_gid_mapping,
2381 StateUpdateKind::SystemObjectMapping,
2382 self.op_id,
2383 ))
2384 .chain(get_collection_op_updates(
2385 items,
2386 StateUpdateKind::Item,
2387 self.op_id,
2388 ))
2389 .chain(get_collection_op_updates(
2390 comments,
2391 StateUpdateKind::Comment,
2392 self.op_id,
2393 ))
2394 .chain(get_collection_op_updates(
2395 source_references,
2396 StateUpdateKind::SourceReferences,
2397 self.op_id,
2398 ))
2399 .chain(get_collection_op_updates(
2400 storage_collection_metadata,
2401 StateUpdateKind::StorageCollectionMetadata,
2402 self.op_id,
2403 ))
2404 .chain(get_collection_op_updates(
2405 unfinalized_shards,
2406 StateUpdateKind::UnfinalizedShard,
2407 self.op_id,
2408 ))
2409 .chain(get_large_collection_op_updates(
2410 audit_log_updates,
2411 StateUpdateKind::AuditLog,
2412 self.op_id,
2413 ))
2414 .map(|(kind, diff)| StateUpdate {
2415 kind,
2416 ts: upper.clone(),
2417 diff,
2418 })
2419 .collect();
2420
2421 updates
2422 }
2423
2424 pub fn is_savepoint(&self) -> bool {
2425 self.durable_catalog.is_savepoint()
2426 }
2427
2428 fn commit_op(&mut self) {
2429 self.op_id += 1;
2430 }
2431
2432 pub fn op_id(&self) -> Timestamp {
2433 self.op_id
2434 }
2435
2436 pub fn upper(&self) -> mz_repr::Timestamp {
2437 self.upper
2438 }
2439
2440 pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2441 let audit_log_updates = self
2442 .audit_log_updates
2443 .into_iter()
2444 .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2445 .collect();
2446
2447 let txn_batch = TransactionBatch {
2448 databases: self.databases.pending(),
2449 schemas: self.schemas.pending(),
2450 items: self.items.pending(),
2451 comments: self.comments.pending(),
2452 roles: self.roles.pending(),
2453 role_auth: self.role_auth.pending(),
2454 clusters: self.clusters.pending(),
2455 cluster_replicas: self.cluster_replicas.pending(),
2456 network_policies: self.network_policies.pending(),
2457 introspection_sources: self.introspection_sources.pending(),
2458 id_allocator: self.id_allocator.pending(),
2459 configs: self.configs.pending(),
2460 source_references: self.source_references.pending(),
2461 settings: self.settings.pending(),
2462 system_gid_mapping: self.system_gid_mapping.pending(),
2463 system_configurations: self.system_configurations.pending(),
2464 default_privileges: self.default_privileges.pending(),
2465 system_privileges: self.system_privileges.pending(),
2466 storage_collection_metadata: self.storage_collection_metadata.pending(),
2467 unfinalized_shards: self.unfinalized_shards.pending(),
2468 txn_wal_shard: self.txn_wal_shard.pending(),
2469 audit_log_updates,
2470 upper: self.upper,
2471 };
2472 (txn_batch, self.durable_catalog)
2473 }
2474
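/// Consolidates this transaction's pending updates and commits them to
/// durable storage at `commit_ts`, returning the durable catalog handle
/// and the new upper.
///
/// Panics if `commit_ts` is less than the transaction's upper.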
2475 #[mz_ore::instrument(level = "debug")]
2487 pub(crate) async fn commit_internal(
2488 self,
2489 commit_ts: mz_repr::Timestamp,
2490 ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2491 let (mut txn_batch, durable_catalog) = self.into_parts();
2492 let TransactionBatch {
2493 databases,
2494 schemas,
2495 items,
2496 comments,
2497 roles,
2498 role_auth,
2499 clusters,
2500 cluster_replicas,
2501 network_policies,
2502 introspection_sources,
2503 id_allocator,
2504 configs,
2505 source_references,
2506 settings,
2507 system_gid_mapping,
2508 system_configurations,
2509 default_privileges,
2510 system_privileges,
2511 storage_collection_metadata,
2512 unfinalized_shards,
2513 txn_wal_shard,
2514 audit_log_updates,
2515 upper,
2516 } = &mut txn_batch;
2517 differential_dataflow::consolidation::consolidate_updates(databases);
2520 differential_dataflow::consolidation::consolidate_updates(schemas);
2521 differential_dataflow::consolidation::consolidate_updates(items);
2522 differential_dataflow::consolidation::consolidate_updates(comments);
2523 differential_dataflow::consolidation::consolidate_updates(roles);
2524 differential_dataflow::consolidation::consolidate_updates(role_auth);
2525 differential_dataflow::consolidation::consolidate_updates(clusters);
2526 differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2527 differential_dataflow::consolidation::consolidate_updates(network_policies);
2528 differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2529 differential_dataflow::consolidation::consolidate_updates(id_allocator);
2530 differential_dataflow::consolidation::consolidate_updates(configs);
2531 differential_dataflow::consolidation::consolidate_updates(settings);
2532 differential_dataflow::consolidation::consolidate_updates(source_references);
2533 differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2534 differential_dataflow::consolidation::consolidate_updates(system_configurations);
2535 differential_dataflow::consolidation::consolidate_updates(default_privileges);
2536 differential_dataflow::consolidation::consolidate_updates(system_privileges);
2537 differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2538 differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2539 differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2540 differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2541
2542 assert!(
2543 commit_ts >= *upper,
2544 "expected commit ts, {}, to be greater than or equal to upper, {}",
2545 commit_ts,
2546 upper
2547 );
2548 let upper = durable_catalog
2549 .commit_transaction(txn_batch, commit_ts)
2550 .await?;
2551 Ok((durable_catalog, upper))
2552 }
2553
2554 #[mz_ore::instrument(level = "debug")]
2571 pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2572 let op_updates = self.get_op_updates();
2573 assert!(
2574 op_updates.is_empty(),
2575 "unconsumed transaction updates: {op_updates:?}"
2576 );
2577
2578 let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2579 let updates = durable_storage.sync_updates(upper).await?;
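// Outside of read-only mode, every update synced here must have been
// written at `commit_ts` by this very commit.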
2581 soft_assert_no_log!(
2586 durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2587 "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2588 );
2589 Ok(())
2590 }
2591}
2592
2593use crate::durable::async_trait;
2594
2595use super::objects::{RoleAuthKey, RoleAuthValue};
2596
2597#[async_trait]
2598impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2599 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2600 self.storage_collection_metadata
2601 .items()
2602 .into_iter()
2603 .map(
2604 |(
2605 StorageCollectionMetadataKey { id },
2606 StorageCollectionMetadataValue { shard },
2607 )| { (*id, shard.clone()) },
2608 )
2609 .collect()
2610 }
2611
2612 fn insert_collection_metadata(
2613 &mut self,
2614 metadata: BTreeMap<GlobalId, ShardId>,
2615 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2616 for (id, shard) in metadata {
2617 self.storage_collection_metadata
2618 .insert(
2619 StorageCollectionMetadataKey { id },
2620 StorageCollectionMetadataValue {
2621 shard: shard.clone(),
2622 },
2623 self.op_id,
2624 )
2625 .map_err(|err| match err {
2626 DurableCatalogError::DuplicateKey => {
2627 StorageError::CollectionMetadataAlreadyExists(id)
2628 }
2629 DurableCatalogError::UniquenessViolation => {
2630 StorageError::PersistShardAlreadyInUse(shard)
2631 }
2632 err => StorageError::Generic(anyhow::anyhow!(err)),
2633 })?;
2634 }
2635 Ok(())
2636 }
2637
2638 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2639 let ks: Vec<_> = ids
2640 .into_iter()
2641 .map(|id| StorageCollectionMetadataKey { id })
2642 .collect();
2643 self.storage_collection_metadata
2644 .delete_by_keys(ks, self.op_id)
2645 .into_iter()
2646 .map(
2647 |(
2648 StorageCollectionMetadataKey { id },
2649 StorageCollectionMetadataValue { shard },
2650 )| (id, shard),
2651 )
2652 .collect()
2653 }
2654
2655 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2656 self.unfinalized_shards
2657 .items()
2658 .into_iter()
2659 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2660 .collect()
2661 }
2662
2663 fn insert_unfinalized_shards(
2664 &mut self,
2665 s: BTreeSet<ShardId>,
2666 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2667 for shard in s {
2668 match self
2669 .unfinalized_shards
2670 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2671 {
2672 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2674 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2675 };
2676 }
2677 Ok(())
2678 }
2679
2680 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2681 let ks: Vec<_> = shards
2682 .into_iter()
2683 .map(|shard| UnfinalizedShardKey { shard })
2684 .collect();
2685 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2686 }
2687
2688 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2689 self.txn_wal_shard
2690 .values()
2691 .iter()
2692 .next()
2693 .map(|TxnWalShardValue { shard }| *shard)
2694 }
2695
2696 fn write_txn_wal_shard(
2697 &mut self,
2698 shard: ShardId,
2699 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2700 self.txn_wal_shard
2701 .insert((), TxnWalShardValue { shard }, self.op_id)
2702 .map_err(|err| match err {
2703 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2704 err => StorageError::Generic(anyhow::anyhow!(err)),
2705 })
2706 }
2707}
2708
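/// The consolidated output of a [`Transaction`]: per-collection updates in
/// protobuf form, ready to be committed to durable storage.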
2709#[derive(Debug, Clone, Default, PartialEq)]
2711pub struct TransactionBatch {
2712 pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
2713 pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
2714 pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
2715 pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
2716 pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
2717 pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
2718 pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
2719 pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
2720 pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
2721 pub(crate) introspection_sources: Vec<(
2722 proto::ClusterIntrospectionSourceIndexKey,
2723 proto::ClusterIntrospectionSourceIndexValue,
2724 Diff,
2725 )>,
2726 pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
2727 pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
2728 pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
2729 pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
2730 pub(crate) system_configurations: Vec<(
2731 proto::ServerConfigurationKey,
2732 proto::ServerConfigurationValue,
2733 Diff,
2734 )>,
2735 pub(crate) default_privileges: Vec<(
2736 proto::DefaultPrivilegesKey,
2737 proto::DefaultPrivilegesValue,
2738 Diff,
2739 )>,
2740 pub(crate) source_references: Vec<(
2741 proto::SourceReferencesKey,
2742 proto::SourceReferencesValue,
2743 Diff,
2744 )>,
2745 pub(crate) system_privileges: Vec<(
2746 proto::SystemPrivilegesKey,
2747 proto::SystemPrivilegesValue,
2748 Diff,
2749 )>,
2750 pub(crate) storage_collection_metadata: Vec<(
2751 proto::StorageCollectionMetadataKey,
2752 proto::StorageCollectionMetadataValue,
2753 Diff,
2754 )>,
2755 pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
2756 pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
2757 pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
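/// The upper of the durable catalog when the transaction started; commits
/// must use a timestamp at least this large.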
2758 pub(crate) upper: mz_repr::Timestamp,
2760}
2761
2762impl TransactionBatch {
2763 pub fn is_empty(&self) -> bool {
2764 let TransactionBatch {
2765 databases,
2766 schemas,
2767 items,
2768 comments,
2769 roles,
2770 role_auth,
2771 clusters,
2772 cluster_replicas,
2773 network_policies,
2774 introspection_sources,
2775 id_allocator,
2776 configs,
2777 settings,
2778 source_references,
2779 system_gid_mapping,
2780 system_configurations,
2781 default_privileges,
2782 system_privileges,
2783 storage_collection_metadata,
2784 unfinalized_shards,
2785 txn_wal_shard,
2786 audit_log_updates,
2787 upper: _,
2788 } = self;
2789 databases.is_empty()
2790 && schemas.is_empty()
2791 && items.is_empty()
2792 && comments.is_empty()
2793 && roles.is_empty()
2794 && role_auth.is_empty()
2795 && clusters.is_empty()
2796 && cluster_replicas.is_empty()
2797 && network_policies.is_empty()
2798 && introspection_sources.is_empty()
2799 && id_allocator.is_empty()
2800 && configs.is_empty()
2801 && settings.is_empty()
2802 && source_references.is_empty()
2803 && system_gid_mapping.is_empty()
2804 && system_configurations.is_empty()
2805 && default_privileges.is_empty()
2806 && system_privileges.is_empty()
2807 && storage_collection_metadata.is_empty()
2808 && unfinalized_shards.is_empty()
2809 && txn_wal_shard.is_empty()
2810 && audit_log_updates.is_empty()
2811 }
2812}
2813
2814#[derive(Debug, Clone, PartialEq, Eq)]
2815struct TransactionUpdate<V> {
2816 value: V,
2817 ts: Timestamp,
2818 diff: Diff,
2819}
2820
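/// Helper trait that lets uniqueness checks bucket values by name instead
/// of comparing every pair of values.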
2821trait UniqueName {
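/// Whether the value type has a meaningful unique name.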
2823 const HAS_UNIQUE_NAME: bool;
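/// The name of the value, used to bucket values during uniqueness checks
/// when `HAS_UNIQUE_NAME` is `true`; otherwise the empty string.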
2826 fn unique_name(&self) -> &str;
2828}
2829
2830mod unique_name {
2831 use crate::durable::objects::*;
2832
2833 macro_rules! impl_unique_name {
2834 ($($t:ty),* $(,)?) => {
2835 $(
2836 impl crate::durable::transaction::UniqueName for $t {
2837 const HAS_UNIQUE_NAME: bool = true;
2838 fn unique_name(&self) -> &str {
2839 &self.name
2840 }
2841 }
2842 )*
2843 };
2844 }
2845
2846 macro_rules! impl_no_unique_name {
2847 ($($t:ty),* $(,)?) => {
2848 $(
2849 impl crate::durable::transaction::UniqueName for $t {
2850 const HAS_UNIQUE_NAME: bool = false;
2851 fn unique_name(&self) -> &str {
2852 ""
2853 }
2854 }
2855 )*
2856 };
2857 }
2858
2859 impl_unique_name! {
2860 ClusterReplicaValue,
2861 ClusterValue,
2862 DatabaseValue,
2863 ItemValue,
2864 NetworkPolicyValue,
2865 RoleValue,
2866 SchemaValue,
2867 }
2868
2869 impl_no_unique_name!(
2870 (),
2871 ClusterIntrospectionSourceIndexValue,
2872 CommentValue,
2873 ConfigValue,
2874 DefaultPrivilegesValue,
2875 GidMappingValue,
2876 IdAllocValue,
2877 ServerConfigurationValue,
2878 SettingValue,
2879 SourceReferencesValue,
2880 StorageCollectionMetadataValue,
2881 SystemPrivilegesValue,
2882 TxnWalShardValue,
2883 RoleAuthValue,
2884 );
2885
2886 #[cfg(test)]
2887 mod test {
2888 impl_no_unique_name!(String,);
2889 }
2890}
2891
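/// A `TableTransaction` emulates a SQL-style transaction over a single
/// catalog collection: pending, uncommitted updates are layered on top of
/// the collection's initial contents, with read-your-writes semantics,
/// optional uniqueness constraints on values, and per-operation timestamps
/// so that one transaction can contain multiple ordered operations.
///
/// A minimal usage sketch (key and value types simplified for illustration):
/// ```ignore
/// let mut table = TableTransaction::new_with_uniqueness_fn(initial, |a, b| a == b)?;
/// table.insert(key, value, 0)?; // stage an insert at op 0
/// let pending = table.pending::<KeyProto, ValueProto>(); // consolidated (key, value, diff)s
/// ```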
2892#[derive(Debug)]
2902struct TableTransaction<K, V> {
2903 initial: BTreeMap<K, V>,
2904 pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
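/// If `Some`, a function that returns `true` when two values may not
/// coexist in the table, used to reject conflicting writes.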
2907 uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
2908}
2909
2910impl<K, V> TableTransaction<K, V>
2911where
2912 K: Ord + Eq + Clone + Debug,
2913 V: Ord + Clone + Debug + UniqueName,
2914{
2915 fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
2922 where
2923 K: RustType<KP>,
2924 V: RustType<VP>,
2925 {
2926 let initial = initial
2927 .into_iter()
2928 .map(RustType::from_proto)
2929 .collect::<Result<_, _>>()?;
2930
2931 Ok(Self {
2932 initial,
2933 pending: BTreeMap::new(),
2934 uniqueness_violation: None,
2935 })
2936 }
2937
2938 fn new_with_uniqueness_fn<KP, VP>(
2941 initial: BTreeMap<KP, VP>,
2942 uniqueness_violation: fn(a: &V, b: &V) -> bool,
2943 ) -> Result<Self, TryFromProtoError>
2944 where
2945 K: RustType<KP>,
2946 V: RustType<VP>,
2947 {
2948 let initial = initial
2949 .into_iter()
2950 .map(RustType::from_proto)
2951 .collect::<Result<_, _>>()?;
2952
2953 Ok(Self {
2954 initial,
2955 pending: BTreeMap::new(),
2956 uniqueness_violation: Some(uniqueness_violation),
2957 })
2958 }
2959
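/// Consumes the transaction and returns its pending updates as
/// consolidated `(key, value, diff)` triples in protobuf form.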
2960 fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
2963 where
2964 K: RustType<KP>,
2965 V: RustType<VP>,
2966 {
2967 soft_assert_no_log!(self.verify().is_ok());
2968 self.pending
2971 .into_iter()
2972 .flat_map(|(k, v)| {
2973 let mut v: Vec<_> = v
2974 .into_iter()
2975 .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
2976 .collect();
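// Updates that cancel out within the transaction are dropped here.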
2977 differential_dataflow::consolidation::consolidate(&mut v);
2978 v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
2979 })
2980 .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
2981 .collect()
2982 }
2983
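/// Verifies that no two current entries violate the uniqueness constraint,
/// returning `DurableCatalogError::UniquenessViolation` on conflict.
///
/// The check is quadratic in the number of entries unless the value type
/// exposes a unique name, in which case values are bucketed by name first.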
2984 fn verify(&self) -> Result<(), DurableCatalogError> {
2989 if let Some(uniqueness_violation) = self.uniqueness_violation {
2990 let items = self.values();
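// Fast path: bucket values by unique name instead of comparing all pairs.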
2992 if V::HAS_UNIQUE_NAME {
2993 let by_name: BTreeMap<_, _> = items
2994 .iter()
2995 .enumerate()
2996 .map(|(j, vj)| (vj.unique_name(), (j, vj)))
2997 .collect();
2998 for (i, vi) in items.iter().enumerate() {
2999 if let Some((j, vj)) = by_name.get(vi.unique_name()) {
3000 if i != *j && uniqueness_violation(vi, *vj) {
3001 return Err(DurableCatalogError::UniquenessViolation);
3002 }
3003 }
3004 }
3005 } else {
3006 for (i, vi) in items.iter().enumerate() {
3007 for (j, vj) in items.iter().enumerate() {
3008 if i != j && uniqueness_violation(vi, vj) {
3009 return Err(DurableCatalogError::UniquenessViolation);
3010 }
3011 }
3012 }
3013 }
3014 }
3015 soft_assert_no_log!(
3016 self.pending
3017 .values()
3018 .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3019 "pending should be sorted by timestamp: {:?}",
3020 self.pending
3021 );
3022 Ok(())
3023 }
3024
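/// Verifies that the values at `keys` do not violate the uniqueness
/// constraint against any other entry; cheaper than `verify` when only a
/// few keys changed.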
3025 fn verify_keys<'a>(
3030 &self,
3031 keys: impl IntoIterator<Item = &'a K>,
3032 ) -> Result<(), DurableCatalogError>
3033 where
3034 K: 'a,
3035 {
3036 if let Some(uniqueness_violation) = self.uniqueness_violation {
3037 let entries: Vec<_> = keys
3038 .into_iter()
3039 .filter_map(|key| self.get(key).map(|value| (key, value)))
3040 .collect();
3041 for (ki, vi) in self.items() {
3043 for (kj, vj) in &entries {
3044 if ki != *kj && uniqueness_violation(vi, vj) {
3045 return Err(DurableCatalogError::UniquenessViolation);
3046 }
3047 }
3048 }
3049 }
3050 soft_assert_no_log!(self.verify().is_ok());
3051 Ok(())
3052 }
3053
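/// Applies `f` to every current entry: each pending key is visited with
/// its consolidated value (pending deletions are skipped), then every
/// untouched initial entry is visited as-is.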
3054 fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3057 let mut seen = BTreeSet::new();
3058 for k in self.pending.keys() {
3059 seen.insert(k);
3060 let v = self.get(k);
3061 if let Some(v) = v {
3064 f(k, v);
3065 }
3066 }
3067 for (k, v) in self.initial.iter() {
3068 if !seen.contains(k) {
3070 f(k, v);
3071 }
3072 }
3073 }
3074
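/// Returns the current value of `k` by consolidating the initial value, if
/// any, with all pending updates; `None` means the entry is absent or
/// deleted.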
3075 fn get(&self, k: &K) -> Option<&V> {
3077 let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3078 let mut updates = Vec::with_capacity(pending.len() + 1);
3079 if let Some(initial) = self.initial.get(k) {
3080 updates.push((initial, Diff::ONE));
3081 }
3082 updates.extend(
3083 pending
3084 .into_iter()
3085 .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3086 );
3087
3088 differential_dataflow::consolidation::consolidate(&mut updates);
3089 assert!(updates.len() <= 1);
3090 updates.into_iter().next().map(|(v, _)| v)
3091 }
3092
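/// Returns an owned copy of the table's current contents (test-only).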
3093 #[cfg(test)]
3098 fn items_cloned(&self) -> BTreeMap<K, V> {
3099 let mut items = BTreeMap::new();
3100 self.for_values(|k, v| {
3101 items.insert(k.clone(), v.clone());
3102 });
3103 items
3104 }
3105
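/// Returns the table's current contents by reference.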
3106 fn items(&self) -> BTreeMap<&K, &V> {
3109 let mut items = BTreeMap::new();
3110 self.for_values(|k, v| {
3111 items.insert(k, v);
3112 });
3113 items
3114 }
3115
3116 fn values(&self) -> BTreeSet<&V> {
3118 let mut items = BTreeSet::new();
3119 self.for_values(|_, v| {
3120 items.insert(v);
3121 });
3122 items
3123 }
3124
3125 fn len(&self) -> usize {
3127 let mut count = 0;
3128 self.for_values(|_, _| {
3129 count += 1;
3130 });
3131 count
3132 }
3133
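/// Like `for_values`, except that `f` can stage new updates into the
/// provided map, which are appended to `pending` after iteration finishes.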
3134 fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3138 &mut self,
3139 mut f: F,
3140 ) {
3141 let mut pending = BTreeMap::new();
3142 self.for_values(|k, v| f(&mut pending, k, v));
3143 for (k, updates) in pending {
3144 self.pending.entry(k).or_default().extend(updates);
3145 }
3146 }
3147
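/// Stages an insert of `(k, v)` at operation timestamp `ts`. Returns an
/// error if the key already exists or the value would violate the
/// uniqueness constraint.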
3148 fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3152 let mut violation = None;
3153 self.for_values(|for_k, for_v| {
3154 if &k == for_k {
3155 violation = Some(DurableCatalogError::DuplicateKey);
3156 }
3157 if let Some(uniqueness_violation) = self.uniqueness_violation {
3158 if uniqueness_violation(for_v, &v) {
3159 violation = Some(DurableCatalogError::UniquenessViolation);
3160 }
3161 }
3162 });
3163 if let Some(violation) = violation {
3164 return Err(violation);
3165 }
3166 self.pending.entry(k).or_default().push(TransactionUpdate {
3167 value: v,
3168 ts,
3169 diff: Diff::ONE,
3170 });
3171 soft_assert_no_log!(self.verify().is_ok());
3172 Ok(())
3173 }
3174
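/// For every entry where `f` returns `Some(next)`, stages a retraction of
/// the old value and an insertion of `next` at `ts`. Returns the number of
/// changed entries; on a uniqueness violation all changes are rolled back.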
3175 fn update<F: Fn(&K, &V) -> Option<V>>(
3184 &mut self,
3185 f: F,
3186 ts: Timestamp,
3187 ) -> Result<Diff, DurableCatalogError> {
3188 let mut changed = Diff::ZERO;
3189 let mut keys = BTreeSet::new();
3190 let pending = self.pending.clone();
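// Snapshot the pending updates so they can be restored on failure.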
3192 self.for_values_mut(|p, k, v| {
3193 if let Some(next) = f(k, v) {
3194 changed += Diff::ONE;
3195 keys.insert(k.clone());
3196 let updates = p.entry(k.clone()).or_default();
3197 updates.push(TransactionUpdate {
3198 value: v.clone(),
3199 ts,
3200 diff: Diff::MINUS_ONE,
3201 });
3202 updates.push(TransactionUpdate {
3203 value: next,
3204 ts,
3205 diff: Diff::ONE,
3206 });
3207 }
3208 });
3209 if let Err(err) = self.verify_keys(&keys) {
3211 self.pending = pending;
3212 Err(err)
3213 } else {
3214 Ok(changed)
3215 }
3216 }
3217
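/// Sets `k` to `v` at `ts` if the key currently exists, returning whether
/// it existed. Writing the current value again is a no-op.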
3218 fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3223 if let Some(cur_v) = self.get(&k) {
3224 if v != *cur_v {
3225 self.set(k, Some(v), ts)?;
3226 }
3227 Ok(true)
3228 } else {
3229 Ok(false)
3230 }
3231 }
3232
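/// Sets each pair in `kvs` whose key currently exists. Returns the number
/// of keys that existed, counting those whose value was already up to date.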
3233 fn update_by_keys(
3238 &mut self,
3239 kvs: impl IntoIterator<Item = (K, V)>,
3240 ts: Timestamp,
3241 ) -> Result<Diff, DurableCatalogError> {
3242 let kvs: Vec<_> = kvs
3243 .into_iter()
3244 .filter_map(|(k, v)| match self.get(&k) {
3245 Some(cur_v) => Some((*cur_v == v, k, v)),
3247 None => None,
3248 })
3249 .collect();
3250 let changed = kvs.len();
3251 let changed =
3252 Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3253 let kvs = kvs
3254 .into_iter()
3255 .filter(|(no_op, _, _)| !no_op)
3257 .map(|(_, k, v)| (k, Some(v)))
3258 .collect();
3259 self.set_many(kvs, ts)?;
3260 Ok(changed)
3261 }
3262
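/// Stages `k` to be `v` (or deleted when `v` is `None`) at `ts`, returning
/// the previous value. On a uniqueness violation the staged updates are
/// rolled back and an error is returned.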
3263 fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3270 let prev = self.get(&k).cloned();
3271 let entry = self.pending.entry(k.clone()).or_default();
3272 let restore_len = entry.len();
3273
3274 match (v, prev.clone()) {
3275 (Some(v), Some(prev)) => {
3276 entry.push(TransactionUpdate {
3277 value: prev,
3278 ts,
3279 diff: Diff::MINUS_ONE,
3280 });
3281 entry.push(TransactionUpdate {
3282 value: v,
3283 ts,
3284 diff: Diff::ONE,
3285 });
3286 }
3287 (Some(v), None) => {
3288 entry.push(TransactionUpdate {
3289 value: v,
3290 ts,
3291 diff: Diff::ONE,
3292 });
3293 }
3294 (None, Some(prev)) => {
3295 entry.push(TransactionUpdate {
3296 value: prev,
3297 ts,
3298 diff: Diff::MINUS_ONE,
3299 });
3300 }
3301 (None, None) => {}
3302 }
3303
3304 if let Err(err) = self.verify_keys([&k]) {
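// Roll back the updates staged above before reporting the error.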
3306 let pending = self.pending.get_mut(&k).expect("inserted above");
3309 pending.truncate(restore_len);
3310 Err(err)
3311 } else {
3312 Ok(prev)
3313 }
3314 }
3315
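/// Applies `set` to every pair in `kvs` at `ts`, returning the previous
/// values. A uniqueness violation rolls back every staged change.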
3316 fn set_many(
3321 &mut self,
3322 kvs: BTreeMap<K, Option<V>>,
3323 ts: Timestamp,
3324 ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3325 if kvs.is_empty() {
3326 return Ok(BTreeMap::new());
3327 }
3328
3329 let mut prevs = BTreeMap::new();
3330 let mut restores = BTreeMap::new();
3331
3332 for (k, v) in kvs {
3333 let prev = self.get(&k).cloned();
3334 let entry = self.pending.entry(k.clone()).or_default();
3335 restores.insert(k.clone(), entry.len());
3336
3337 match (v, prev.clone()) {
3338 (Some(v), Some(prev)) => {
3339 entry.push(TransactionUpdate {
3340 value: prev,
3341 ts,
3342 diff: Diff::MINUS_ONE,
3343 });
3344 entry.push(TransactionUpdate {
3345 value: v,
3346 ts,
3347 diff: Diff::ONE,
3348 });
3349 }
3350 (Some(v), None) => {
3351 entry.push(TransactionUpdate {
3352 value: v,
3353 ts,
3354 diff: Diff::ONE,
3355 });
3356 }
3357 (None, Some(prev)) => {
3358 entry.push(TransactionUpdate {
3359 value: prev,
3360 ts,
3361 diff: Diff::MINUS_ONE,
3362 });
3363 }
3364 (None, None) => {}
3365 }
3366
3367 prevs.insert(k, prev);
3368 }
3369
3370 if let Err(err) = self.verify_keys(prevs.keys()) {
3372 for (k, restore_len) in restores {
3373 let pending = self.pending.get_mut(&k).expect("inserted above");
3376 pending.truncate(restore_len);
3377 }
3378 Err(err)
3379 } else {
3380 Ok(prevs)
3381 }
3382 }
3383
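/// Stages a deletion at `ts` of every entry for which `f` returns `true`,
/// returning the deleted `(key, value)` pairs.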
3384 fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3390 let mut deleted = Vec::new();
3391 self.for_values_mut(|p, k, v| {
3392 if f(k, v) {
3393 deleted.push((k.clone(), v.clone()));
3394 p.entry(k.clone()).or_default().push(TransactionUpdate {
3395 value: v.clone(),
3396 ts,
3397 diff: Diff::MINUS_ONE,
3398 });
3399 }
3400 });
3401 soft_assert_no_log!(self.verify().is_ok());
3402 deleted
3403 }
3404
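/// Stages a deletion of `k` at `ts`, returning the previous value, if any.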
3405 fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3409 self.set(k, None, ts)
3410 .expect("deleting an entry cannot violate uniqueness")
3411 }
3412
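/// Stages deletions of all `ks` at `ts`, returning the deleted pairs; keys
/// without an entry are ignored.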
3413 fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3417 let kvs = ks.into_iter().map(|k| (k, None)).collect();
3418 let prevs = self
3419 .set_many(kvs, ts)
3420 .expect("deleting entries cannot violate uniqueness");
3421 prevs
3422 .into_iter()
3423 .filter_map(|(k, v)| v.map(|v| (k, v)))
3424 .collect()
3425 }
3426}
3427
3428#[cfg(test)]
3429#[allow(clippy::unwrap_used)]
3430mod tests {
3431 use super::*;
3432
3433 use mz_ore::now::SYSTEM_TIME;
3434 use mz_ore::{assert_none, assert_ok};
3435 use mz_persist_client::cache::PersistClientCache;
3436 use mz_persist_types::PersistLocation;
3437 use semver::Version;
3438
3439 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3440 use crate::memory;
3441
3442 #[mz_ore::test]
3443 fn test_table_transaction_simple() {
3444 fn uniqueness_violation(a: &String, b: &String) -> bool {
3445 a == b
3446 }
3447 let mut table = TableTransaction::new_with_uniqueness_fn(
3448 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3449 uniqueness_violation,
3450 )
3451 .unwrap();
3452
3453 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3456 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3457 assert!(
3458 table
3459 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3460 .is_err()
3461 );
3462 assert!(
3463 table
3464 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3465 .is_err()
3466 );
3467 }
3468
3469 #[mz_ore::test]
3470 fn test_table_transaction() {
3471 fn uniqueness_violation(a: &String, b: &String) -> bool {
3472 a == b
3473 }
3474 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3475
3476 fn commit(
3477 table: &mut BTreeMap<Vec<u8>, String>,
3478 mut pending: Vec<(Vec<u8>, String, Diff)>,
3479 ) {
3480 pending.sort_by(|a, b| a.2.cmp(&b.2));
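// Apply retractions (diff -1) before additions (diff 1) so that updates
// to an existing key remove the old value before inserting the new one.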
3482 for (k, v, diff) in pending {
3483 if diff == Diff::MINUS_ONE {
3484 let prev = table.remove(&k);
3485 assert_eq!(prev, Some(v));
3486 } else if diff == Diff::ONE {
3487 let prev = table.insert(k, v);
3488 assert_eq!(prev, None);
3489 } else {
3490 panic!("unexpected diff: {diff}");
3491 }
3492 }
3493 }
3494
3495 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3496 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3497 let mut table_txn =
3498 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3499 assert_eq!(table_txn.items_cloned(), table);
3500 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3501 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3502 assert_eq!(
3503 table_txn.items_cloned(),
3504 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3505 );
3506 assert_eq!(
3507 table_txn
3508 .update(|_k, _v| Some("v3".to_string()), 2)
3509 .unwrap(),
3510 Diff::ONE
3511 );
3512
3513 table_txn
3515 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3516 .unwrap_err();
3517
3518 table_txn
3519 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3520 .unwrap();
3521 assert_eq!(
3522 table_txn.items_cloned(),
3523 BTreeMap::from([
3524 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3525 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3526 ])
3527 );
3528 let err = table_txn
3529 .update(|_k, _v| Some("v1".to_string()), 5)
3530 .unwrap_err();
3531 assert!(
3532 matches!(err, DurableCatalogError::UniquenessViolation),
3533 "unexpected err: {err:?}"
3534 );
3535 let pending = table_txn.pending();
3536 assert_eq!(
3537 pending,
3538 vec![
3539 (
3540 1i64.to_le_bytes().to_vec(),
3541 "v1".to_string(),
3542 Diff::MINUS_ONE
3543 ),
3544 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3545 (
3546 2i64.to_le_bytes().to_vec(),
3547 "v2".to_string(),
3548 Diff::MINUS_ONE
3549 ),
3550 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3551 ]
3552 );
3553 commit(&mut table, pending);
3554 assert_eq!(
3555 table,
3556 BTreeMap::from([
3557 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3558 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3559 ])
3560 );
3561
3562 let mut table_txn =
3563 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3564 assert_eq!(
3566 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3567 1
3568 );
3569 table_txn
3570 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3571 .unwrap();
3572 table_txn
3574 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3575 .unwrap_err();
3576 table_txn
3578 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3579 .unwrap_err();
3580 assert_eq!(
3581 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3582 1
3583 );
3584 table_txn
3586 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3587 .unwrap();
3588 table_txn
3589 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3590 .unwrap();
3591 let pending = table_txn.pending();
3592 assert_eq!(
3593 pending,
3594 vec![
3595 (
3596 1i64.to_le_bytes().to_vec(),
3597 "v3".to_string(),
3598 Diff::MINUS_ONE
3599 ),
3600 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3601 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3602 ]
3603 );
3604 commit(&mut table, pending);
3605 assert_eq!(
3606 table,
3607 BTreeMap::from([
3608 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3609 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3610 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3611 ])
3612 );
3613
3614 let mut table_txn =
3615 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3616 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3617 table_txn
3618 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3619 .unwrap();
3620
3621 commit(&mut table, table_txn.pending());
3622 assert_eq!(
3623 table,
3624 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3625 );
3626
3627 let mut table_txn =
3628 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3629 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3630 table_txn
3631 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3632 .unwrap();
3633 commit(&mut table, table_txn.pending());
3634 assert_eq!(
3635 table,
3636 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3637 );
3638
3639 let mut table_txn =
3641 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3642 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3643 table_txn
3644 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3645 .unwrap();
3646 table_txn
3647 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3648 .unwrap_err();
3649 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3650 table_txn
3651 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3652 .unwrap();
3653 commit(&mut table, table_txn.pending());
3654 assert_eq!(
3655 table.clone().into_iter().collect::<Vec<_>>(),
3656 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3657 );
3658
3659 let mut table_txn =
3661 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3662 table_txn
3664 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3665 .unwrap_err();
3666 table_txn
3667 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3668 .unwrap();
3669 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3670 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3671 let pending = table_txn.pending();
3672 assert_eq!(
3673 pending,
3674 vec![
3675 (
3676 1i64.to_le_bytes().to_vec(),
3677 "v5".to_string(),
3678 Diff::MINUS_ONE
3679 ),
3680 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3681 ]
3682 );
3683 commit(&mut table, pending);
3684 assert_eq!(
3685 table,
3686 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3687 );
3688
3689 let mut table_txn =
3691 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3692 table_txn
3693 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3694 .unwrap();
3695 let pending = table_txn.pending::<Vec<u8>, String>();
3696 assert!(pending.is_empty());
3697
3698 let mut table_txn =
3700 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3701 table_txn
3703 .set_many(
3704 BTreeMap::from([
3705 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3706 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3707 ]),
3708 0,
3709 )
3710 .unwrap_err();
3711 table_txn
3712 .set_many(
3713 BTreeMap::from([
3714 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3715 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3716 ]),
3717 1,
3718 )
3719 .unwrap();
3720 table_txn
3721 .set_many(
3722 BTreeMap::from([
3723 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3724 (3i64.to_le_bytes().to_vec(), None),
3725 ]),
3726 2,
3727 )
3728 .unwrap();
3729 let pending = table_txn.pending();
3730 assert_eq!(
3731 pending,
3732 vec![
3733 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3734 (
3735 3i64.to_le_bytes().to_vec(),
3736 "v6".to_string(),
3737 Diff::MINUS_ONE
3738 ),
3739 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3740 ]
3741 );
3742 commit(&mut table, pending);
3743 assert_eq!(
3744 table,
3745 BTreeMap::from([
3746 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3747 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3748 ])
3749 );
3750
3751 let mut table_txn =
3753 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3754 table_txn
3755 .set_many(
3756 BTreeMap::from([
3757 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3758 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3759 ]),
3760 0,
3761 )
3762 .unwrap();
3763 let pending = table_txn.pending::<Vec<u8>, String>();
3764 assert!(pending.is_empty());
3765 commit(&mut table, pending);
3766 assert_eq!(
3767 table,
3768 BTreeMap::from([
3769 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3770 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3771 ])
3772 );
3773
3774 let mut table_txn =
3776 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3777 table_txn
3779 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3780 .unwrap_err();
3781 assert!(
3782 table_txn
3783 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3784 .unwrap()
3785 );
3786 assert!(
3787 !table_txn
3788 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3789 .unwrap()
3790 );
3791 let pending = table_txn.pending();
3792 assert_eq!(
3793 pending,
3794 vec![
3795 (
3796 1i64.to_le_bytes().to_vec(),
3797 "v6".to_string(),
3798 Diff::MINUS_ONE
3799 ),
3800 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3801 ]
3802 );
3803 commit(&mut table, pending);
3804 assert_eq!(
3805 table,
3806 BTreeMap::from([
3807 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3808 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3809 ])
3810 );
3811
3812 let mut table_txn =
3814 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3815 assert!(
3816 table_txn
3817 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3818 .unwrap()
3819 );
3820 let pending = table_txn.pending::<Vec<u8>, String>();
3821 assert!(pending.is_empty());
3822 commit(&mut table, pending);
3823 assert_eq!(
3824 table,
3825 BTreeMap::from([
3826 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3827 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3828 ])
3829 );
3830
3831 let mut table_txn =
3833 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3834 table_txn
3836 .update_by_keys(
3837 [
3838 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3839 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3840 ],
3841 0,
3842 )
3843 .unwrap_err();
3844 let n = table_txn
3845 .update_by_keys(
3846 [
3847 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3848 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3849 ],
3850 1,
3851 )
3852 .unwrap();
3853 assert_eq!(n, Diff::ONE);
3854 let n = table_txn
3855 .update_by_keys(
3856 [
3857 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3858 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3859 ],
3860 2,
3861 )
3862 .unwrap();
3863 assert_eq!(n, Diff::ZERO);
3864 let pending = table_txn.pending();
3865 assert_eq!(
3866 pending,
3867 vec![
3868 (
3869 1i64.to_le_bytes().to_vec(),
3870 "v8".to_string(),
3871 Diff::MINUS_ONE
3872 ),
3873 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3874 ]
3875 );
3876 commit(&mut table, pending);
3877 assert_eq!(
3878 table,
3879 BTreeMap::from([
3880 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3881 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3882 ])
3883 );
3884
3885 let mut table_txn =
3887 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3888 let n = table_txn
3889 .update_by_keys(
3890 [
3891 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3892 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3893 ],
3894 0,
3895 )
3896 .unwrap();
3897 assert_eq!(n, Diff::from(2));
3898 let pending = table_txn.pending::<Vec<u8>, String>();
3899 assert!(pending.is_empty());
3900 commit(&mut table, pending);
3901 assert_eq!(
3902 table,
3903 BTreeMap::from([
3904 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3905 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3906 ])
3907 );
3908
3909 let mut table_txn =
3911 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3912 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3913 assert_eq!(prev, Some("v9".to_string()));
3914 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3915 assert_none!(prev);
3916 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3917 assert_none!(prev);
3918 let pending = table_txn.pending();
3919 assert_eq!(
3920 pending,
3921 vec![(
3922 1i64.to_le_bytes().to_vec(),
3923 "v9".to_string(),
3924 Diff::MINUS_ONE
3925 ),]
3926 );
3927 commit(&mut table, pending);
3928 assert_eq!(
3929 table,
3930 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3931 );
3932
3933 let mut table_txn =
3935 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3936 let prevs = table_txn.delete_by_keys(
3937 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3938 0,
3939 );
3940 assert_eq!(
3941 prevs,
3942 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3943 );
3944 let prevs = table_txn.delete_by_keys(
3945 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3946 1,
3947 );
3948 assert_eq!(prevs, vec![]);
3949 let prevs = table_txn.delete_by_keys(
3950 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3951 2,
3952 );
3953 assert_eq!(prevs, vec![]);
3954 let pending = table_txn.pending();
3955 assert_eq!(
3956 pending,
3957 vec![(
3958 42i64.to_le_bytes().to_vec(),
3959 "v7".to_string(),
3960 Diff::MINUS_ONE
3961 ),]
3962 );
3963 commit(&mut table, pending);
3964 assert_eq!(table, BTreeMap::new());
3965 }
3966
3967 #[mz_ore::test(tokio::test)]
3968 #[cfg_attr(miri, ignore)]
3969 async fn test_savepoint() {
3970 const VERSION: Version = Version::new(26, 0, 0);
3971 let mut persist_cache = PersistClientCache::new_no_metrics();
3972 persist_cache.cfg.build_version = VERSION;
3973 let persist_client = persist_cache
3974 .open(PersistLocation::new_in_mem())
3975 .await
3976 .unwrap();
3977 let state_builder = TestCatalogStateBuilder::new(persist_client)
3978 .with_default_deploy_generation()
3979 .with_version(VERSION);
3980
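// Open the catalog once to fully initialize it before opening a savepoint.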
3981 let _ = state_builder
3983 .clone()
3984 .unwrap_build()
3985 .await
3986 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
3987 .await
3988 .unwrap()
3989 .0;
3990 let mut savepoint_state = state_builder
3991 .unwrap_build()
3992 .await
3993 .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
3994 .await
3995 .unwrap()
3996 .0;
3997
3998 let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
3999 assert!(!initial_snapshot.is_empty());
4000
4001 let db_name = "db";
4002 let db_owner = RoleId::User(42);
4003 let db_privileges = Vec::new();
4004 let mut txn = savepoint_state.transaction().await.unwrap();
4005 let (db_id, db_oid) = txn
4006 .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4007 .unwrap();
4008 let commit_ts = txn.upper();
4009 txn.commit_internal(commit_ts).await.unwrap();
4010 let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4011 let update = updates.into_element();
4012
4013 assert_eq!(update.diff, StateDiff::Addition);
4014
4015 let db = match update.kind {
4016 memory::objects::StateUpdateKind::Database(db) => db,
4017 update => panic!("unexpected update: {update:?}"),
4018 };
4019
4020 assert_eq!(db_id, db.id);
4021 assert_eq!(db_oid, db.oid);
4022 assert_eq!(db_name, db.name);
4023 assert_eq!(db_owner, db.owner_id);
4024 assert_eq!(db_privileges, db.privileges);
4025 }
4026
4027 #[mz_ore::test]
4028 fn test_allocate_introspection_source_index_id() {
4029 let cluster_variant: u8 = 0b0000_0001;
4030 let cluster_id_inner: u64 =
4031 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4032 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4033
4034 let cluster_id = ClusterId::System(cluster_id_inner);
4035 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4036
4037 let introspection_source_index_id: u64 =
4038 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4039
4040 {
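// The high byte must encode the cluster variant.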
4042 let mut cluster_variant_mask = 0xFF << 56;
4043 cluster_variant_mask &= introspection_source_index_id;
4044 cluster_variant_mask >>= 56;
4045 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4046 }
4047
4048 {
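// The middle 48 bits must encode the inner cluster ID.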
4050 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4051 cluster_id_inner_mask &= introspection_source_index_id;
4052 cluster_id_inner_mask >>= 8;
4053 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4054 }
4055
4056 {
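// The low byte must encode the log variant.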
4058 let mut log_variant_mask = 0xFF;
4059 log_variant_mask &= introspection_source_index_id;
4060 assert_eq!(
4061 log_variant_mask,
4062 u64::from(timely_messages_received_log_variant)
4063 );
4064 }
4065
4066 let (catalog_item_id, global_id) =
4067 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4068
4069 assert_eq!(
4070 catalog_item_id,
4071 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4072 );
4073 assert_eq!(
4074 global_id,
4075 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4076 );
4077 }
4078}