1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34 CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35 RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51 AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52 ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53 ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54 Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55 DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56 IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57 ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58 ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59 SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60 StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61 SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66 EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67 SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
68 SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
69 USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
70 USER_ROLE_ID_ALLOC_KEY,
71};
72use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
73
/// Logical timestamp attached to updates buffered in a [`Transaction`]
/// (e.g. audit log entries) and the type of the per-transaction `op_id`.
type Timestamp = u64;
75
/// An in-memory transaction against the durable catalog state.
///
/// Every mutation is buffered in the per-collection [`TableTransaction`]s
/// below (tagged with `op_id`) rather than being written through to
/// `durable_catalog` immediately.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    // Excluded from the derived impls: a `dyn` trait object does not
    // implement `Debug`/`PartialEq`.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    // Audit log entries are append-only, so they are buffered as raw
    // `(key, diff, timestamp)` updates instead of a `TableTransaction`.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    // Catalog upper supplied by the caller of `Transaction::new`.
    upper: mz_repr::Timestamp,
    // Logical operation id attached to each buffered update; starts at 0.
    op_id: Timestamp,
}
115
116impl<'a> Transaction<'a> {
    /// Opens a new transaction over `durable_catalog`, seeding each
    /// per-collection [`TableTransaction`] from the given `Snapshot`.
    ///
    /// Collections with naming constraints get a uniqueness function so
    /// conflicting inserts fail inside the transaction rather than at
    /// commit time.
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            // Database names must be unique.
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            // Schema names must be unique within a database.
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            // Item names must be unique within a schema, with a carve-out for
            // types: two same-named items conflict when both are non-types, or
            // when one is a type and the other's type `conflicts_with_type()`.
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            // Role names must be unique.
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            // Cluster names must be unique.
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            // Network policy names must be unique.
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            // Replica names must be unique within a cluster.
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }
199
200 pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
201 let key = ItemKey { id: *id };
202 self.items
203 .get(&key)
204 .map(|v| DurableType::from_key_value(key, v.clone()))
205 }
206
207 pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
208 self.items
209 .items()
210 .into_iter()
211 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
212 .sorted_by_key(|Item { id, .. }| *id)
213 }
214
215 pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
216 self.insert_audit_log_events([event]);
217 }
218
219 pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
220 let events = events
221 .into_iter()
222 .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
223 self.audit_log_updates.extend(events);
224 }
225
226 pub fn insert_user_database(
227 &mut self,
228 database_name: &str,
229 owner_id: RoleId,
230 privileges: Vec<MzAclItem>,
231 temporary_oids: &HashSet<u32>,
232 ) -> Result<(DatabaseId, u32), CatalogError> {
233 let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
234 let id = DatabaseId::User(id);
235 let oid = self.allocate_oid(temporary_oids)?;
236 self.insert_database(id, database_name, owner_id, privileges, oid)?;
237 Ok((id, oid))
238 }
239
240 pub(crate) fn insert_database(
241 &mut self,
242 id: DatabaseId,
243 database_name: &str,
244 owner_id: RoleId,
245 privileges: Vec<MzAclItem>,
246 oid: u32,
247 ) -> Result<u32, CatalogError> {
248 match self.databases.insert(
249 DatabaseKey { id },
250 DatabaseValue {
251 name: database_name.to_string(),
252 owner_id,
253 privileges,
254 oid,
255 },
256 self.op_id,
257 ) {
258 Ok(_) => Ok(oid),
259 Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
260 }
261 }
262
263 pub fn insert_user_schema(
264 &mut self,
265 database_id: DatabaseId,
266 schema_name: &str,
267 owner_id: RoleId,
268 privileges: Vec<MzAclItem>,
269 temporary_oids: &HashSet<u32>,
270 ) -> Result<(SchemaId, u32), CatalogError> {
271 let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
272 let id = SchemaId::User(id);
273 let oid = self.allocate_oid(temporary_oids)?;
274 self.insert_schema(
275 id,
276 Some(database_id),
277 schema_name.to_string(),
278 owner_id,
279 privileges,
280 oid,
281 )?;
282 Ok((id, oid))
283 }
284
285 pub fn insert_system_schema(
286 &mut self,
287 schema_id: u64,
288 schema_name: &str,
289 owner_id: RoleId,
290 privileges: Vec<MzAclItem>,
291 oid: u32,
292 ) -> Result<(), CatalogError> {
293 let id = SchemaId::System(schema_id);
294 self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
295 }
296
297 pub(crate) fn insert_schema(
298 &mut self,
299 schema_id: SchemaId,
300 database_id: Option<DatabaseId>,
301 schema_name: String,
302 owner_id: RoleId,
303 privileges: Vec<MzAclItem>,
304 oid: u32,
305 ) -> Result<(), CatalogError> {
306 match self.schemas.insert(
307 SchemaKey { id: schema_id },
308 SchemaValue {
309 database_id,
310 name: schema_name.clone(),
311 owner_id,
312 privileges,
313 oid,
314 },
315 self.op_id,
316 ) {
317 Ok(_) => Ok(()),
318 Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
319 }
320 }
321
322 pub fn insert_builtin_role(
323 &mut self,
324 id: RoleId,
325 name: String,
326 attributes: RoleAttributesRaw,
327 membership: RoleMembership,
328 vars: RoleVars,
329 oid: u32,
330 ) -> Result<RoleId, CatalogError> {
331 soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
332 self.insert_role(id, name, attributes, membership, vars, oid)?;
333 Ok(id)
334 }
335
336 pub fn insert_user_role(
337 &mut self,
338 name: String,
339 attributes: RoleAttributesRaw,
340 membership: RoleMembership,
341 vars: RoleVars,
342 temporary_oids: &HashSet<u32>,
343 ) -> Result<(RoleId, u32), CatalogError> {
344 let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
345 let id = RoleId::User(id);
346 let oid = self.allocate_oid(temporary_oids)?;
347 self.insert_role(id, name, attributes, membership, vars, oid)?;
348 Ok((id, oid))
349 }
350
    /// Buffers an insert of a role and, when the attributes carry a
    /// password, a matching `role_auth` entry holding its SCRAM hash.
    ///
    /// Returns `RoleAlreadyExists` if either insert hits a duplicate key.
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                // `scram_iterations` should always accompany a password; if it
                // is missing we soft-panic (logs in release-style builds) and
                // fall back to a default iteration count.
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    // A role_auth entry for this role ID already exists.
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }
406
    /// Buffers an insert of a user cluster (with its introspection source
    /// indexes) under a caller-supplied cluster ID.
    ///
    /// Thin wrapper over [`Self::insert_cluster`]; compare
    /// [`Self::insert_system_cluster`], which allocates the ID itself.
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }
428
429 pub fn insert_system_cluster(
431 &mut self,
432 cluster_name: &str,
433 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
434 privileges: Vec<MzAclItem>,
435 owner_id: RoleId,
436 config: ClusterConfig,
437 temporary_oids: &HashSet<u32>,
438 ) -> Result<(), CatalogError> {
439 let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
440 let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441 self.insert_cluster(
442 cluster_id,
443 cluster_name,
444 introspection_source_indexes,
445 owner_id,
446 privileges,
447 config,
448 temporary_oids,
449 )
450 }
451
452 fn insert_cluster(
453 &mut self,
454 cluster_id: ClusterId,
455 cluster_name: &str,
456 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
457 owner_id: RoleId,
458 privileges: Vec<MzAclItem>,
459 config: ClusterConfig,
460 temporary_oids: &HashSet<u32>,
461 ) -> Result<(), CatalogError> {
462 if let Err(_) = self.clusters.insert(
463 ClusterKey { id: cluster_id },
464 ClusterValue {
465 name: cluster_name.to_string(),
466 owner_id,
467 privileges,
468 config,
469 },
470 self.op_id,
471 ) {
472 return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
473 };
474
475 let amount = usize_to_u64(introspection_source_indexes.len());
476 let oids = self.allocate_oids(amount, temporary_oids)?;
477 let introspection_source_indexes: Vec<_> = introspection_source_indexes
478 .into_iter()
479 .zip_eq(oids)
480 .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
481 .collect();
482 for (builtin, item_id, index_id, oid) in introspection_source_indexes {
483 let introspection_source_index = IntrospectionSourceIndex {
484 cluster_id,
485 name: builtin.name.to_string(),
486 item_id,
487 index_id,
488 oid,
489 };
490 let (key, value) = introspection_source_index.into_key_value();
491 self.introspection_sources
492 .insert(key, value, self.op_id)
493 .expect("no uniqueness violation");
494 }
495
496 Ok(())
497 }
498
499 pub fn rename_cluster(
500 &mut self,
501 cluster_id: ClusterId,
502 cluster_name: &str,
503 cluster_to_name: &str,
504 ) -> Result<(), CatalogError> {
505 let key = ClusterKey { id: cluster_id };
506
507 match self.clusters.update(
508 |k, v| {
509 if *k == key {
510 let mut value = v.clone();
511 value.name = cluster_to_name.to_string();
512 Some(value)
513 } else {
514 None
515 }
516 },
517 self.op_id,
518 )? {
519 Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
520 Diff::ONE => Ok(()),
521 n => panic!(
522 "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
523 ),
524 }
525 }
526
527 pub fn rename_cluster_replica(
528 &mut self,
529 replica_id: ReplicaId,
530 replica_name: &QualifiedReplica,
531 replica_to_name: &str,
532 ) -> Result<(), CatalogError> {
533 let key = ClusterReplicaKey { id: replica_id };
534
535 match self.cluster_replicas.update(
536 |k, v| {
537 if *k == key {
538 let mut value = v.clone();
539 value.name = replica_to_name.to_string();
540 Some(value)
541 } else {
542 None
543 }
544 },
545 self.op_id,
546 )? {
547 Diff::ZERO => {
548 Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
549 }
550 Diff::ONE => Ok(()),
551 n => panic!(
552 "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
553 ),
554 }
555 }
556
557 pub fn insert_cluster_replica(
558 &mut self,
559 cluster_id: ClusterId,
560 replica_name: &str,
561 config: ReplicaConfig,
562 owner_id: RoleId,
563 ) -> Result<ReplicaId, CatalogError> {
564 let replica_id = match cluster_id {
565 ClusterId::System(_) => self.allocate_system_replica_id()?,
566 ClusterId::User(_) => self.allocate_user_replica_id()?,
567 };
568 self.insert_cluster_replica_with_id(
569 cluster_id,
570 replica_id,
571 replica_name,
572 config,
573 owner_id,
574 )?;
575 Ok(replica_id)
576 }
577
578 pub(crate) fn insert_cluster_replica_with_id(
579 &mut self,
580 cluster_id: ClusterId,
581 replica_id: ReplicaId,
582 replica_name: &str,
583 config: ReplicaConfig,
584 owner_id: RoleId,
585 ) -> Result<(), CatalogError> {
586 if let Err(_) = self.cluster_replicas.insert(
587 ClusterReplicaKey { id: replica_id },
588 ClusterReplicaValue {
589 cluster_id,
590 name: replica_name.into(),
591 config,
592 owner_id,
593 },
594 self.op_id,
595 ) {
596 let cluster = self
597 .clusters
598 .get(&ClusterKey { id: cluster_id })
599 .expect("cluster exists");
600 return Err(SqlCatalogError::DuplicateReplica(
601 replica_name.to_string(),
602 cluster.name.to_string(),
603 )
604 .into());
605 };
606 Ok(())
607 }
608
609 pub fn insert_user_network_policy(
610 &mut self,
611 name: String,
612 rules: Vec<NetworkPolicyRule>,
613 privileges: Vec<MzAclItem>,
614 owner_id: RoleId,
615 temporary_oids: &HashSet<u32>,
616 ) -> Result<NetworkPolicyId, CatalogError> {
617 let oid = self.allocate_oid(temporary_oids)?;
618 let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
619 let id = NetworkPolicyId::User(id);
620 self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
621 }
622
623 pub fn insert_network_policy(
624 &mut self,
625 id: NetworkPolicyId,
626 name: String,
627 rules: Vec<NetworkPolicyRule>,
628 privileges: Vec<MzAclItem>,
629 owner_id: RoleId,
630 oid: u32,
631 ) -> Result<NetworkPolicyId, CatalogError> {
632 match self.network_policies.insert(
633 NetworkPolicyKey { id },
634 NetworkPolicyValue {
635 name: name.clone(),
636 rules,
637 privileges,
638 owner_id,
639 oid,
640 },
641 self.op_id,
642 ) {
643 Ok(_) => Ok(id),
644 Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
645 }
646 }
647
648 pub fn update_introspection_source_index_gids(
653 &mut self,
654 mappings: impl Iterator<
655 Item = (
656 ClusterId,
657 impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
658 ),
659 >,
660 ) -> Result<(), CatalogError> {
661 for (cluster_id, updates) in mappings {
662 for (name, item_id, index_id, oid) in updates {
663 let introspection_source_index = IntrospectionSourceIndex {
664 cluster_id,
665 name,
666 item_id,
667 index_id,
668 oid,
669 };
670 let (key, value) = introspection_source_index.into_key_value();
671
672 let prev = self
673 .introspection_sources
674 .set(key, Some(value), self.op_id)?;
675 if prev.is_none() {
676 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
677 "{index_id}"
678 ))
679 .into());
680 }
681 }
682 }
683 Ok(())
684 }
685
686 pub fn insert_user_item(
687 &mut self,
688 id: CatalogItemId,
689 global_id: GlobalId,
690 schema_id: SchemaId,
691 item_name: &str,
692 create_sql: String,
693 owner_id: RoleId,
694 privileges: Vec<MzAclItem>,
695 temporary_oids: &HashSet<u32>,
696 versions: BTreeMap<RelationVersion, GlobalId>,
697 ) -> Result<u32, CatalogError> {
698 let oid = self.allocate_oid(temporary_oids)?;
699 self.insert_item(
700 id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
701 )?;
702 Ok(oid)
703 }
704
705 pub fn insert_item(
706 &mut self,
707 id: CatalogItemId,
708 oid: u32,
709 global_id: GlobalId,
710 schema_id: SchemaId,
711 item_name: &str,
712 create_sql: String,
713 owner_id: RoleId,
714 privileges: Vec<MzAclItem>,
715 extra_versions: BTreeMap<RelationVersion, GlobalId>,
716 ) -> Result<(), CatalogError> {
717 match self.items.insert(
718 ItemKey { id },
719 ItemValue {
720 schema_id,
721 name: item_name.to_string(),
722 create_sql,
723 owner_id,
724 privileges,
725 oid,
726 global_id,
727 extra_versions,
728 },
729 self.op_id,
730 ) {
731 Ok(_) => Ok(()),
732 Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
733 }
734 }
735
736 pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
737 Ok(self.get_and_increment_id_by(key, 1)?.into_element())
738 }
739
    /// Bumps the named ID allocator by `amount` and returns the allocated IDs
    /// (the half-open range `[current, current + amount)`).
    ///
    /// Returns `IdExhaustion` if the allocator would overflow `u64`.
    ///
    /// # Panics
    ///
    /// Panics if the allocator `key` does not exist, if system item IDs are
    /// requested after bootstrap, or if the allocator value changed between
    /// the read and the write (final `assert_eq!`).
    pub fn get_and_increment_id_by(
        &mut self,
        key: String,
        amount: u64,
    ) -> Result<Vec<u64>, CatalogError> {
        assert!(
            key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
            "system item IDs cannot be allocated outside of bootstrap"
        );

        // Read the allocator's current next ID.
        let current_id = self
            .id_allocator
            .items()
            .get(&IdAllocKey { name: key.clone() })
            .unwrap_or_else(|| panic!("{key} id allocator missing"))
            .next_id;
        // Guard against exhausting the 64-bit ID space.
        let next_id = current_id
            .checked_add(amount)
            .ok_or(SqlCatalogError::IdExhaustion)?;
        let prev = self.id_allocator.set(
            IdAllocKey { name: key },
            Some(IdAllocValue { next_id }),
            self.op_id,
        )?;
        // `set` returns the value it replaced; it must be what we just read.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: current_id
            })
        );
        Ok((current_id..next_id).collect())
    }
772
773 pub fn allocate_system_item_ids(
774 &mut self,
775 amount: u64,
776 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
777 assert!(
778 !self.durable_catalog.is_bootstrap_complete(),
779 "we can only allocate system item IDs during bootstrap"
780 );
781 Ok(self
782 .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
783 .into_iter()
784 .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
786 .collect())
787 }
788
    /// Deterministically derives the `CatalogItemId`/`GlobalId` pair for an
    /// introspection source index from its cluster and log variant by packing
    /// both into a single `u64`:
    ///
    /// * bits 56..64: cluster variant (1 = system, 2 = user)
    /// * bits 8..56:  the cluster's inner ID
    /// * bits 0..8:   the log variant discriminant from the match below
    ///
    /// # Panics
    ///
    /// Panics if the cluster's inner ID does not fit in 48 bits.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        // The inner cluster ID is shifted left by 8 below, so it must fit in
        // 48 bits to leave the top byte free for the cluster variant.
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // Stable discriminant per log variant. NOTE(review): 27 is skipped —
        // presumably a retired variant; do not renumber existing entries.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
        };

        // Pack variant, cluster ID, and log discriminant into one u64.
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
878
879 pub fn allocate_user_item_ids(
880 &mut self,
881 amount: u64,
882 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
883 Ok(self
884 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
885 .into_iter()
886 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
888 .collect())
889 }
890
891 pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
892 let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
893 Ok(ReplicaId::User(id))
894 }
895
896 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
897 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
898 Ok(ReplicaId::System(id))
899 }
900
901 pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
902 self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
903 }
904
905 pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
906 self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
907 }
908
    /// Allocates `amount` OIDs, skipping any OID already used by a database,
    /// schema, role, item, or introspection source index in this transaction,
    /// as well as any OID in `temporary_oids`.
    ///
    /// Candidate OIDs wrap around within the user OID range; if a full cycle
    /// back to the starting OID finds too few free OIDs, allocation fails
    /// with `OidExhaustion`.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        // Newtype that keeps candidate OIDs inside the user OID range.
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            // Increment that wraps back into the user range on u32 overflow.
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // Collect every OID currently in use so the scan below can skip them.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        // An OID is unavailable if any catalog object or temporary OID uses it.
        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            // Wrapped all the way around without finding enough free OIDs.
            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        // The allocator must not have changed since we read `start_oid`.
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1017
1018 pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1021 self.allocate_oids(1, temporary_oids)
1022 .map(|oids| oids.into_element())
1023 }
1024
    /// Returns a [`Snapshot`] reflecting the transaction's current view:
    /// the snapshot it was created from plus all updates buffered so far.
    pub fn current_snapshot(&self) -> Snapshot {
        Snapshot {
            databases: self.databases.current_items_proto(),
            schemas: self.schemas.current_items_proto(),
            roles: self.roles.current_items_proto(),
            role_auth: self.role_auth.current_items_proto(),
            items: self.items.current_items_proto(),
            comments: self.comments.current_items_proto(),
            clusters: self.clusters.current_items_proto(),
            network_policies: self.network_policies.current_items_proto(),
            cluster_replicas: self.cluster_replicas.current_items_proto(),
            introspection_sources: self.introspection_sources.current_items_proto(),
            id_allocator: self.id_allocator.current_items_proto(),
            configs: self.configs.current_items_proto(),
            settings: self.settings.current_items_proto(),
            system_object_mappings: self.system_gid_mapping.current_items_proto(),
            system_configurations: self.system_configurations.current_items_proto(),
            default_privileges: self.default_privileges.current_items_proto(),
            source_references: self.source_references.current_items_proto(),
            system_privileges: self.system_privileges.current_items_proto(),
            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
        }
    }
1057
1058 pub(crate) fn insert_id_allocator(
1059 &mut self,
1060 name: String,
1061 next_id: u64,
1062 ) -> Result<(), CatalogError> {
1063 match self.id_allocator.insert(
1064 IdAllocKey { name: name.clone() },
1065 IdAllocValue { next_id },
1066 self.op_id,
1067 ) {
1068 Ok(_) => Ok(()),
1069 Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1070 }
1071 }
1072
1073 pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1080 let prev = self
1081 .databases
1082 .set(DatabaseKey { id: *id }, None, self.op_id)?;
1083 if prev.is_some() {
1084 Ok(())
1085 } else {
1086 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1087 }
1088 }
1089
1090 pub fn remove_databases(
1097 &mut self,
1098 databases: &BTreeSet<DatabaseId>,
1099 ) -> Result<(), CatalogError> {
1100 if databases.is_empty() {
1101 return Ok(());
1102 }
1103
1104 let to_remove = databases
1105 .iter()
1106 .map(|id| (DatabaseKey { id: *id }, None))
1107 .collect();
1108 let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1109 prev.retain(|_k, val| val.is_none());
1110
1111 if !prev.is_empty() {
1112 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1113 return Err(SqlCatalogError::UnknownDatabase(err).into());
1114 }
1115
1116 Ok(())
1117 }
1118
1119 pub fn remove_schema(
1126 &mut self,
1127 database_id: &Option<DatabaseId>,
1128 schema_id: &SchemaId,
1129 ) -> Result<(), CatalogError> {
1130 let prev = self
1131 .schemas
1132 .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1133 if prev.is_some() {
1134 Ok(())
1135 } else {
1136 let database_name = match database_id {
1137 Some(id) => format!("{id}."),
1138 None => "".to_string(),
1139 };
1140 Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1141 }
1142 }
1143
1144 pub fn remove_schemas(
1151 &mut self,
1152 schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1153 ) -> Result<(), CatalogError> {
1154 if schemas.is_empty() {
1155 return Ok(());
1156 }
1157
1158 let to_remove = schemas
1159 .iter()
1160 .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1161 .collect();
1162 let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1163 prev.retain(|_k, v| v.is_none());
1164
1165 if !prev.is_empty() {
1166 let err = prev
1167 .keys()
1168 .map(|k| {
1169 let db_spec = schemas.get(&k.id).expect("should_exist");
1170 let db_name = match db_spec {
1171 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1172 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1173 };
1174 format!("{}.{}", db_name, k.id)
1175 })
1176 .join(", ");
1177
1178 return Err(SqlCatalogError::UnknownSchema(err).into());
1179 }
1180
1181 Ok(())
1182 }
1183
1184 pub fn remove_source_references(
1185 &mut self,
1186 source_id: CatalogItemId,
1187 ) -> Result<(), CatalogError> {
1188 let deleted = self
1189 .source_references
1190 .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1191 .is_some();
1192 if deleted {
1193 Ok(())
1194 } else {
1195 Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1196 }
1197 }
1198
1199 pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1206 assert!(
1207 roles.iter().all(|id| id.is_user()),
1208 "cannot delete non-user roles"
1209 );
1210 self.remove_roles(roles)
1211 }
1212
    /// Removes every role in `roles`, along with any corresponding role
    /// authentication entries.
    ///
    /// Returns an error listing the roles that did not exist; roles that did
    /// exist are still removed in that case.
    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        // Also delete the matching role-auth rows, keyed by role ID.
        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        // Entries whose previous value was `None` did not exist.
        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        // NOTE(review): the retained `role_auth_prev` is never checked, so a
        // missing auth row is not an error — presumably because not every role
        // has one. Confirm before tightening this.
        role_auth_prev.retain(|_k, v| v.is_none());
        Ok(())
    }
1262
    /// Removes every cluster in `clusters`, cascading to each cluster's
    /// replicas and introspection source indexes.
    ///
    /// Returns an error listing the clusters that did not exist; clusters
    /// that did exist are still removed in that case (the cascade only runs
    /// when all clusters were found).
    pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
        if clusters.is_empty() {
            return Ok(());
        }

        let to_remove = clusters
            .iter()
            .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
            .collect();
        let mut prev = self.clusters.set_many(to_remove, self.op_id)?;

        // Entries whose previous value was `None` did not exist.
        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownCluster(err).into());
        }

        // Cascade: drop replicas belonging to the removed clusters (replicas
        // are keyed by replica ID, so match on the value's cluster_id) ...
        self.cluster_replicas
            .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
        // ... and their introspection source indexes (keyed by cluster_id).
        self.introspection_sources
            .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);

        Ok(())
    }
1298
1299 pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1306 let deleted = self
1307 .cluster_replicas
1308 .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1309 .is_some();
1310 if deleted {
1311 Ok(())
1312 } else {
1313 Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1314 }
1315 }
1316
1317 pub fn remove_cluster_replicas(
1324 &mut self,
1325 replicas: &BTreeSet<ReplicaId>,
1326 ) -> Result<(), CatalogError> {
1327 if replicas.is_empty() {
1328 return Ok(());
1329 }
1330
1331 let to_remove = replicas
1332 .iter()
1333 .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1334 .collect();
1335 let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1336
1337 prev.retain(|_k, v| v.is_none());
1338 if !prev.is_empty() {
1339 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1340 return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1341 }
1342
1343 Ok(())
1344 }
1345
1346 pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1353 let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1354 if prev.is_some() {
1355 Ok(())
1356 } else {
1357 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1358 }
1359 }
1360
1361 pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1368 if ids.is_empty() {
1369 return Ok(());
1370 }
1371
1372 let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1373 let n = self.items.delete_by_keys(ks, self.op_id).len();
1374 if n == ids.len() {
1375 Ok(())
1376 } else {
1377 let item_ids = self.items.items().keys().map(|k| k.id).collect();
1378 let mut unknown = ids.difference(&item_ids);
1379 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1380 }
1381 }
1382
    /// Removes the system object mappings identified by `descriptions`.
    ///
    /// Returns an error listing the mappings that did not exist; mappings
    /// that did exist are still removed in that case.
    pub fn remove_system_object_mappings(
        &mut self,
        descriptions: BTreeSet<SystemObjectDescription>,
    ) -> Result<(), CatalogError> {
        if descriptions.is_empty() {
            return Ok(());
        }

        // Descriptions map 1:1 onto GID-mapping keys.
        let ks: Vec<_> = descriptions
            .clone()
            .into_iter()
            .map(|desc| GidMappingKey {
                schema_name: desc.schema_name,
                object_type: desc.object_type,
                object_name: desc.object_name,
            })
            .collect();
        let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();

        if n == descriptions.len() {
            Ok(())
        } else {
            // Reconstruct descriptions of what remains to report exactly the
            // entries that were not present.
            let item_descriptions = self
                .system_gid_mapping
                .items()
                .keys()
                .map(|k| SystemObjectDescription {
                    schema_name: k.schema_name.clone(),
                    object_type: k.object_type.clone(),
                    object_name: k.object_name.clone(),
                })
                .collect();
            let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
                format!(
                    "{} {}.{}",
                    desc.object_type, desc.schema_name, desc.object_name
                )
            });
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }
1430
    /// Removes the introspection source indexes identified by
    /// `(cluster ID, index name)` pairs.
    ///
    /// Returns an error listing the indexes that did not exist; indexes that
    /// did exist are still removed in that case.
    pub fn remove_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: BTreeSet<(ClusterId, String)>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        let ks: Vec<_> = introspection_source_indexes
            .clone()
            .into_iter()
            .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
            .collect();
        let n = self
            .introspection_sources
            .delete_by_keys(ks, self.op_id)
            .len();
        if n == introspection_source_indexes.len() {
            Ok(())
        } else {
            // Diff against what remains to report exactly the missing pairs.
            let txn_indexes = self
                .introspection_sources
                .items()
                .keys()
                .map(|k| (k.cluster_id, k.name.clone()))
                .collect();
            let mut unknown = introspection_source_indexes
                .difference(&txn_indexes)
                .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
            Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
        }
    }
1469
1470 pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1477 let updated =
1478 self.items
1479 .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1480 if updated {
1481 Ok(())
1482 } else {
1483 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1484 }
1485 }
1486
1487 pub fn update_items(
1495 &mut self,
1496 items: BTreeMap<CatalogItemId, Item>,
1497 ) -> Result<(), CatalogError> {
1498 if items.is_empty() {
1499 return Ok(());
1500 }
1501
1502 let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1503 let kvs: Vec<_> = items
1504 .clone()
1505 .into_iter()
1506 .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1507 .collect();
1508 let n = self.items.update_by_keys(kvs, self.op_id)?;
1509 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1510 if n == update_ids.len() {
1511 Ok(())
1512 } else {
1513 let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1514 let mut unknown = update_ids.difference(&item_ids);
1515 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1516 }
1517 }
1518
1519 pub fn update_role(
1527 &mut self,
1528 id: RoleId,
1529 role: Role,
1530 password: PasswordAction,
1531 ) -> Result<(), CatalogError> {
1532 let key = RoleKey { id };
1533 if self.roles.get(&key).is_some() {
1534 let auth_key = RoleAuthKey { role_id: id };
1535
1536 match password {
1537 PasswordAction::Set(new_password) => {
1538 let hash = mz_auth::hash::scram256_hash(
1539 &new_password.password,
1540 &new_password.scram_iterations,
1541 )
1542 .expect("password hash should be valid");
1543 let value = RoleAuthValue {
1544 password_hash: Some(hash),
1545 updated_at: SYSTEM_TIME(),
1546 };
1547
1548 if self.role_auth.get(&auth_key).is_some() {
1549 self.role_auth
1550 .update_by_key(auth_key.clone(), value, self.op_id)?;
1551 } else {
1552 self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1553 }
1554 }
1555 PasswordAction::Clear => {
1556 let value = RoleAuthValue {
1557 password_hash: None,
1558 updated_at: SYSTEM_TIME(),
1559 };
1560 if self.role_auth.get(&auth_key).is_some() {
1561 self.role_auth
1562 .update_by_key(auth_key.clone(), value, self.op_id)?;
1563 }
1564 }
1565 PasswordAction::NoChange => {}
1566 }
1567
1568 self.roles
1569 .update_by_key(key, role.into_key_value().1, self.op_id)?;
1570
1571 Ok(())
1572 } else {
1573 Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1574 }
1575 }
1576
1577 pub fn update_roles_without_auth(
1588 &mut self,
1589 roles: BTreeMap<RoleId, Role>,
1590 ) -> Result<(), CatalogError> {
1591 if roles.is_empty() {
1592 return Ok(());
1593 }
1594
1595 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1596 let kvs: Vec<_> = roles
1597 .into_iter()
1598 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1599 .collect();
1600 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1601 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1602
1603 if n == update_role_ids.len() {
1604 Ok(())
1605 } else {
1606 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1607 let mut unknown = update_role_ids.difference(&role_ids);
1608 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1609 }
1610 }
1611
    /// Updates the given system object mappings in bulk, matching existing
    /// rows by their catalog ID.
    ///
    /// Returns a `FailedBuiltinSchemaMigration` error if the number of rows
    /// actually updated differs from the number of requested mappings.
    pub fn update_system_object_mappings(
        &mut self,
        mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
    ) -> Result<(), CatalogError> {
        if mappings.is_empty() {
            return Ok(());
        }

        // `update` visits every row: rows whose catalog ID appears in
        // `mappings` get the replacement value; `None` leaves a row untouched.
        let n = self.system_gid_mapping.update(
            |_k, v| {
                if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
                    let (_, new_value) = mapping.clone().into_key_value();
                    Some(new_value)
                } else {
                    None
                }
            },
            self.op_id,
        )?;

        if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
            != mappings.len()
        {
            let id_str = mappings.keys().map(|id| id.to_string()).join(",");
            return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
        }

        Ok(())
    }
1645
1646 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1653 let updated = self.clusters.update_by_key(
1654 ClusterKey { id },
1655 cluster.into_key_value().1,
1656 self.op_id,
1657 )?;
1658 if updated {
1659 Ok(())
1660 } else {
1661 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1662 }
1663 }
1664
1665 pub fn update_cluster_replica(
1672 &mut self,
1673 replica_id: ReplicaId,
1674 replica: ClusterReplica,
1675 ) -> Result<(), CatalogError> {
1676 let updated = self.cluster_replicas.update_by_key(
1677 ClusterReplicaKey { id: replica_id },
1678 replica.into_key_value().1,
1679 self.op_id,
1680 )?;
1681 if updated {
1682 Ok(())
1683 } else {
1684 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1685 }
1686 }
1687
1688 pub fn update_database(
1695 &mut self,
1696 id: DatabaseId,
1697 database: Database,
1698 ) -> Result<(), CatalogError> {
1699 let updated = self.databases.update_by_key(
1700 DatabaseKey { id },
1701 database.into_key_value().1,
1702 self.op_id,
1703 )?;
1704 if updated {
1705 Ok(())
1706 } else {
1707 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1708 }
1709 }
1710
1711 pub fn update_schema(
1718 &mut self,
1719 schema_id: SchemaId,
1720 schema: Schema,
1721 ) -> Result<(), CatalogError> {
1722 let updated = self.schemas.update_by_key(
1723 SchemaKey { id: schema_id },
1724 schema.into_key_value().1,
1725 self.op_id,
1726 )?;
1727 if updated {
1728 Ok(())
1729 } else {
1730 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1731 }
1732 }
1733
1734 pub fn update_network_policy(
1741 &mut self,
1742 id: NetworkPolicyId,
1743 network_policy: NetworkPolicy,
1744 ) -> Result<(), CatalogError> {
1745 let updated = self.network_policies.update_by_key(
1746 NetworkPolicyKey { id },
1747 network_policy.into_key_value().1,
1748 self.op_id,
1749 )?;
1750 if updated {
1751 Ok(())
1752 } else {
1753 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1754 }
1755 }
    /// Removes every network policy in `network_policies`, all of which must
    /// be user policies.
    ///
    /// Returns an error listing the policies that did not exist; policies
    /// that did exist are still removed in that case.
    ///
    /// # Panics
    ///
    /// Panics if any targeted policy is not a user network policy.
    pub fn remove_network_policies(
        &mut self,
        network_policies: &BTreeSet<NetworkPolicyId>,
    ) -> Result<(), CatalogError> {
        if network_policies.is_empty() {
            return Ok(());
        }

        let to_remove = network_policies
            .iter()
            .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
            .collect();
        let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
        // Checked before the retain below so every targeted key is validated.
        assert!(
            prev.iter().all(|(k, _)| k.id.is_user()),
            "cannot delete non-user network policy"
        );

        // Entries whose previous value was `None` did not exist.
        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
        }

        Ok(())
    }
1788 pub fn set_default_privilege(
1792 &mut self,
1793 role_id: RoleId,
1794 database_id: Option<DatabaseId>,
1795 schema_id: Option<SchemaId>,
1796 object_type: ObjectType,
1797 grantee: RoleId,
1798 privileges: Option<AclMode>,
1799 ) -> Result<(), CatalogError> {
1800 self.default_privileges.set(
1801 DefaultPrivilegesKey {
1802 role_id,
1803 database_id,
1804 schema_id,
1805 object_type,
1806 grantee,
1807 },
1808 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1809 self.op_id,
1810 )?;
1811 Ok(())
1812 }
1813
1814 pub fn set_default_privileges(
1816 &mut self,
1817 default_privileges: Vec<DefaultPrivilege>,
1818 ) -> Result<(), CatalogError> {
1819 if default_privileges.is_empty() {
1820 return Ok(());
1821 }
1822
1823 let default_privileges = default_privileges
1824 .into_iter()
1825 .map(DurableType::into_key_value)
1826 .map(|(k, v)| (k, Some(v)))
1827 .collect();
1828 self.default_privileges
1829 .set_many(default_privileges, self.op_id)?;
1830 Ok(())
1831 }
1832
1833 pub fn set_system_privilege(
1837 &mut self,
1838 grantee: RoleId,
1839 grantor: RoleId,
1840 acl_mode: Option<AclMode>,
1841 ) -> Result<(), CatalogError> {
1842 self.system_privileges.set(
1843 SystemPrivilegesKey { grantee, grantor },
1844 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1845 self.op_id,
1846 )?;
1847 Ok(())
1848 }
1849
1850 pub fn set_system_privileges(
1852 &mut self,
1853 system_privileges: Vec<MzAclItem>,
1854 ) -> Result<(), CatalogError> {
1855 if system_privileges.is_empty() {
1856 return Ok(());
1857 }
1858
1859 let system_privileges = system_privileges
1860 .into_iter()
1861 .map(DurableType::into_key_value)
1862 .map(|(k, v)| (k, Some(v)))
1863 .collect();
1864 self.system_privileges
1865 .set_many(system_privileges, self.op_id)?;
1866 Ok(())
1867 }
1868
1869 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1871 self.settings.set(
1872 SettingKey { name },
1873 value.map(|value| SettingValue { value }),
1874 self.op_id,
1875 )?;
1876 Ok(())
1877 }
1878
    /// Persists `version` as the catalog content version setting.
    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
    }
1882
    /// Inserts the given introspection source indexes, allocating a fresh OID
    /// for each entry (avoiding any OID in `temporary_oids`).
    ///
    /// Errors from OID allocation or from the underlying inserts are
    /// propagated.
    pub fn insert_introspection_source_indexes(
        &mut self,
        introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        if introspection_source_indexes.is_empty() {
            return Ok(());
        }

        // Allocate exactly one OID per index; `zip_eq` panics on any mismatch.
        let amount = usize_to_u64(introspection_source_indexes.len());
        let oids = self.allocate_oids(amount, temporary_oids)?;
        let introspection_source_indexes: Vec<_> = introspection_source_indexes
            .into_iter()
            .zip_eq(oids)
            .map(
                |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
                    cluster_id,
                    name,
                    item_id,
                    index_id,
                    oid,
                },
            )
            .collect();

        for introspection_source_index in introspection_source_indexes {
            let (key, value) = introspection_source_index.into_key_value();
            self.introspection_sources.insert(key, value, self.op_id)?;
        }

        Ok(())
    }
1916
1917 pub fn set_system_object_mappings(
1919 &mut self,
1920 mappings: Vec<SystemObjectMapping>,
1921 ) -> Result<(), CatalogError> {
1922 if mappings.is_empty() {
1923 return Ok(());
1924 }
1925
1926 let mappings = mappings
1927 .into_iter()
1928 .map(DurableType::into_key_value)
1929 .map(|(k, v)| (k, Some(v)))
1930 .collect();
1931 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1932 Ok(())
1933 }
1934
1935 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1937 if replicas.is_empty() {
1938 return Ok(());
1939 }
1940
1941 let replicas = replicas
1942 .into_iter()
1943 .map(DurableType::into_key_value)
1944 .map(|(k, v)| (k, Some(v)))
1945 .collect();
1946 self.cluster_replicas.set_many(replicas, self.op_id)?;
1947 Ok(())
1948 }
1949
1950 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1952 match value {
1953 Some(value) => {
1954 let config = Config { key, value };
1955 let (key, value) = config.into_key_value();
1956 self.configs.set(key, Some(value), self.op_id)?;
1957 }
1958 None => {
1959 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1960 }
1961 }
1962 Ok(())
1963 }
1964
    /// Returns the value of config `key`, if present.
    pub fn get_config(&self, key: String) -> Option<u64> {
        self.configs
            .get(&ConfigKey { key })
            .map(|entry| entry.value)
    }
1971
    /// Returns the value of the setting named `name`, if present.
    pub fn get_setting(&self, name: String) -> Option<&str> {
        self.settings
            .get(&SettingKey { name })
            .map(|entry| &*entry.value)
    }
1978
    /// Returns the builtin migration shard ID, if one has been persisted.
    ///
    /// # Panics
    ///
    /// Panics if the stored value is not a valid `ShardId`.
    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
    }
1983
    /// Persists `shard_id` as the builtin migration shard setting.
    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
        self.set_setting(
            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
            Some(shard_id.to_string()),
        )
    }
1990
    /// Returns the expression cache shard ID, if one has been persisted.
    ///
    /// # Panics
    ///
    /// Panics if the stored value is not a valid `ShardId`.
    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
    }
1995
    /// Persists `shard_id` as the expression cache shard setting.
    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
        self.set_setting(
            EXPRESSION_CACHE_SHARD_KEY.to_string(),
            Some(shard_id.to_string()),
        )
    }
2002
    /// Persists the 0dt deployment maximum wait, stored as milliseconds in
    /// the config collection.
    ///
    /// # Panics
    ///
    /// Panics if the duration in milliseconds does not fit in a `u64`.
    pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
        self.set_config(
            WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
            Some(
                value
                    .as_millis()
                    .try_into()
                    .expect("max wait fits into u64"),
            ),
        )
    }
2019
    /// Persists the 0dt deployment DDL check interval, stored as milliseconds
    /// in the config collection.
    ///
    /// # Panics
    ///
    /// Panics if the duration in milliseconds does not fit in a `u64`.
    pub fn set_0dt_deployment_ddl_check_interval(
        &mut self,
        value: Duration,
    ) -> Result<(), CatalogError> {
        self.set_config(
            WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
            Some(
                value
                    .as_millis()
                    .try_into()
                    .expect("ddl check interval fits into u64"),
            ),
        )
    }
2040
    /// Persists whether a 0dt deployment should panic after its timeout,
    /// stored as 0/1 in the config collection.
    pub fn set_enable_0dt_deployment_panic_after_timeout(
        &mut self,
        value: bool,
    ) -> Result<(), CatalogError> {
        self.set_config(
            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
            Some(u64::from(value)),
        )
    }
2055
    /// Removes the 0dt deployment maximum wait config entry, reverting it to
    /// its default.
    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
    }
2064
    /// Removes the 0dt deployment DDL check interval config entry, reverting
    /// it to its default.
    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
    }
2074
    /// Removes the panic-after-timeout config entry, reverting it to its
    /// default.
    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
    }
2084
    /// Records (as config value 1) that the system configuration has been
    /// synced at least once.
    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
    }
2089
2090 pub fn update_comment(
2091 &mut self,
2092 object_id: CommentObjectId,
2093 sub_component: Option<usize>,
2094 comment: Option<String>,
2095 ) -> Result<(), CatalogError> {
2096 let key = CommentKey {
2097 object_id,
2098 sub_component,
2099 };
2100 let value = comment.map(|c| CommentValue { comment: c });
2101 self.comments.set(key, value, self.op_id)?;
2102
2103 Ok(())
2104 }
2105
2106 pub fn drop_comments(
2107 &mut self,
2108 object_ids: &BTreeSet<CommentObjectId>,
2109 ) -> Result<(), CatalogError> {
2110 if object_ids.is_empty() {
2111 return Ok(());
2112 }
2113
2114 self.comments
2115 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2116 Ok(())
2117 }
2118
2119 pub fn update_source_references(
2120 &mut self,
2121 source_id: CatalogItemId,
2122 references: Vec<SourceReference>,
2123 updated_at: u64,
2124 ) -> Result<(), CatalogError> {
2125 let key = SourceReferencesKey { source_id };
2126 let value = SourceReferencesValue {
2127 references,
2128 updated_at,
2129 };
2130 self.source_references.set(key, Some(value), self.op_id)?;
2131 Ok(())
2132 }
2133
2134 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2136 let key = ServerConfigurationKey {
2137 name: name.to_string(),
2138 };
2139 let value = ServerConfigurationValue { value };
2140 self.system_configurations
2141 .set(key, Some(value), self.op_id)?;
2142 Ok(())
2143 }
2144
2145 pub fn remove_system_config(&mut self, name: &str) {
2147 let key = ServerConfigurationKey {
2148 name: name.to_string(),
2149 };
2150 self.system_configurations
2151 .set(key, None, self.op_id)
2152 .expect("cannot have uniqueness violation");
2153 }
2154
    /// Deletes every system configuration entry.
    pub fn clear_system_configs(&mut self) {
        self.system_configurations.delete(|_k, _v| true, self.op_id);
    }
2159
2160 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2161 match self.configs.insert(
2162 ConfigKey { key: key.clone() },
2163 ConfigValue { value },
2164 self.op_id,
2165 ) {
2166 Ok(_) => Ok(()),
2167 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2168 }
2169 }
2170
    /// Returns an owned iterator over all clusters in the transaction.
    pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
        self.clusters
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2177
    /// Returns an owned iterator over all cluster replicas in the
    /// transaction.
    pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
        self.cluster_replicas
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2184
    /// Returns an owned iterator over all databases in the transaction.
    pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
        self.databases
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2191
    /// Returns an owned iterator over all roles in the transaction.
    pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
        self.roles
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2198
    /// Returns an owned iterator over all network policies in the
    /// transaction.
    pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
        self.network_policies
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2205
    /// Returns an owned iterator over all system object mappings in the
    /// transaction.
    pub fn get_system_object_mappings(
        &self,
    ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
        self.system_gid_mapping
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2214
    /// Returns an owned iterator over all schemas in the transaction.
    pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
        self.schemas
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2221
    /// Returns an owned iterator over all system configurations in the
    /// transaction.
    pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
        self.system_configurations
            .items()
            .into_iter()
            .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
    }
2228
2229 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2230 let key = SchemaKey { id: *id };
2231 self.schemas
2232 .get(&key)
2233 .map(|v| DurableType::from_key_value(key, v.clone()))
2234 }
2235
    /// Returns a map from introspection source index name to its
    /// `(GlobalId, OID)` for all indexes belonging to `cluster_id`.
    pub fn get_introspection_source_indexes(
        &self,
        cluster_id: ClusterId,
    ) -> BTreeMap<&str, (GlobalId, u32)> {
        self.introspection_sources
            .items()
            .into_iter()
            .filter(|(k, _v)| k.cluster_id == cluster_id)
            .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
            .collect()
    }
2247
    /// Returns the persisted catalog content version setting, if any.
    pub fn get_catalog_content_version(&self) -> Option<&str> {
        self.settings
            .get(&SettingKey {
                name: CATALOG_CONTENT_VERSION_KEY.to_string(),
            })
            .map(|value| &*value.value)
    }
2255
    /// Returns the persisted mock authentication nonce setting, if any.
    pub fn get_authentication_mock_nonce(&self) -> Option<String> {
        self.settings
            .get(&SettingKey {
                name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
            })
            .map(|value| value.value.clone())
    }
2263
    /// Returns all state updates produced by the current operation, then
    /// commits the operation (incrementing `op_id`).
    #[must_use]
    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
        let updates = self.get_op_updates();
        self.commit_op();
        updates
    }
2275
    /// Returns the [`StateUpdate`]s produced by the current operation: every
    /// pending change across all collections whose timestamp equals
    /// `self.op_id`, stamped with the transaction's `upper`. Does not commit
    /// the operation.
    fn get_op_updates(&self) -> Vec<StateUpdate> {
        /// Collects the pending changes made at timestamp `op` in a regular
        /// table transaction and converts each into a `StateUpdateKind` via
        /// `kind_fn`.
        fn get_collection_op_updates<'a, T>(
            table_txn: &'a TableTransaction<T::Key, T::Value>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T::Value: Ord + Clone + Debug,
            T: DurableType,
        {
            table_txn
                .pending
                .iter()
                // Flatten each key's list of pending values into (k, v) pairs.
                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
                .filter_map(move |(k, v)| {
                    // Keep only changes made by the current operation.
                    if v.ts == op {
                        let key = k.clone();
                        let value = v.value.clone();
                        let diff = v.diff.clone().try_into().expect("invalid diff");
                        let update = DurableType::from_key_value(key, value);
                        let kind = kind_fn(update);
                        Some((kind, diff))
                    } else {
                        None
                    }
                })
        }

        /// Like `get_collection_op_updates`, but for append-only collections
        /// stored as flat `(key, diff, ts)` triples with no value.
        fn get_large_collection_op_updates<'a, T>(
            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T: DurableType<Value = ()>,
        {
            collection.iter().filter_map(move |(k, diff, ts)| {
                if *ts == op {
                    let key = k.clone();
                    let diff = diff.clone().try_into().expect("invalid diff");
                    let update = DurableType::from_key_value(key, ());
                    let kind = kind_fn(update);
                    Some((kind, diff))
                } else {
                    None
                }
            })
        }

        // Exhaustively destructure `self` so that adding a field to
        // `Transaction` forces this function to be revisited. Fields bound to
        // `_` (id_allocator, configs, settings, txn_wal_shard) intentionally
        // produce no in-memory state updates here.
        let Transaction {
            durable_catalog: _,
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            source_references,
            system_privileges,
            audit_log_updates,
            storage_collection_metadata,
            unfinalized_shards,
            id_allocator: _,
            configs: _,
            settings: _,
            txn_wal_shard: _,
            upper,
            op_id: _,
        } = &self;

        let updates = std::iter::empty()
            .chain(get_collection_op_updates(
                roles,
                StateUpdateKind::Role,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                role_auth,
                StateUpdateKind::RoleAuth,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                databases,
                StateUpdateKind::Database,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                schemas,
                StateUpdateKind::Schema,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                default_privileges,
                StateUpdateKind::DefaultPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_privileges,
                StateUpdateKind::SystemPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_configurations,
                StateUpdateKind::SystemConfiguration,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                clusters,
                StateUpdateKind::Cluster,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                network_policies,
                StateUpdateKind::NetworkPolicy,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                introspection_sources,
                StateUpdateKind::IntrospectionSourceIndex,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                cluster_replicas,
                StateUpdateKind::ClusterReplica,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_gid_mapping,
                StateUpdateKind::SystemObjectMapping,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                items,
                StateUpdateKind::Item,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                comments,
                StateUpdateKind::Comment,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                source_references,
                StateUpdateKind::SourceReferences,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                storage_collection_metadata,
                StateUpdateKind::StorageCollectionMetadata,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                unfinalized_shards,
                StateUpdateKind::UnfinalizedShard,
                self.op_id,
            ))
            .chain(get_large_collection_op_updates(
                audit_log_updates,
                StateUpdateKind::AuditLog,
                self.op_id,
            ))
            // Every update is stamped with the transaction's upper timestamp.
            .map(|(kind, diff)| StateUpdate {
                kind,
                ts: upper.clone(),
                diff,
            })
            .collect();

        updates
    }
2456
    /// Reports whether the underlying durable catalog is in savepoint mode.
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2460
    /// Finishes the current operation by advancing to the next operation ID.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2464
    /// Returns the current operation ID within this transaction.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2468
    /// Returns the upper timestamp of this transaction.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2472
    /// Consumes the transaction, splitting it into a [`TransactionBatch`] of
    /// all pending changes and the underlying durable catalog handle.
    pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
        // Audit log updates are stored as (key, diff, op) triples; the op id
        // is dropped here and the key converted to its proto representation.
        let audit_log_updates = self
            .audit_log_updates
            .into_iter()
            .map(|(k, diff, _op)| (k.into_proto(), (), diff))
            .collect();

        let txn_batch = TransactionBatch {
            databases: self.databases.pending(),
            schemas: self.schemas.pending(),
            items: self.items.pending(),
            comments: self.comments.pending(),
            roles: self.roles.pending(),
            role_auth: self.role_auth.pending(),
            clusters: self.clusters.pending(),
            cluster_replicas: self.cluster_replicas.pending(),
            network_policies: self.network_policies.pending(),
            introspection_sources: self.introspection_sources.pending(),
            id_allocator: self.id_allocator.pending(),
            configs: self.configs.pending(),
            source_references: self.source_references.pending(),
            settings: self.settings.pending(),
            system_gid_mapping: self.system_gid_mapping.pending(),
            system_configurations: self.system_configurations.pending(),
            default_privileges: self.default_privileges.pending(),
            system_privileges: self.system_privileges.pending(),
            storage_collection_metadata: self.storage_collection_metadata.pending(),
            unfinalized_shards: self.unfinalized_shards.pending(),
            txn_wal_shard: self.txn_wal_shard.pending(),
            audit_log_updates,
            upper: self.upper,
        };
        (txn_batch, self.durable_catalog)
    }
2507
    /// Consolidates the pending batch and commits it to the durable catalog
    /// at `commit_ts`.
    ///
    /// Consumes the transaction, consolidates every pending collection so
    /// that insertions cancelled by retractions disappear from the batch,
    /// and hands the result to the underlying [`DurableCatalogState`].
    ///
    /// Returns the durable catalog state and the upper reported by the
    /// commit.
    ///
    /// # Panics
    ///
    /// Panics if `commit_ts` is less than the transaction's upper.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn commit_internal(
        self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
        let (mut txn_batch, durable_catalog) = self.into_parts();
        // Exhaustive destructure: adding a collection to `TransactionBatch`
        // forces this consolidation list to be revisited.
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            source_references,
            settings,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper,
        } = &mut txn_batch;
        // Consolidate each collection so equal (key, value) pairs with
        // cancelling diffs are removed before being written durably.
        differential_dataflow::consolidation::consolidate_updates(databases);
        differential_dataflow::consolidation::consolidate_updates(schemas);
        differential_dataflow::consolidation::consolidate_updates(items);
        differential_dataflow::consolidation::consolidate_updates(comments);
        differential_dataflow::consolidation::consolidate_updates(roles);
        differential_dataflow::consolidation::consolidate_updates(role_auth);
        differential_dataflow::consolidation::consolidate_updates(clusters);
        differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
        differential_dataflow::consolidation::consolidate_updates(network_policies);
        differential_dataflow::consolidation::consolidate_updates(introspection_sources);
        differential_dataflow::consolidation::consolidate_updates(id_allocator);
        differential_dataflow::consolidation::consolidate_updates(configs);
        differential_dataflow::consolidation::consolidate_updates(settings);
        differential_dataflow::consolidation::consolidate_updates(source_references);
        differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
        differential_dataflow::consolidation::consolidate_updates(system_configurations);
        differential_dataflow::consolidation::consolidate_updates(default_privileges);
        differential_dataflow::consolidation::consolidate_updates(system_privileges);
        differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
        differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
        differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
        differential_dataflow::consolidation::consolidate_updates(audit_log_updates);

        // Committing at a timestamp before the observed upper would write
        // into the past.
        assert!(
            commit_ts >= *upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            upper
        );
        let upper = durable_catalog
            .commit_transaction(txn_batch, commit_ts)
            .await?;
        Ok((durable_catalog, upper))
    }
2586
    /// Commits the transaction at `commit_ts` and syncs the durable state
    /// up to the resulting upper.
    ///
    /// # Panics
    ///
    /// Panics if any in-transaction updates produced by prior operations
    /// were never consumed via `get_op_updates`.
    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        let updates = durable_storage.sync_updates(upper).await?;
        // Outside read-only mode, every update surfaced by the sync should
        // stem from this very commit (i.e. carry `commit_ts`).
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2624}
2625
2626use crate::durable::async_trait;
2627
2628use super::objects::{RoleAuthKey, RoleAuthValue};
2629
// Exposes the storage-owned catalog collections (collection metadata,
// unfinalized shards, txn-WAL shard) to the storage controller through the
// `StorageTxn` interface. All writes are staged at the transaction's current
// `op_id`.
#[async_trait]
impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
    /// Returns the current mapping from collection `GlobalId` to persist
    /// `ShardId`, including changes staged in this transaction.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
        self.storage_collection_metadata
            .items()
            .into_iter()
            .map(
                |(
                    StorageCollectionMetadataKey { id },
                    StorageCollectionMetadataValue { shard },
                )| { (*id, shard.clone()) },
            )
            .collect()
    }

    /// Stages new collection-metadata entries.
    ///
    /// Maps durable-catalog errors onto storage errors: a duplicate id
    /// becomes `CollectionMetadataAlreadyExists` and a uniqueness violation
    /// (same shard registered twice) becomes `PersistShardAlreadyInUse`.
    fn insert_collection_metadata(
        &mut self,
        metadata: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
        for (id, shard) in metadata {
            self.storage_collection_metadata
                .insert(
                    StorageCollectionMetadataKey { id },
                    StorageCollectionMetadataValue {
                        shard: shard.clone(),
                    },
                    self.op_id,
                )
                .map_err(|err| match err {
                    DurableCatalogError::DuplicateKey => {
                        StorageError::CollectionMetadataAlreadyExists(id)
                    }
                    DurableCatalogError::UniquenessViolation => {
                        StorageError::PersistShardAlreadyInUse(shard)
                    }
                    err => StorageError::Generic(anyhow::anyhow!(err)),
                })?;
        }
        Ok(())
    }

    /// Deletes the metadata entries for `ids`, returning the (id, shard)
    /// pairs that were actually present and removed.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
        let ks: Vec<_> = ids
            .into_iter()
            .map(|id| StorageCollectionMetadataKey { id })
            .collect();
        self.storage_collection_metadata
            .delete_by_keys(ks, self.op_id)
            .into_iter()
            .map(
                |(
                    StorageCollectionMetadataKey { id },
                    StorageCollectionMetadataValue { shard },
                )| (id, shard),
            )
            .collect()
    }

    /// Returns the set of shards that are pending finalization.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
        self.unfinalized_shards
            .items()
            .into_iter()
            .map(|(UnfinalizedShardKey { shard }, ())| *shard)
            .collect()
    }

    /// Records shards as unfinalized. Re-inserting an already-recorded shard
    /// is deliberately treated as a no-op rather than an error.
    fn insert_unfinalized_shards(
        &mut self,
        s: BTreeSet<ShardId>,
    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
        for shard in s {
            match self
                .unfinalized_shards
                .insert(UnfinalizedShardKey { shard }, (), self.op_id)
            {
                Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
                Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
            };
        }
        Ok(())
    }

    /// Removes shards from the unfinalized set; missing shards are silently
    /// ignored (the deletion result is intentionally discarded).
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
        let ks: Vec<_> = shards
            .into_iter()
            .map(|shard| UnfinalizedShardKey { shard })
            .collect();
        let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
    }

    /// Returns the txn-WAL shard, if one has been written. The collection is
    /// keyed by `()`, so it holds at most one entry.
    fn get_txn_wal_shard(&self) -> Option<ShardId> {
        self.txn_wal_shard
            .values()
            .iter()
            .next()
            .map(|TxnWalShardValue { shard }| *shard)
    }

    /// Writes the singleton txn-WAL shard. Fails with
    /// `TxnWalShardAlreadyExists` if one was already written.
    fn write_txn_wal_shard(
        &mut self,
        shard: ShardId,
    ) -> Result<(), StorageError<mz_repr::Timestamp>> {
        self.txn_wal_shard
            .insert((), TxnWalShardValue { shard }, self.op_id)
            .map_err(|err| match err {
                DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
                err => StorageError::Generic(anyhow::anyhow!(err)),
            })
    }
}
2741
/// The consolidated set of pending updates produced by a `Transaction`,
/// expressed as proto (key, value, diff) triples per catalog collection.
/// This is the unit handed to `DurableCatalogState::commit_transaction`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    // Audit log entries are append-only; the value slot is always `()`.
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    // The upper observed by the transaction. Commits must occur at or beyond
    // this timestamp (asserted in `commit_internal`).
    pub(crate) upper: mz_repr::Timestamp,
}
2794
impl TransactionBatch {
    /// Reports whether the batch contains no updates at all.
    ///
    /// The exhaustive destructure (only `upper` explicitly ignored) is
    /// deliberate: adding a collection to `TransactionBatch` breaks this
    /// function at compile time, forcing the new field to be considered.
    pub fn is_empty(&self) -> bool {
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            upper: _,
        } = self;
        databases.is_empty()
            && schemas.is_empty()
            && items.is_empty()
            && comments.is_empty()
            && roles.is_empty()
            && role_auth.is_empty()
            && clusters.is_empty()
            && cluster_replicas.is_empty()
            && network_policies.is_empty()
            && introspection_sources.is_empty()
            && id_allocator.is_empty()
            && configs.is_empty()
            && settings.is_empty()
            && source_references.is_empty()
            && system_gid_mapping.is_empty()
            && system_configurations.is_empty()
            && default_privileges.is_empty()
            && system_privileges.is_empty()
            && storage_collection_metadata.is_empty()
            && unfinalized_shards.is_empty()
            && txn_wal_shard.is_empty()
            && audit_log_updates.is_empty()
    }
}
2846
/// A single pending change to a value staged within a transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    // The value being inserted or retracted.
    value: V,
    // The op id within the transaction at which the change was staged;
    // pending updates are expected to stay sorted by this field.
    ts: Timestamp,
    // +1 for an insertion, -1 for a retraction.
    diff: Diff,
}
2853
/// Value types that may expose a name that must be unique within their
/// collection, enabling a fast name-indexed uniqueness check.
trait UniqueName {
    /// Whether the type actually carries a unique name. When `false`,
    /// uniqueness checks fall back to pairwise value comparison.
    const HAS_UNIQUE_NAME: bool;
    /// The unique name, or an empty placeholder when `HAS_UNIQUE_NAME` is
    /// `false`.
    fn unique_name(&self) -> &str;
}
2862
mod unique_name {
    use crate::durable::objects::*;

    /// Implements `UniqueName` for value types whose `name` field is the
    /// collection-wide unique name.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements `UniqueName` for value types without a unique name; the
    /// placeholder name is never consulted because `HAS_UNIQUE_NAME` is
    /// `false`.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                        ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    // `String` is only used as a value type in this file's unit tests.
    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
2924
/// An in-memory overlay over one durable key-value collection: the snapshot
/// taken when the transaction began plus per-key lists of timestamped
/// insertions/retractions staged since.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The collection's contents when the transaction began.
    initial: BTreeMap<K, V>,
    // Staged updates per key, kept in the order (and at the timestamps) they
    // were produced.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional conflict predicate: returns `true` when two values may not
    // coexist in the collection.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2942
impl<K, V> TableTransaction<K, V>
where
    K: Ord + Eq + Clone + Debug,
    V: Ord + Clone + Debug + UniqueName,
{
    /// Builds a transaction overlay from the proto representation of a
    /// collection, with no uniqueness constraint.
    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: None,
        })
    }

    /// Like [`Self::new`], but installs `uniqueness_violation`, which
    /// returns `true` when two values may not coexist in the collection.
    fn new_with_uniqueness_fn<KP, VP>(
        initial: BTreeMap<KP, VP>,
        uniqueness_violation: fn(a: &V, b: &V) -> bool,
    ) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: Some(uniqueness_violation),
        })
    }

    /// Consumes the overlay and returns its staged updates as proto
    /// (key, value, diff) triples, consolidated per key so that cancelling
    /// insert/retract pairs vanish.
    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        soft_assert_no_log!(self.verify().is_ok());
        self.pending
            .into_iter()
            .flat_map(|(k, v)| {
                // Consolidate this key's updates before emitting them.
                let mut v: Vec<_> = v
                    .into_iter()
                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
                    .collect();
                differential_dataflow::consolidation::consolidate(&mut v);
                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
            })
            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
            .collect()
    }

    /// Checks the uniqueness constraint over the full current contents.
    ///
    /// When the value type has a unique name, a name-indexed map reduces the
    /// check to one lookup per item; otherwise every pair of distinct items
    /// is compared (O(n^2)). Also soft-asserts that each key's pending list
    /// is sorted by timestamp.
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        // A distinct item with the same name is a candidate
                        // conflict; confirm via the predicate.
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }

    /// Like [`Self::verify`], but only checks the (currently present) values
    /// at `keys` against every other item — cheaper when only a few keys
    /// changed.
    fn verify_keys<'a>(
        &self,
        keys: impl IntoIterator<Item = &'a K>,
    ) -> Result<(), DurableCatalogError>
    where
        K: 'a,
    {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let entries: Vec<_> = keys
                .into_iter()
                .filter_map(|key| self.get(key).map(|value| (key, value)))
                .collect();
            for (ki, vi) in self.items() {
                for (kj, vj) in &entries {
                    if ki != *kj && uniqueness_violation(vi, vj) {
                        return Err(DurableCatalogError::UniquenessViolation);
                    }
                }
            }
        }
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Invokes `f` for every (key, value) in the current state: pending
    /// updates overlaid on the initial snapshot. Keys fully retracted in
    /// `pending` are skipped; untouched keys come from `initial`.
    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
        let mut seen = BTreeSet::new();
        for k in self.pending.keys() {
            seen.insert(k);
            let v = self.get(k);
            // `get` returns `None` when the key's updates net out to a
            // deletion.
            if let Some(v) = v {
                f(k, v);
            }
        }
        for (k, v) in self.initial.iter() {
            if !seen.contains(k) {
                f(k, v);
            }
        }
    }

    /// Returns the current value for `k` by consolidating the initial value
    /// (if any, at diff +1) with the key's pending updates.
    ///
    /// # Panics
    ///
    /// Panics if consolidation leaves more than one value for the key, which
    /// would mean the overlay's set-semantics invariant was broken.
    fn get(&self, k: &K) -> Option<&V> {
        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
        let mut updates = Vec::with_capacity(pending.len() + 1);
        if let Some(initial) = self.initial.get(k) {
            updates.push((initial, Diff::ONE));
        }
        updates.extend(
            pending
                .into_iter()
                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
        );

        differential_dataflow::consolidation::consolidate(&mut updates);
        assert!(updates.len() <= 1);
        updates.into_iter().next().map(|(v, _)| v)
    }

    /// Test helper: the current state as an owned map.
    #[cfg(test)]
    fn items_cloned(&self) -> BTreeMap<K, V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.clone(), v.clone());
        });
        items
    }

    /// The current state converted to its proto representation.
    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
    where
        K: RustType<KP>,
        V: RustType<VP>,
        KP: Ord,
    {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.into_proto(), v.into_proto());
        });
        items
    }

    /// The current state as a map of borrowed keys to borrowed values.
    fn items(&self) -> BTreeMap<&K, &V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k, v);
        });
        items
    }

    /// The set of current values (borrowed, deduplicated by `Ord`).
    fn values(&self) -> BTreeSet<&V> {
        let mut items = BTreeSet::new();
        self.for_values(|_, v| {
            items.insert(v);
        });
        items
    }

    /// The number of entries in the current state.
    fn len(&self) -> usize {
        let mut count = 0;
        self.for_values(|_, _| {
            count += 1;
        });
        count
    }

    /// Runs `f` over the current state, letting it stage new updates into a
    /// scratch map that is appended to `pending` afterwards. The indirection
    /// exists because `f` cannot mutate `pending` while `for_values` borrows
    /// `self`.
    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
        &mut self,
        mut f: F,
    ) {
        let mut pending = BTreeMap::new();
        self.for_values(|k, v| f(&mut pending, k, v));
        for (k, updates) in pending {
            self.pending.entry(k).or_default().extend(updates);
        }
    }

    /// Stages an insertion of `(k, v)` at `ts`.
    ///
    /// Fails with `DuplicateKey` if `k` already has a value, or with
    /// `UniquenessViolation` if `v` conflicts with any existing value. Note
    /// the later uniqueness check overwrites an earlier duplicate-key
    /// finding, so only one error variant is reported.
    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
        let mut violation = None;
        self.for_values(|for_k, for_v| {
            if &k == for_k {
                violation = Some(DurableCatalogError::DuplicateKey);
            }
            if let Some(uniqueness_violation) = self.uniqueness_violation {
                if uniqueness_violation(for_v, &v) {
                    violation = Some(DurableCatalogError::UniquenessViolation);
                }
            }
        });
        if let Some(violation) = violation {
            return Err(violation);
        }
        self.pending.entry(k).or_default().push(TransactionUpdate {
            value: v,
            ts,
            diff: Diff::ONE,
        });
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Applies `f` to every current entry; where `f` returns `Some(next)`,
    /// stages a retraction of the old value and an insertion of `next` at
    /// `ts`. Returns how many entries changed.
    ///
    /// On a uniqueness violation the entire `pending` state is restored from
    /// a snapshot taken up front, so the call is all-or-nothing.
    fn update<F: Fn(&K, &V) -> Option<V>>(
        &mut self,
        f: F,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let mut changed = Diff::ZERO;
        let mut keys = BTreeSet::new();
        // Snapshot for rollback if verification fails below.
        let pending = self.pending.clone();
        self.for_values_mut(|p, k, v| {
            if let Some(next) = f(k, v) {
                changed += Diff::ONE;
                keys.insert(k.clone());
                let updates = p.entry(k.clone()).or_default();
                updates.push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                updates.push(TransactionUpdate {
                    value: next,
                    ts,
                    diff: Diff::ONE,
                });
            }
        });
        if let Err(err) = self.verify_keys(&keys) {
            self.pending = pending;
            Err(err)
        } else {
            Ok(changed)
        }
    }

    /// Sets `k` to `v` at `ts` if `k` currently exists, skipping the write
    /// when the value is already equal. Returns whether the key existed.
    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
        if let Some(cur_v) = self.get(&k) {
            if v != *cur_v {
                self.set(k, Some(v), ts)?;
            }
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Batch form of [`Self::update_by_key`]. Returns the number of keys
    /// that existed (including those whose value was already equal and were
    /// therefore skipped as no-op writes); missing keys are ignored.
    fn update_by_keys(
        &mut self,
        kvs: impl IntoIterator<Item = (K, V)>,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let kvs: Vec<_> = kvs
            .into_iter()
            .filter_map(|(k, v)| match self.get(&k) {
                // Record whether this write is a no-op (value unchanged).
                Some(cur_v) => Some((*cur_v == v, k, v)),
                None => None,
            })
            .collect();
        let changed = kvs.len();
        let changed =
            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
        let kvs = kvs
            .into_iter()
            .filter(|(no_op, _, _)| !no_op)
            .map(|(_, k, v)| (k, Some(v)))
            .collect();
        self.set_many(kvs, ts)?;
        Ok(changed)
    }

    /// Sets `k` to `v` at `ts` (`None` deletes), returning the previous
    /// value. On a uniqueness violation the updates pushed here are
    /// truncated away, leaving the overlay unchanged.
    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
        let prev = self.get(&k).cloned();
        let entry = self.pending.entry(k.clone()).or_default();
        // Remember how many updates existed so we can roll back on error.
        let restore_len = entry.len();

        match (v, prev.clone()) {
            (Some(v), Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            (Some(v), None) => {
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            (None, Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
            (None, None) => {}
        }

        if let Err(err) = self.verify_keys([&k]) {
            let pending = self.pending.get_mut(&k).expect("inserted above");
            pending.truncate(restore_len);
            Err(err)
        } else {
            Ok(prev)
        }
    }

    /// Batch form of [`Self::set`]: applies every entry of `kvs` at `ts` and
    /// returns the previous value per key. All-or-nothing: on a uniqueness
    /// violation every key's pending list is truncated back to its recorded
    /// length.
    fn set_many(
        &mut self,
        kvs: BTreeMap<K, Option<V>>,
        ts: Timestamp,
    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
        if kvs.is_empty() {
            return Ok(BTreeMap::new());
        }

        let mut prevs = BTreeMap::new();
        let mut restores = BTreeMap::new();

        for (k, v) in kvs {
            let prev = self.get(&k).cloned();
            let entry = self.pending.entry(k.clone()).or_default();
            // Record this key's pre-call update count for rollback.
            restores.insert(k.clone(), entry.len());

            match (v, prev.clone()) {
                (Some(v), Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                (Some(v), None) => {
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                (None, Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                }
                (None, None) => {}
            }

            prevs.insert(k, prev);
        }

        if let Err(err) = self.verify_keys(prevs.keys()) {
            for (k, restore_len) in restores {
                let pending = self.pending.get_mut(&k).expect("inserted above");
                pending.truncate(restore_len);
            }
            Err(err)
        } else {
            Ok(prevs)
        }
    }

    /// Stages a retraction at `ts` for every entry matching `f`, returning
    /// the deleted (key, value) pairs. Deletions cannot introduce
    /// uniqueness violations, so no rollback is needed.
    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
        let mut deleted = Vec::new();
        self.for_values_mut(|p, k, v| {
            if f(k, v) {
                deleted.push((k.clone(), v.clone()));
                p.entry(k.clone()).or_default().push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
        });
        soft_assert_no_log!(self.verify().is_ok());
        deleted
    }

    /// Deletes the entry at `k` (if any) at `ts`, returning the previous
    /// value.
    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
        self.set(k, None, ts)
            .expect("deleting an entry cannot violate uniqueness")
    }

    /// Deletes the entries at `ks` at `ts`, returning the (key, value)
    /// pairs that were actually present.
    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
        let kvs = ks.into_iter().map(|k| (k, None)).collect();
        let prevs = self
            .set_many(kvs, ts)
            .expect("deleting entries cannot violate uniqueness");
        prevs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }
}
3476
3477#[cfg(test)]
3478#[allow(clippy::unwrap_used)]
3479mod tests {
3480 use super::*;
3481
3482 use mz_ore::now::SYSTEM_TIME;
3483 use mz_ore::{assert_none, assert_ok};
3484 use mz_persist_client::cache::PersistClientCache;
3485 use mz_persist_types::PersistLocation;
3486 use semver::Version;
3487
3488 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3489 use crate::memory;
3490
3491 #[mz_ore::test]
3492 fn test_table_transaction_simple() {
3493 fn uniqueness_violation(a: &String, b: &String) -> bool {
3494 a == b
3495 }
3496 let mut table = TableTransaction::new_with_uniqueness_fn(
3497 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3498 uniqueness_violation,
3499 )
3500 .unwrap();
3501
3502 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3505 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3506 assert!(
3507 table
3508 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3509 .is_err()
3510 );
3511 assert!(
3512 table
3513 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3514 .is_err()
3515 );
3516 }
3517
3518 #[mz_ore::test]
3519 fn test_table_transaction() {
3520 fn uniqueness_violation(a: &String, b: &String) -> bool {
3521 a == b
3522 }
3523 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3524
3525 fn commit(
3526 table: &mut BTreeMap<Vec<u8>, String>,
3527 mut pending: Vec<(Vec<u8>, String, Diff)>,
3528 ) {
3529 pending.sort_by(|a, b| a.2.cmp(&b.2));
3531 for (k, v, diff) in pending {
3532 if diff == Diff::MINUS_ONE {
3533 let prev = table.remove(&k);
3534 assert_eq!(prev, Some(v));
3535 } else if diff == Diff::ONE {
3536 let prev = table.insert(k, v);
3537 assert_eq!(prev, None);
3538 } else {
3539 panic!("unexpected diff: {diff}");
3540 }
3541 }
3542 }
3543
3544 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3545 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3546 let mut table_txn =
3547 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3548 assert_eq!(table_txn.items_cloned(), table);
3549 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3550 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3551 assert_eq!(
3552 table_txn.items_cloned(),
3553 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3554 );
3555 assert_eq!(
3556 table_txn
3557 .update(|_k, _v| Some("v3".to_string()), 2)
3558 .unwrap(),
3559 Diff::ONE
3560 );
3561
3562 table_txn
3564 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3565 .unwrap_err();
3566
3567 table_txn
3568 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3569 .unwrap();
3570 assert_eq!(
3571 table_txn.items_cloned(),
3572 BTreeMap::from([
3573 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3574 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3575 ])
3576 );
3577 let err = table_txn
3578 .update(|_k, _v| Some("v1".to_string()), 5)
3579 .unwrap_err();
3580 assert!(
3581 matches!(err, DurableCatalogError::UniquenessViolation),
3582 "unexpected err: {err:?}"
3583 );
3584 let pending = table_txn.pending();
3585 assert_eq!(
3586 pending,
3587 vec![
3588 (
3589 1i64.to_le_bytes().to_vec(),
3590 "v1".to_string(),
3591 Diff::MINUS_ONE
3592 ),
3593 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3594 (
3595 2i64.to_le_bytes().to_vec(),
3596 "v2".to_string(),
3597 Diff::MINUS_ONE
3598 ),
3599 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3600 ]
3601 );
3602 commit(&mut table, pending);
3603 assert_eq!(
3604 table,
3605 BTreeMap::from([
3606 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3607 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3608 ])
3609 );
3610
3611 let mut table_txn =
3612 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3613 assert_eq!(
3615 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3616 1
3617 );
3618 table_txn
3619 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3620 .unwrap();
3621 table_txn
3623 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3624 .unwrap_err();
3625 table_txn
3627 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3628 .unwrap_err();
3629 assert_eq!(
3630 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3631 1
3632 );
3633 table_txn
3635 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3636 .unwrap();
3637 table_txn
3638 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3639 .unwrap();
3640 let pending = table_txn.pending();
3641 assert_eq!(
3642 pending,
3643 vec![
3644 (
3645 1i64.to_le_bytes().to_vec(),
3646 "v3".to_string(),
3647 Diff::MINUS_ONE
3648 ),
3649 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3650 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3651 ]
3652 );
3653 commit(&mut table, pending);
3654 assert_eq!(
3655 table,
3656 BTreeMap::from([
3657 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3658 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3659 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3660 ])
3661 );
3662
3663 let mut table_txn =
3664 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3665 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3666 table_txn
3667 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3668 .unwrap();
3669
3670 commit(&mut table, table_txn.pending());
3671 assert_eq!(
3672 table,
3673 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3674 );
3675
3676 let mut table_txn =
3677 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3678 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3679 table_txn
3680 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3681 .unwrap();
3682 commit(&mut table, table_txn.pending());
3683 assert_eq!(
3684 table,
3685 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3686 );
3687
3688 let mut table_txn =
3690 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3691 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3692 table_txn
3693 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3694 .unwrap();
3695 table_txn
3696 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3697 .unwrap_err();
3698 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3699 table_txn
3700 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3701 .unwrap();
3702 commit(&mut table, table_txn.pending());
3703 assert_eq!(
3704 table.clone().into_iter().collect::<Vec<_>>(),
3705 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3706 );
3707
3708 let mut table_txn =
3710 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3711 table_txn
3713 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3714 .unwrap_err();
3715 table_txn
3716 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3717 .unwrap();
3718 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3719 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3720 let pending = table_txn.pending();
3721 assert_eq!(
3722 pending,
3723 vec![
3724 (
3725 1i64.to_le_bytes().to_vec(),
3726 "v5".to_string(),
3727 Diff::MINUS_ONE
3728 ),
3729 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3730 ]
3731 );
3732 commit(&mut table, pending);
3733 assert_eq!(
3734 table,
3735 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3736 );
3737
3738 let mut table_txn =
3740 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3741 table_txn
3742 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3743 .unwrap();
3744 let pending = table_txn.pending::<Vec<u8>, String>();
3745 assert!(pending.is_empty());
3746
3747 let mut table_txn =
3749 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3750 table_txn
3752 .set_many(
3753 BTreeMap::from([
3754 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3755 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3756 ]),
3757 0,
3758 )
3759 .unwrap_err();
3760 table_txn
3761 .set_many(
3762 BTreeMap::from([
3763 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3764 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3765 ]),
3766 1,
3767 )
3768 .unwrap();
3769 table_txn
3770 .set_many(
3771 BTreeMap::from([
3772 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3773 (3i64.to_le_bytes().to_vec(), None),
3774 ]),
3775 2,
3776 )
3777 .unwrap();
3778 let pending = table_txn.pending();
3779 assert_eq!(
3780 pending,
3781 vec![
3782 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3783 (
3784 3i64.to_le_bytes().to_vec(),
3785 "v6".to_string(),
3786 Diff::MINUS_ONE
3787 ),
3788 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3789 ]
3790 );
3791 commit(&mut table, pending);
3792 assert_eq!(
3793 table,
3794 BTreeMap::from([
3795 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3796 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3797 ])
3798 );
3799
3800 let mut table_txn =
3802 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3803 table_txn
3804 .set_many(
3805 BTreeMap::from([
3806 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3807 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3808 ]),
3809 0,
3810 )
3811 .unwrap();
3812 let pending = table_txn.pending::<Vec<u8>, String>();
3813 assert!(pending.is_empty());
3814 commit(&mut table, pending);
3815 assert_eq!(
3816 table,
3817 BTreeMap::from([
3818 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3819 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3820 ])
3821 );
3822
3823 let mut table_txn =
3825 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3826 table_txn
3828 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3829 .unwrap_err();
3830 assert!(
3831 table_txn
3832 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3833 .unwrap()
3834 );
3835 assert!(
3836 !table_txn
3837 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3838 .unwrap()
3839 );
3840 let pending = table_txn.pending();
3841 assert_eq!(
3842 pending,
3843 vec![
3844 (
3845 1i64.to_le_bytes().to_vec(),
3846 "v6".to_string(),
3847 Diff::MINUS_ONE
3848 ),
3849 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3850 ]
3851 );
3852 commit(&mut table, pending);
3853 assert_eq!(
3854 table,
3855 BTreeMap::from([
3856 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3857 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3858 ])
3859 );
3860
3861 let mut table_txn =
3863 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3864 assert!(
3865 table_txn
3866 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3867 .unwrap()
3868 );
3869 let pending = table_txn.pending::<Vec<u8>, String>();
3870 assert!(pending.is_empty());
3871 commit(&mut table, pending);
3872 assert_eq!(
3873 table,
3874 BTreeMap::from([
3875 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3876 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3877 ])
3878 );
3879
3880 let mut table_txn =
3882 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3883 table_txn
3885 .update_by_keys(
3886 [
3887 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3888 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3889 ],
3890 0,
3891 )
3892 .unwrap_err();
3893 let n = table_txn
3894 .update_by_keys(
3895 [
3896 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3897 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3898 ],
3899 1,
3900 )
3901 .unwrap();
3902 assert_eq!(n, Diff::ONE);
3903 let n = table_txn
3904 .update_by_keys(
3905 [
3906 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3907 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3908 ],
3909 2,
3910 )
3911 .unwrap();
3912 assert_eq!(n, Diff::ZERO);
3913 let pending = table_txn.pending();
3914 assert_eq!(
3915 pending,
3916 vec![
3917 (
3918 1i64.to_le_bytes().to_vec(),
3919 "v8".to_string(),
3920 Diff::MINUS_ONE
3921 ),
3922 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3923 ]
3924 );
3925 commit(&mut table, pending);
3926 assert_eq!(
3927 table,
3928 BTreeMap::from([
3929 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3930 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3931 ])
3932 );
3933
3934 let mut table_txn =
3936 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3937 let n = table_txn
3938 .update_by_keys(
3939 [
3940 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3941 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3942 ],
3943 0,
3944 )
3945 .unwrap();
3946 assert_eq!(n, Diff::from(2));
3947 let pending = table_txn.pending::<Vec<u8>, String>();
3948 assert!(pending.is_empty());
3949 commit(&mut table, pending);
3950 assert_eq!(
3951 table,
3952 BTreeMap::from([
3953 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3954 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3955 ])
3956 );
3957
3958 let mut table_txn =
3960 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3961 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3962 assert_eq!(prev, Some("v9".to_string()));
3963 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3964 assert_none!(prev);
3965 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3966 assert_none!(prev);
3967 let pending = table_txn.pending();
3968 assert_eq!(
3969 pending,
3970 vec![(
3971 1i64.to_le_bytes().to_vec(),
3972 "v9".to_string(),
3973 Diff::MINUS_ONE
3974 ),]
3975 );
3976 commit(&mut table, pending);
3977 assert_eq!(
3978 table,
3979 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3980 );
3981
3982 let mut table_txn =
3984 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3985 let prevs = table_txn.delete_by_keys(
3986 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3987 0,
3988 );
3989 assert_eq!(
3990 prevs,
3991 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3992 );
3993 let prevs = table_txn.delete_by_keys(
3994 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3995 1,
3996 );
3997 assert_eq!(prevs, vec![]);
3998 let prevs = table_txn.delete_by_keys(
3999 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4000 2,
4001 );
4002 assert_eq!(prevs, vec![]);
4003 let pending = table_txn.pending();
4004 assert_eq!(
4005 pending,
4006 vec![(
4007 42i64.to_le_bytes().to_vec(),
4008 "v7".to_string(),
4009 Diff::MINUS_ONE
4010 ),]
4011 );
4012 commit(&mut table, pending);
4013 assert_eq!(table, BTreeMap::new());
4014 }
4015
    #[mz_ore::test(tokio::test)]
    // NOTE(review): presumably skipped under miri because the persist client uses
    // operations miri cannot emulate (or is simply too slow) — confirm.
    #[cfg_attr(miri, ignore)] async fn test_savepoint() {
        // Pin a build version so the persist-backed catalog opens deterministically.
        const VERSION: Version = Version::new(26, 0, 0);
        let mut persist_cache = PersistClientCache::new_no_metrics();
        persist_cache.cfg.build_version = VERSION;
        let persist_client = persist_cache
            .open(PersistLocation::new_in_mem())
            .await
            .unwrap();
        let state_builder = TestCatalogStateBuilder::new(persist_client)
            .with_default_deploy_generation()
            .with_version(VERSION);

        // Initialize the durable catalog by opening (and immediately dropping) a
        // regular, writable catalog state. The savepoint open below requires an
        // already-initialized catalog.
        let _ = state_builder
            .clone()
            .unwrap_build()
            .await
            .open(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;
        // Open a savepoint-mode catalog on top of the initialized state.
        // NOTE(review): savepoint mode presumably keeps writes in memory rather
        // than persisting them durably — confirm against open_savepoint's docs.
        let mut savepoint_state = state_builder
            .unwrap_build()
            .await
            .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
            .await
            .unwrap()
            .0;

        // The savepoint catalog must see the bootstrap contents written above.
        let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
        assert!(!initial_snapshot.is_empty());

        // Create a user database through a savepoint transaction and commit it.
        let db_name = "db";
        let db_owner = RoleId::User(42);
        let db_privileges = Vec::new();
        let mut txn = savepoint_state.transaction().await.unwrap();
        let (db_id, db_oid) = txn
            .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
            .unwrap();
        // Commit at the transaction's upper, i.e. the next writeable timestamp.
        let commit_ts = txn.upper();
        txn.commit_internal(commit_ts).await.unwrap();
        // Exactly one new update should be observable: the database we just added.
        let updates = savepoint_state.sync_to_current_updates().await.unwrap();
        let update = updates.into_element();

        assert_eq!(update.diff, StateDiff::Addition);

        let db = match update.kind {
            memory::objects::StateUpdateKind::Database(db) => db,
            update => panic!("unexpected update: {update:?}"),
        };

        // The emitted update must faithfully reflect the inserted database.
        assert_eq!(db_id, db.id);
        assert_eq!(db_oid, db.oid);
        assert_eq!(db_name, db.name);
        assert_eq!(db_owner, db.owner_id);
        assert_eq!(db_privileges, db.privileges);
    }
4075
4076 #[mz_ore::test]
4077 fn test_allocate_introspection_source_index_id() {
4078 let cluster_variant: u8 = 0b0000_0001;
4079 let cluster_id_inner: u64 =
4080 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4081 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4082
4083 let cluster_id = ClusterId::System(cluster_id_inner);
4084 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4085
4086 let introspection_source_index_id: u64 =
4087 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4088
4089 {
4091 let mut cluster_variant_mask = 0xFF << 56;
4092 cluster_variant_mask &= introspection_source_index_id;
4093 cluster_variant_mask >>= 56;
4094 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4095 }
4096
4097 {
4099 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4100 cluster_id_inner_mask &= introspection_source_index_id;
4101 cluster_id_inner_mask >>= 8;
4102 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4103 }
4104
4105 {
4107 let mut log_variant_mask = 0xFF;
4108 log_variant_mask &= introspection_source_index_id;
4109 assert_eq!(
4110 log_variant_mask,
4111 u64::from(timely_messages_received_log_variant)
4112 );
4113 }
4114
4115 let (catalog_item_id, global_id) =
4116 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4117
4118 assert_eq!(
4119 catalog_item_id,
4120 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4121 );
4122 assert_eq!(
4123 global_id,
4124 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4125 );
4126 }
4127}