1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34 CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35 RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51 AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52 ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53 ClusterReplicaValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey, ConfigValue,
54 Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey, DefaultPrivilegesValue,
55 DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
56 IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
57 ReplicaConfig, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
58 ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
59 SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
60 StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
61 SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
62};
63use crate::durable::{
64 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
66 EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
67 SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY,
68 SYSTEM_ITEM_ALLOC_KEY, SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration,
69 USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY,
70 USER_ROLE_ID_ALLOC_KEY,
71};
72use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
73
/// Logical timestamp used to order operations within a [`Transaction`]
/// (`op_id`) and to stamp queued audit log updates.
type Timestamp = u64;
75
/// A transaction over the durable catalog state.
///
/// Each catalog collection is staged in its own `TableTransaction`; audit log
/// events are queued separately in `audit_log_updates`. The staged changes are
/// applied against `durable_catalog` by the commit path (not visible in this
/// chunk).
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    /// The underlying durable catalog; hidden from `Debug` output.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    /// Audit log events queued for insertion: `(event, diff, op_id)` triples.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The upper of the durable catalog when this transaction was opened.
    /// NOTE(review): presumably consulted by the commit path for concurrency
    /// control — confirm; that code is outside this chunk.
    upper: mz_repr::Timestamp,
    /// Monotonic counter ordering the operations applied within this
    /// transaction; stamped on every staged update.
    op_id: Timestamp,
}
115
116impl<'a> Transaction<'a> {
    /// Opens a new [`Transaction`] over `durable_catalog`, seeded with the
    /// contents of `snapshot`.
    ///
    /// Each collection from the snapshot is wrapped in a `TableTransaction`.
    /// Collections whose values must additionally be unique on a logical key
    /// (name, name-within-parent, ...) install a uniqueness function that
    /// returns `true` when two values conflict.
    ///
    /// `upper` is the timestamp associated with the snapshot; it is stored as
    /// provided and not interpreted here.
    pub fn new(
        durable_catalog: &'a mut dyn DurableCatalogState,
        Snapshot {
            databases,
            schemas,
            roles,
            role_auth,
            items,
            comments,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_object_mappings,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
        }: Snapshot,
        upper: mz_repr::Timestamp,
    ) -> Result<Transaction<'a>, CatalogError> {
        Ok(Transaction {
            durable_catalog,
            // Database names are globally unique.
            databases: TableTransaction::new_with_uniqueness_fn(
                databases,
                |a: &DatabaseValue, b| a.name == b.name,
            )?,
            // Schema names are unique within their database.
            schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
                a.database_id == b.database_id && a.name == b.name
            })?,
            // Item names are unique within their schema, with a carve-out for
            // types: two same-named items conflict when neither is a type, or
            // when one is a type and the other's item type conflicts with
            // types.
            items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
                a.schema_id == b.schema_id && a.name == b.name && {
                    let a_type = a.item_type();
                    let b_type = b.item_type();
                    (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
                        || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
                        || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
                }
            })?,
            comments: TableTransaction::new(comments)?,
            // Role names are globally unique.
            roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
                a.name == b.name
            })?,
            role_auth: TableTransaction::new(role_auth)?,
            // Cluster names are globally unique.
            clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
                a.name == b.name
            })?,
            // Network policy names are globally unique.
            network_policies: TableTransaction::new_with_uniqueness_fn(
                network_policies,
                |a: &NetworkPolicyValue, b| a.name == b.name,
            )?,
            // Replica names are unique within their cluster.
            cluster_replicas: TableTransaction::new_with_uniqueness_fn(
                cluster_replicas,
                |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
            )?,
            introspection_sources: TableTransaction::new(introspection_sources)?,
            id_allocator: TableTransaction::new(id_allocator)?,
            configs: TableTransaction::new(configs)?,
            settings: TableTransaction::new(settings)?,
            source_references: TableTransaction::new(source_references)?,
            system_gid_mapping: TableTransaction::new(system_object_mappings)?,
            system_configurations: TableTransaction::new(system_configurations)?,
            default_privileges: TableTransaction::new(default_privileges)?,
            system_privileges: TableTransaction::new(system_privileges)?,
            storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
            unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
            txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
            audit_log_updates: Vec::new(),
            upper,
            op_id: 0,
        })
    }
199
200 pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
201 let key = ItemKey { id: *id };
202 self.items
203 .get(&key)
204 .map(|v| DurableType::from_key_value(key, v.clone()))
205 }
206
207 pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
208 self.items
209 .items()
210 .into_iter()
211 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
212 .sorted_by_key(|Item { id, .. }| *id)
213 }
214
215 pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
216 self.insert_audit_log_events([event]);
217 }
218
219 pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
220 let events = events
221 .into_iter()
222 .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
223 self.audit_log_updates.extend(events);
224 }
225
226 pub fn insert_user_database(
227 &mut self,
228 database_name: &str,
229 owner_id: RoleId,
230 privileges: Vec<MzAclItem>,
231 temporary_oids: &HashSet<u32>,
232 ) -> Result<(DatabaseId, u32), CatalogError> {
233 let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
234 let id = DatabaseId::User(id);
235 let oid = self.allocate_oid(temporary_oids)?;
236 self.insert_database(id, database_name, owner_id, privileges, oid)?;
237 Ok((id, oid))
238 }
239
240 pub(crate) fn insert_database(
241 &mut self,
242 id: DatabaseId,
243 database_name: &str,
244 owner_id: RoleId,
245 privileges: Vec<MzAclItem>,
246 oid: u32,
247 ) -> Result<u32, CatalogError> {
248 match self.databases.insert(
249 DatabaseKey { id },
250 DatabaseValue {
251 name: database_name.to_string(),
252 owner_id,
253 privileges,
254 oid,
255 },
256 self.op_id,
257 ) {
258 Ok(_) => Ok(oid),
259 Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
260 }
261 }
262
263 pub fn insert_user_schema(
264 &mut self,
265 database_id: DatabaseId,
266 schema_name: &str,
267 owner_id: RoleId,
268 privileges: Vec<MzAclItem>,
269 temporary_oids: &HashSet<u32>,
270 ) -> Result<(SchemaId, u32), CatalogError> {
271 let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
272 let id = SchemaId::User(id);
273 let oid = self.allocate_oid(temporary_oids)?;
274 self.insert_schema(
275 id,
276 Some(database_id),
277 schema_name.to_string(),
278 owner_id,
279 privileges,
280 oid,
281 )?;
282 Ok((id, oid))
283 }
284
285 pub fn insert_system_schema(
286 &mut self,
287 schema_id: u64,
288 schema_name: &str,
289 owner_id: RoleId,
290 privileges: Vec<MzAclItem>,
291 oid: u32,
292 ) -> Result<(), CatalogError> {
293 let id = SchemaId::System(schema_id);
294 self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
295 }
296
297 pub(crate) fn insert_schema(
298 &mut self,
299 schema_id: SchemaId,
300 database_id: Option<DatabaseId>,
301 schema_name: String,
302 owner_id: RoleId,
303 privileges: Vec<MzAclItem>,
304 oid: u32,
305 ) -> Result<(), CatalogError> {
306 match self.schemas.insert(
307 SchemaKey { id: schema_id },
308 SchemaValue {
309 database_id,
310 name: schema_name.clone(),
311 owner_id,
312 privileges,
313 oid,
314 },
315 self.op_id,
316 ) {
317 Ok(_) => Ok(()),
318 Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
319 }
320 }
321
322 pub fn insert_builtin_role(
323 &mut self,
324 id: RoleId,
325 name: String,
326 attributes: RoleAttributesRaw,
327 membership: RoleMembership,
328 vars: RoleVars,
329 oid: u32,
330 ) -> Result<RoleId, CatalogError> {
331 soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
332 self.insert_role(id, name, attributes, membership, vars, oid)?;
333 Ok(id)
334 }
335
336 pub fn insert_user_role(
337 &mut self,
338 name: String,
339 attributes: RoleAttributesRaw,
340 membership: RoleMembership,
341 vars: RoleVars,
342 temporary_oids: &HashSet<u32>,
343 ) -> Result<(RoleId, u32), CatalogError> {
344 let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
345 let id = RoleId::User(id);
346 let oid = self.allocate_oid(temporary_oids)?;
347 self.insert_role(id, name, attributes, membership, vars, oid)?;
348 Ok((id, oid))
349 }
350
    /// Inserts a role and, when `attributes` carries a password, a matching
    /// `role_auth` entry holding the SCRAM-SHA-256 hash of that password.
    ///
    /// Returns `RoleAlreadyExists` if either insert hits a uniqueness
    /// conflict.
    fn insert_role(
        &mut self,
        id: RoleId,
        name: String,
        attributes: RoleAttributesRaw,
        membership: RoleMembership,
        vars: RoleVars,
        oid: u32,
    ) -> Result<(), CatalogError> {
        if let Some(ref password) = attributes.password {
            let hash = mz_auth::hash::scram256_hash(
                password,
                &attributes
                    .scram_iterations
                    .or_else(|| {
                        // A password without an iteration count is a caller
                        // bug; soft-panic (log in release) and fall through to
                        // the default below.
                        soft_panic_or_log!(
                            "Hash iterations must be set if a password is provided."
                        );
                        None
                    })
                    // Default to 600,000 hash iterations when none were set.
                    .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
            )
            .expect("password hash should be valid");
            match self.role_auth.insert(
                RoleAuthKey { role_id: id },
                RoleAuthValue {
                    password_hash: Some(hash),
                    updated_at: SYSTEM_TIME(),
                },
                self.op_id,
            ) {
                Ok(_) => {}
                Err(_) => {
                    return Err(SqlCatalogError::RoleAlreadyExists(name).into());
                }
            }
        }

        match self.roles.insert(
            RoleKey { id },
            RoleValue {
                name: name.clone(),
                attributes: attributes.into(),
                membership,
                vars,
                oid,
            },
            self.op_id,
        ) {
            Ok(_) => Ok(()),
            // NOTE(review): if the role_auth insert above succeeded but this
            // one fails, the role_auth entry stays staged in the transaction;
            // presumably the caller abandons the transaction on error —
            // confirm against the commit path.
            Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
        }
    }
406
407 pub fn insert_user_cluster(
409 &mut self,
410 cluster_id: ClusterId,
411 cluster_name: &str,
412 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
413 owner_id: RoleId,
414 privileges: Vec<MzAclItem>,
415 config: ClusterConfig,
416 temporary_oids: &HashSet<u32>,
417 ) -> Result<(), CatalogError> {
418 self.insert_cluster(
419 cluster_id,
420 cluster_name,
421 introspection_source_indexes,
422 owner_id,
423 privileges,
424 config,
425 temporary_oids,
426 )
427 }
428
429 pub fn insert_system_cluster(
431 &mut self,
432 cluster_name: &str,
433 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
434 privileges: Vec<MzAclItem>,
435 owner_id: RoleId,
436 config: ClusterConfig,
437 temporary_oids: &HashSet<u32>,
438 ) -> Result<ClusterId, CatalogError> {
439 let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
440 let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
441 self.insert_cluster(
442 cluster_id,
443 cluster_name,
444 introspection_source_indexes,
445 owner_id,
446 privileges,
447 config,
448 temporary_oids,
449 )?;
450 Ok(cluster_id)
451 }
452
453 fn insert_cluster(
454 &mut self,
455 cluster_id: ClusterId,
456 cluster_name: &str,
457 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
458 owner_id: RoleId,
459 privileges: Vec<MzAclItem>,
460 config: ClusterConfig,
461 temporary_oids: &HashSet<u32>,
462 ) -> Result<(), CatalogError> {
463 if let Err(_) = self.clusters.insert(
464 ClusterKey { id: cluster_id },
465 ClusterValue {
466 name: cluster_name.to_string(),
467 owner_id,
468 privileges,
469 config,
470 },
471 self.op_id,
472 ) {
473 return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
474 };
475
476 let amount = usize_to_u64(introspection_source_indexes.len());
477 let oids = self.allocate_oids(amount, temporary_oids)?;
478 let introspection_source_indexes: Vec<_> = introspection_source_indexes
479 .into_iter()
480 .zip_eq(oids)
481 .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
482 .collect();
483 for (builtin, item_id, index_id, oid) in introspection_source_indexes {
484 let introspection_source_index = IntrospectionSourceIndex {
485 cluster_id,
486 name: builtin.name.to_string(),
487 item_id,
488 index_id,
489 oid,
490 };
491 let (key, value) = introspection_source_index.into_key_value();
492 self.introspection_sources
493 .insert(key, value, self.op_id)
494 .expect("no uniqueness violation");
495 }
496
497 Ok(())
498 }
499
500 pub fn rename_cluster(
501 &mut self,
502 cluster_id: ClusterId,
503 cluster_name: &str,
504 cluster_to_name: &str,
505 ) -> Result<(), CatalogError> {
506 let key = ClusterKey { id: cluster_id };
507
508 match self.clusters.update(
509 |k, v| {
510 if *k == key {
511 let mut value = v.clone();
512 value.name = cluster_to_name.to_string();
513 Some(value)
514 } else {
515 None
516 }
517 },
518 self.op_id,
519 )? {
520 Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
521 Diff::ONE => Ok(()),
522 n => panic!(
523 "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
524 ),
525 }
526 }
527
528 pub fn rename_cluster_replica(
529 &mut self,
530 replica_id: ReplicaId,
531 replica_name: &QualifiedReplica,
532 replica_to_name: &str,
533 ) -> Result<(), CatalogError> {
534 let key = ClusterReplicaKey { id: replica_id };
535
536 match self.cluster_replicas.update(
537 |k, v| {
538 if *k == key {
539 let mut value = v.clone();
540 value.name = replica_to_name.to_string();
541 Some(value)
542 } else {
543 None
544 }
545 },
546 self.op_id,
547 )? {
548 Diff::ZERO => {
549 Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
550 }
551 Diff::ONE => Ok(()),
552 n => panic!(
553 "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
554 ),
555 }
556 }
557
558 pub fn insert_cluster_replica(
559 &mut self,
560 cluster_id: ClusterId,
561 replica_name: &str,
562 config: ReplicaConfig,
563 owner_id: RoleId,
564 ) -> Result<ReplicaId, CatalogError> {
565 let replica_id = match cluster_id {
566 ClusterId::System(_) => self.allocate_system_replica_id()?,
567 ClusterId::User(_) => self.allocate_user_replica_id()?,
568 };
569 self.insert_cluster_replica_with_id(
570 cluster_id,
571 replica_id,
572 replica_name,
573 config,
574 owner_id,
575 )?;
576 Ok(replica_id)
577 }
578
579 pub(crate) fn insert_cluster_replica_with_id(
580 &mut self,
581 cluster_id: ClusterId,
582 replica_id: ReplicaId,
583 replica_name: &str,
584 config: ReplicaConfig,
585 owner_id: RoleId,
586 ) -> Result<(), CatalogError> {
587 if let Err(_) = self.cluster_replicas.insert(
588 ClusterReplicaKey { id: replica_id },
589 ClusterReplicaValue {
590 cluster_id,
591 name: replica_name.into(),
592 config,
593 owner_id,
594 },
595 self.op_id,
596 ) {
597 let cluster = self
598 .clusters
599 .get(&ClusterKey { id: cluster_id })
600 .expect("cluster exists");
601 return Err(SqlCatalogError::DuplicateReplica(
602 replica_name.to_string(),
603 cluster.name.to_string(),
604 )
605 .into());
606 };
607 Ok(())
608 }
609
610 pub fn insert_user_network_policy(
611 &mut self,
612 name: String,
613 rules: Vec<NetworkPolicyRule>,
614 privileges: Vec<MzAclItem>,
615 owner_id: RoleId,
616 temporary_oids: &HashSet<u32>,
617 ) -> Result<NetworkPolicyId, CatalogError> {
618 let oid = self.allocate_oid(temporary_oids)?;
619 let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
620 let id = NetworkPolicyId::User(id);
621 self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
622 }
623
624 pub fn insert_network_policy(
625 &mut self,
626 id: NetworkPolicyId,
627 name: String,
628 rules: Vec<NetworkPolicyRule>,
629 privileges: Vec<MzAclItem>,
630 owner_id: RoleId,
631 oid: u32,
632 ) -> Result<NetworkPolicyId, CatalogError> {
633 match self.network_policies.insert(
634 NetworkPolicyKey { id },
635 NetworkPolicyValue {
636 name: name.clone(),
637 rules,
638 privileges,
639 owner_id,
640 oid,
641 },
642 self.op_id,
643 ) {
644 Ok(_) => Ok(id),
645 Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
646 }
647 }
648
649 pub fn update_introspection_source_index_gids(
654 &mut self,
655 mappings: impl Iterator<
656 Item = (
657 ClusterId,
658 impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
659 ),
660 >,
661 ) -> Result<(), CatalogError> {
662 for (cluster_id, updates) in mappings {
663 for (name, item_id, index_id, oid) in updates {
664 let introspection_source_index = IntrospectionSourceIndex {
665 cluster_id,
666 name,
667 item_id,
668 index_id,
669 oid,
670 };
671 let (key, value) = introspection_source_index.into_key_value();
672
673 let prev = self
674 .introspection_sources
675 .set(key, Some(value), self.op_id)?;
676 if prev.is_none() {
677 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
678 "{index_id}"
679 ))
680 .into());
681 }
682 }
683 }
684 Ok(())
685 }
686
687 pub fn insert_user_item(
688 &mut self,
689 id: CatalogItemId,
690 global_id: GlobalId,
691 schema_id: SchemaId,
692 item_name: &str,
693 create_sql: String,
694 owner_id: RoleId,
695 privileges: Vec<MzAclItem>,
696 temporary_oids: &HashSet<u32>,
697 versions: BTreeMap<RelationVersion, GlobalId>,
698 ) -> Result<u32, CatalogError> {
699 let oid = self.allocate_oid(temporary_oids)?;
700 self.insert_item(
701 id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
702 )?;
703 Ok(oid)
704 }
705
706 pub fn insert_item(
707 &mut self,
708 id: CatalogItemId,
709 oid: u32,
710 global_id: GlobalId,
711 schema_id: SchemaId,
712 item_name: &str,
713 create_sql: String,
714 owner_id: RoleId,
715 privileges: Vec<MzAclItem>,
716 extra_versions: BTreeMap<RelationVersion, GlobalId>,
717 ) -> Result<(), CatalogError> {
718 match self.items.insert(
719 ItemKey { id },
720 ItemValue {
721 schema_id,
722 name: item_name.to_string(),
723 create_sql,
724 owner_id,
725 privileges,
726 oid,
727 global_id,
728 extra_versions,
729 },
730 self.op_id,
731 ) {
732 Ok(_) => Ok(()),
733 Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
734 }
735 }
736
737 pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
738 Ok(self.get_and_increment_id_by(key, 1)?.into_element())
739 }
740
741 pub fn get_and_increment_id_by(
742 &mut self,
743 key: String,
744 amount: u64,
745 ) -> Result<Vec<u64>, CatalogError> {
746 assert!(
747 key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
748 "system item IDs cannot be allocated outside of bootstrap"
749 );
750
751 let current_id = self
752 .id_allocator
753 .items()
754 .get(&IdAllocKey { name: key.clone() })
755 .unwrap_or_else(|| panic!("{key} id allocator missing"))
756 .next_id;
757 let next_id = current_id
758 .checked_add(amount)
759 .ok_or(SqlCatalogError::IdExhaustion)?;
760 let prev = self.id_allocator.set(
761 IdAllocKey { name: key },
762 Some(IdAllocValue { next_id }),
763 self.op_id,
764 )?;
765 assert_eq!(
766 prev,
767 Some(IdAllocValue {
768 next_id: current_id
769 })
770 );
771 Ok((current_id..next_id).collect())
772 }
773
774 pub fn allocate_system_item_ids(
775 &mut self,
776 amount: u64,
777 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
778 assert!(
779 !self.durable_catalog.is_bootstrap_complete(),
780 "we can only allocate system item IDs during bootstrap"
781 );
782 Ok(self
783 .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
784 .into_iter()
785 .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
787 .collect())
788 }
789
    /// Deterministically derives the item/global ID for the introspection
    /// source index identified by (`cluster_id`, `log_variant`).
    ///
    /// The 64-bit ID is packed as:
    /// - bits 56..64: cluster variant (1 = system, 2 = user),
    /// - bits 8..56:  the cluster's inner ID (must fit in 48 bits),
    /// - bits 0..8:   the log-variant code from the match below.
    ///
    /// NOTE(review): the codes look like a stable encoding (27 is skipped,
    /// presumably a retired variant) — do not renumber or reuse gaps without
    /// confirming against the durable catalog's compatibility rules.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The inner cluster ID must leave the top 16 bits clear so it fits
        // in the 48 bits reserved for it.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
            LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
        };

        // Pack cluster variant, cluster ID, and log code into one 64-bit ID.
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
880
881 pub fn allocate_user_item_ids(
882 &mut self,
883 amount: u64,
884 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
885 Ok(self
886 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
887 .into_iter()
888 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
890 .collect())
891 }
892
893 pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
894 let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
895 Ok(ReplicaId::User(id))
896 }
897
898 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
899 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
900 Ok(ReplicaId::System(id))
901 }
902
903 pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
904 self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
905 }
906
907 pub fn allocate_storage_usage_ids(&mut self) -> Result<u64, CatalogError> {
908 self.get_and_increment_id(STORAGE_USAGE_ID_ALLOC_KEY.to_string())
909 }
910
    /// Allocates `amount` OIDs from the user OID space, skipping every OID
    /// already used by a database, schema, role, item, or introspection
    /// source index in this transaction, as well as any in `temporary_oids`.
    ///
    /// The persisted OID allocator is advanced to just past the last OID
    /// examined, so later allocations resume the circular scan from there.
    ///
    /// Returns `OidExhaustion` when the request cannot be satisfied.
    #[mz_ore::instrument]
    fn allocate_oids(
        &mut self,
        amount: u64,
        temporary_oids: &HashSet<u32>,
    ) -> Result<Vec<u32>, CatalogError> {
        /// Newtype witnessing that an OID is in the user range
        /// (`>= FIRST_USER_OID`).
        struct UserOid(u32);

        impl UserOid {
            fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
                if oid < FIRST_USER_OID {
                    Err(anyhow!("invalid user OID {oid}"))
                } else {
                    Ok(UserOid(oid))
                }
            }
        }

        impl std::ops::AddAssign<u32> for UserOid {
            // Increment that, on u32 overflow, wraps back into the user
            // range rather than into the system-reserved range below
            // FIRST_USER_OID.
            fn add_assign(&mut self, rhs: u32) {
                let (res, overflow) = self.0.overflowing_add(rhs);
                self.0 = if overflow { FIRST_USER_OID + res } else { res };
            }
        }

        // More OIDs than fit in a u32 can never be satisfied.
        if amount > u32::MAX.into() {
            return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
        }

        // Gather every OID currently in use in this transaction so the scan
        // below can skip them.
        let mut allocated_oids = HashSet::with_capacity(
            self.databases.len()
                + self.schemas.len()
                + self.roles.len()
                + self.items.len()
                + self.introspection_sources.len()
                + temporary_oids.len(),
        );
        self.databases.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.schemas.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.roles.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.items.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });
        self.introspection_sources.for_values(|_, value| {
            allocated_oids.insert(value.oid);
        });

        let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);

        // Resume the circular scan from the persisted allocator position.
        let start_oid: u32 = self
            .id_allocator
            .items()
            .get(&IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            })
            .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
            .next_id
            .try_into()
            .expect("we should never persist an oid outside of the u32 range");
        let mut current_oid = UserOid::new(start_oid)
            .expect("we should never persist an oid outside of user OID range");
        let mut oids = Vec::new();
        while oids.len() < u64_to_usize(amount) {
            if !is_allocated(current_oid.0) {
                oids.push(current_oid.0);
            }
            current_oid += 1;

            // Arriving back at `start_oid` without having collected enough
            // OIDs means the entire user OID space is in use.
            if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
                return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
            }
        }

        // Persist the new scan position; the previous persisted value must
        // still be the one we read above.
        let next_id = current_oid.0;
        let prev = self.id_allocator.set(
            IdAllocKey {
                name: OID_ALLOC_KEY.to_string(),
            },
            Some(IdAllocValue {
                next_id: next_id.into(),
            }),
            self.op_id,
        )?;
        assert_eq!(
            prev,
            Some(IdAllocValue {
                next_id: start_oid.into(),
            })
        );

        Ok(oids)
    }
1019
1020 pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1023 self.allocate_oids(1, temporary_oids)
1024 .map(|oids| oids.into_element())
1025 }
1026
    /// Returns a [`Snapshot`] built from each collection's
    /// `current_items_proto`, i.e. the catalog as it appears *within* this
    /// transaction, including staged, uncommitted changes.
    pub fn current_snapshot(&self) -> Snapshot {
        Snapshot {
            databases: self.databases.current_items_proto(),
            schemas: self.schemas.current_items_proto(),
            roles: self.roles.current_items_proto(),
            role_auth: self.role_auth.current_items_proto(),
            items: self.items.current_items_proto(),
            comments: self.comments.current_items_proto(),
            clusters: self.clusters.current_items_proto(),
            network_policies: self.network_policies.current_items_proto(),
            cluster_replicas: self.cluster_replicas.current_items_proto(),
            introspection_sources: self.introspection_sources.current_items_proto(),
            id_allocator: self.id_allocator.current_items_proto(),
            configs: self.configs.current_items_proto(),
            settings: self.settings.current_items_proto(),
            system_object_mappings: self.system_gid_mapping.current_items_proto(),
            system_configurations: self.system_configurations.current_items_proto(),
            default_privileges: self.default_privileges.current_items_proto(),
            source_references: self.source_references.current_items_proto(),
            system_privileges: self.system_privileges.current_items_proto(),
            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
        }
    }
1059
1060 pub(crate) fn insert_id_allocator(
1061 &mut self,
1062 name: String,
1063 next_id: u64,
1064 ) -> Result<(), CatalogError> {
1065 match self.id_allocator.insert(
1066 IdAllocKey { name: name.clone() },
1067 IdAllocValue { next_id },
1068 self.op_id,
1069 ) {
1070 Ok(_) => Ok(()),
1071 Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1072 }
1073 }
1074
1075 pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1082 let prev = self
1083 .databases
1084 .set(DatabaseKey { id: *id }, None, self.op_id)?;
1085 if prev.is_some() {
1086 Ok(())
1087 } else {
1088 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1089 }
1090 }
1091
1092 pub fn remove_databases(
1099 &mut self,
1100 databases: &BTreeSet<DatabaseId>,
1101 ) -> Result<(), CatalogError> {
1102 if databases.is_empty() {
1103 return Ok(());
1104 }
1105
1106 let to_remove = databases
1107 .iter()
1108 .map(|id| (DatabaseKey { id: *id }, None))
1109 .collect();
1110 let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1111 prev.retain(|_k, val| val.is_none());
1112
1113 if !prev.is_empty() {
1114 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1115 return Err(SqlCatalogError::UnknownDatabase(err).into());
1116 }
1117
1118 Ok(())
1119 }
1120
1121 pub fn remove_schema(
1128 &mut self,
1129 database_id: &Option<DatabaseId>,
1130 schema_id: &SchemaId,
1131 ) -> Result<(), CatalogError> {
1132 let prev = self
1133 .schemas
1134 .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1135 if prev.is_some() {
1136 Ok(())
1137 } else {
1138 let database_name = match database_id {
1139 Some(id) => format!("{id}."),
1140 None => "".to_string(),
1141 };
1142 Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1143 }
1144 }
1145
1146 pub fn remove_schemas(
1153 &mut self,
1154 schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1155 ) -> Result<(), CatalogError> {
1156 if schemas.is_empty() {
1157 return Ok(());
1158 }
1159
1160 let to_remove = schemas
1161 .iter()
1162 .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1163 .collect();
1164 let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1165 prev.retain(|_k, v| v.is_none());
1166
1167 if !prev.is_empty() {
1168 let err = prev
1169 .keys()
1170 .map(|k| {
1171 let db_spec = schemas.get(&k.id).expect("should_exist");
1172 let db_name = match db_spec {
1173 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1174 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1175 };
1176 format!("{}.{}", db_name, k.id)
1177 })
1178 .join(", ");
1179
1180 return Err(SqlCatalogError::UnknownSchema(err).into());
1181 }
1182
1183 Ok(())
1184 }
1185
1186 pub fn remove_source_references(
1187 &mut self,
1188 source_id: CatalogItemId,
1189 ) -> Result<(), CatalogError> {
1190 let deleted = self
1191 .source_references
1192 .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1193 .is_some();
1194 if deleted {
1195 Ok(())
1196 } else {
1197 Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1198 }
1199 }
1200
1201 pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1208 assert!(
1209 roles.iter().all(|id| id.is_user()),
1210 "cannot delete non-user roles"
1211 );
1212 self.remove_roles(roles)
1213 }
1214
    /// Removes `roles` and their associated authentication entries from the
    /// transaction.
    ///
    /// Returns an error naming all roles that did not exist, joined by `", "`.
    pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
        if roles.is_empty() {
            return Ok(());
        }

        let to_remove_keys = roles
            .iter()
            .map(|role_id| RoleKey { id: *role_id })
            .collect::<Vec<_>>();

        // Tombstone (set to `None`) every requested role...
        let to_remove_roles = to_remove_keys
            .iter()
            .map(|role_key| (role_key.clone(), None))
            .collect();

        let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;

        // ...and the corresponding auth entries keyed by the same role ids.
        let to_remove_role_auth = to_remove_keys
            .iter()
            .map(|role_key| {
                (
                    RoleAuthKey {
                        role_id: role_key.id,
                    },
                    None,
                )
            })
            .collect();

        let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;

        // Entries whose previous value is `None` did not exist; that is an
        // error for roles...
        prev.retain(|_k, v| v.is_none());
        if !prev.is_empty() {
            let err = prev.keys().map(|k| k.id.to_string()).join(", ");
            return Err(SqlCatalogError::UnknownRole(err).into());
        }

        // ...but a missing auth entry is deliberately not an error
        // (NOTE(review): presumably not every role has one — confirm).
        role_auth_prev.retain(|_k, v| v.is_none());
        Ok(())
    }
1264
1265 pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1272 if clusters.is_empty() {
1273 return Ok(());
1274 }
1275
1276 let to_remove = clusters
1277 .iter()
1278 .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1279 .collect();
1280 let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1281
1282 prev.retain(|_k, v| v.is_none());
1283 if !prev.is_empty() {
1284 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1285 return Err(SqlCatalogError::UnknownCluster(err).into());
1286 }
1287
1288 self.cluster_replicas
1294 .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1295 self.introspection_sources
1296 .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1297
1298 Ok(())
1299 }
1300
1301 pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1308 let deleted = self
1309 .cluster_replicas
1310 .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1311 .is_some();
1312 if deleted {
1313 Ok(())
1314 } else {
1315 Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1316 }
1317 }
1318
1319 pub fn remove_cluster_replicas(
1326 &mut self,
1327 replicas: &BTreeSet<ReplicaId>,
1328 ) -> Result<(), CatalogError> {
1329 if replicas.is_empty() {
1330 return Ok(());
1331 }
1332
1333 let to_remove = replicas
1334 .iter()
1335 .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1336 .collect();
1337 let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1338
1339 prev.retain(|_k, v| v.is_none());
1340 if !prev.is_empty() {
1341 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1342 return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1343 }
1344
1345 Ok(())
1346 }
1347
1348 pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1355 let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1356 if prev.is_some() {
1357 Ok(())
1358 } else {
1359 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1360 }
1361 }
1362
1363 pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1370 if ids.is_empty() {
1371 return Ok(());
1372 }
1373
1374 let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1375 let n = self.items.delete_by_keys(ks, self.op_id).len();
1376 if n == ids.len() {
1377 Ok(())
1378 } else {
1379 let item_ids = self.items.items().keys().map(|k| k.id).collect();
1380 let mut unknown = ids.difference(&item_ids);
1381 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1382 }
1383 }
1384
1385 pub fn remove_system_object_mappings(
1392 &mut self,
1393 descriptions: BTreeSet<SystemObjectDescription>,
1394 ) -> Result<(), CatalogError> {
1395 if descriptions.is_empty() {
1396 return Ok(());
1397 }
1398
1399 let ks: Vec<_> = descriptions
1400 .clone()
1401 .into_iter()
1402 .map(|desc| GidMappingKey {
1403 schema_name: desc.schema_name,
1404 object_type: desc.object_type,
1405 object_name: desc.object_name,
1406 })
1407 .collect();
1408 let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1409
1410 if n == descriptions.len() {
1411 Ok(())
1412 } else {
1413 let item_descriptions = self
1414 .system_gid_mapping
1415 .items()
1416 .keys()
1417 .map(|k| SystemObjectDescription {
1418 schema_name: k.schema_name.clone(),
1419 object_type: k.object_type.clone(),
1420 object_name: k.object_name.clone(),
1421 })
1422 .collect();
1423 let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1424 format!(
1425 "{} {}.{}",
1426 desc.object_type, desc.schema_name, desc.object_name
1427 )
1428 });
1429 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1430 }
1431 }
1432
1433 pub fn remove_introspection_source_indexes(
1440 &mut self,
1441 introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1442 ) -> Result<(), CatalogError> {
1443 if introspection_source_indexes.is_empty() {
1444 return Ok(());
1445 }
1446
1447 let ks: Vec<_> = introspection_source_indexes
1448 .clone()
1449 .into_iter()
1450 .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1451 .collect();
1452 let n = self
1453 .introspection_sources
1454 .delete_by_keys(ks, self.op_id)
1455 .len();
1456 if n == introspection_source_indexes.len() {
1457 Ok(())
1458 } else {
1459 let txn_indexes = self
1460 .introspection_sources
1461 .items()
1462 .keys()
1463 .map(|k| (k.cluster_id, k.name.clone()))
1464 .collect();
1465 let mut unknown = introspection_source_indexes
1466 .difference(&txn_indexes)
1467 .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1468 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1469 }
1470 }
1471
1472 pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1479 let updated =
1480 self.items
1481 .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1482 if updated {
1483 Ok(())
1484 } else {
1485 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1486 }
1487 }
1488
1489 pub fn update_items(
1497 &mut self,
1498 items: BTreeMap<CatalogItemId, Item>,
1499 ) -> Result<(), CatalogError> {
1500 if items.is_empty() {
1501 return Ok(());
1502 }
1503
1504 let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1505 let kvs: Vec<_> = items
1506 .clone()
1507 .into_iter()
1508 .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1509 .collect();
1510 let n = self.items.update_by_keys(kvs, self.op_id)?;
1511 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1512 if n == update_ids.len() {
1513 Ok(())
1514 } else {
1515 let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1516 let mut unknown = update_ids.difference(&item_ids);
1517 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1518 }
1519 }
1520
    /// Updates role `id` to `role`, applying `password` to the role's stored
    /// authentication entry.
    ///
    /// Returns an error if the role does not exist.
    pub fn update_role(
        &mut self,
        id: RoleId,
        role: Role,
        password: PasswordAction,
    ) -> Result<(), CatalogError> {
        let key = RoleKey { id };
        if self.roles.get(&key).is_some() {
            let auth_key = RoleAuthKey { role_id: id };

            match password {
                PasswordAction::Set(new_password) => {
                    // Only the SCRAM hash is persisted, never the plaintext.
                    let hash = mz_auth::hash::scram256_hash(
                        &new_password.password,
                        &new_password.scram_iterations,
                    )
                    .expect("password hash should be valid");
                    let value = RoleAuthValue {
                        password_hash: Some(hash),
                        updated_at: SYSTEM_TIME(),
                    };

                    // Upsert: the role may not have an auth entry yet.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    } else {
                        self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::Clear => {
                    let value = RoleAuthValue {
                        password_hash: None,
                        updated_at: SYSTEM_TIME(),
                    };
                    // Clearing the password of a role without an auth entry is
                    // a no-op rather than an error.
                    if self.role_auth.get(&auth_key).is_some() {
                        self.role_auth
                            .update_by_key(auth_key.clone(), value, self.op_id)?;
                    }
                }
                PasswordAction::NoChange => {}
            }

            self.roles
                .update_by_key(key, role.into_key_value().1, self.op_id)?;

            Ok(())
        } else {
            Err(SqlCatalogError::UnknownRole(id.to_string()).into())
        }
    }
1578
1579 pub fn update_roles_without_auth(
1590 &mut self,
1591 roles: BTreeMap<RoleId, Role>,
1592 ) -> Result<(), CatalogError> {
1593 if roles.is_empty() {
1594 return Ok(());
1595 }
1596
1597 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1598 let kvs: Vec<_> = roles
1599 .into_iter()
1600 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1601 .collect();
1602 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1603 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1604
1605 if n == update_role_ids.len() {
1606 Ok(())
1607 } else {
1608 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1609 let mut unknown = update_role_ids.difference(&role_ids);
1610 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1611 }
1612 }
1613
1614 pub fn update_system_object_mappings(
1619 &mut self,
1620 mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1621 ) -> Result<(), CatalogError> {
1622 if mappings.is_empty() {
1623 return Ok(());
1624 }
1625
1626 let n = self.system_gid_mapping.update(
1627 |_k, v| {
1628 if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1629 let (_, new_value) = mapping.clone().into_key_value();
1630 Some(new_value)
1631 } else {
1632 None
1633 }
1634 },
1635 self.op_id,
1636 )?;
1637
1638 if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1639 != mappings.len()
1640 {
1641 let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1642 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1643 }
1644
1645 Ok(())
1646 }
1647
1648 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1655 let updated = self.clusters.update_by_key(
1656 ClusterKey { id },
1657 cluster.into_key_value().1,
1658 self.op_id,
1659 )?;
1660 if updated {
1661 Ok(())
1662 } else {
1663 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1664 }
1665 }
1666
1667 pub fn update_cluster_replica(
1674 &mut self,
1675 replica_id: ReplicaId,
1676 replica: ClusterReplica,
1677 ) -> Result<(), CatalogError> {
1678 let updated = self.cluster_replicas.update_by_key(
1679 ClusterReplicaKey { id: replica_id },
1680 replica.into_key_value().1,
1681 self.op_id,
1682 )?;
1683 if updated {
1684 Ok(())
1685 } else {
1686 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1687 }
1688 }
1689
1690 pub fn update_database(
1697 &mut self,
1698 id: DatabaseId,
1699 database: Database,
1700 ) -> Result<(), CatalogError> {
1701 let updated = self.databases.update_by_key(
1702 DatabaseKey { id },
1703 database.into_key_value().1,
1704 self.op_id,
1705 )?;
1706 if updated {
1707 Ok(())
1708 } else {
1709 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1710 }
1711 }
1712
1713 pub fn update_schema(
1720 &mut self,
1721 schema_id: SchemaId,
1722 schema: Schema,
1723 ) -> Result<(), CatalogError> {
1724 let updated = self.schemas.update_by_key(
1725 SchemaKey { id: schema_id },
1726 schema.into_key_value().1,
1727 self.op_id,
1728 )?;
1729 if updated {
1730 Ok(())
1731 } else {
1732 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1733 }
1734 }
1735
1736 pub fn update_network_policy(
1743 &mut self,
1744 id: NetworkPolicyId,
1745 network_policy: NetworkPolicy,
1746 ) -> Result<(), CatalogError> {
1747 let updated = self.network_policies.update_by_key(
1748 NetworkPolicyKey { id },
1749 network_policy.into_key_value().1,
1750 self.op_id,
1751 )?;
1752 if updated {
1753 Ok(())
1754 } else {
1755 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1756 }
1757 }
1758 pub fn remove_network_policies(
1765 &mut self,
1766 network_policies: &BTreeSet<NetworkPolicyId>,
1767 ) -> Result<(), CatalogError> {
1768 if network_policies.is_empty() {
1769 return Ok(());
1770 }
1771
1772 let to_remove = network_policies
1773 .iter()
1774 .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1775 .collect();
1776 let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1777 assert!(
1778 prev.iter().all(|(k, _)| k.id.is_user()),
1779 "cannot delete non-user network policy"
1780 );
1781
1782 prev.retain(|_k, v| v.is_none());
1783 if !prev.is_empty() {
1784 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1785 return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1786 }
1787
1788 Ok(())
1789 }
1790 pub fn set_default_privilege(
1794 &mut self,
1795 role_id: RoleId,
1796 database_id: Option<DatabaseId>,
1797 schema_id: Option<SchemaId>,
1798 object_type: ObjectType,
1799 grantee: RoleId,
1800 privileges: Option<AclMode>,
1801 ) -> Result<(), CatalogError> {
1802 self.default_privileges.set(
1803 DefaultPrivilegesKey {
1804 role_id,
1805 database_id,
1806 schema_id,
1807 object_type,
1808 grantee,
1809 },
1810 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1811 self.op_id,
1812 )?;
1813 Ok(())
1814 }
1815
1816 pub fn set_default_privileges(
1818 &mut self,
1819 default_privileges: Vec<DefaultPrivilege>,
1820 ) -> Result<(), CatalogError> {
1821 if default_privileges.is_empty() {
1822 return Ok(());
1823 }
1824
1825 let default_privileges = default_privileges
1826 .into_iter()
1827 .map(DurableType::into_key_value)
1828 .map(|(k, v)| (k, Some(v)))
1829 .collect();
1830 self.default_privileges
1831 .set_many(default_privileges, self.op_id)?;
1832 Ok(())
1833 }
1834
1835 pub fn set_system_privilege(
1839 &mut self,
1840 grantee: RoleId,
1841 grantor: RoleId,
1842 acl_mode: Option<AclMode>,
1843 ) -> Result<(), CatalogError> {
1844 self.system_privileges.set(
1845 SystemPrivilegesKey { grantee, grantor },
1846 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1847 self.op_id,
1848 )?;
1849 Ok(())
1850 }
1851
1852 pub fn set_system_privileges(
1854 &mut self,
1855 system_privileges: Vec<MzAclItem>,
1856 ) -> Result<(), CatalogError> {
1857 if system_privileges.is_empty() {
1858 return Ok(());
1859 }
1860
1861 let system_privileges = system_privileges
1862 .into_iter()
1863 .map(DurableType::into_key_value)
1864 .map(|(k, v)| (k, Some(v)))
1865 .collect();
1866 self.system_privileges
1867 .set_many(system_privileges, self.op_id)?;
1868 Ok(())
1869 }
1870
1871 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1873 self.settings.set(
1874 SettingKey { name },
1875 value.map(|value| SettingValue { value }),
1876 self.op_id,
1877 )?;
1878 Ok(())
1879 }
1880
    /// Persists `version` as the catalog content version setting.
    pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
        self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
    }
1884
1885 pub fn insert_introspection_source_indexes(
1887 &mut self,
1888 introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1889 temporary_oids: &HashSet<u32>,
1890 ) -> Result<(), CatalogError> {
1891 if introspection_source_indexes.is_empty() {
1892 return Ok(());
1893 }
1894
1895 let amount = usize_to_u64(introspection_source_indexes.len());
1896 let oids = self.allocate_oids(amount, temporary_oids)?;
1897 let introspection_source_indexes: Vec<_> = introspection_source_indexes
1898 .into_iter()
1899 .zip_eq(oids)
1900 .map(
1901 |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1902 cluster_id,
1903 name,
1904 item_id,
1905 index_id,
1906 oid,
1907 },
1908 )
1909 .collect();
1910
1911 for introspection_source_index in introspection_source_indexes {
1912 let (key, value) = introspection_source_index.into_key_value();
1913 self.introspection_sources.insert(key, value, self.op_id)?;
1914 }
1915
1916 Ok(())
1917 }
1918
1919 pub fn set_system_object_mappings(
1921 &mut self,
1922 mappings: Vec<SystemObjectMapping>,
1923 ) -> Result<(), CatalogError> {
1924 if mappings.is_empty() {
1925 return Ok(());
1926 }
1927
1928 let mappings = mappings
1929 .into_iter()
1930 .map(DurableType::into_key_value)
1931 .map(|(k, v)| (k, Some(v)))
1932 .collect();
1933 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1934 Ok(())
1935 }
1936
1937 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1939 if replicas.is_empty() {
1940 return Ok(());
1941 }
1942
1943 let replicas = replicas
1944 .into_iter()
1945 .map(DurableType::into_key_value)
1946 .map(|(k, v)| (k, Some(v)))
1947 .collect();
1948 self.cluster_replicas.set_many(replicas, self.op_id)?;
1949 Ok(())
1950 }
1951
1952 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1954 match value {
1955 Some(value) => {
1956 let config = Config { key, value };
1957 let (key, value) = config.into_key_value();
1958 self.configs.set(key, Some(value), self.op_id)?;
1959 }
1960 None => {
1961 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1962 }
1963 }
1964 Ok(())
1965 }
1966
1967 pub fn get_config(&self, key: String) -> Option<u64> {
1969 self.configs
1970 .get(&ConfigKey { key })
1971 .map(|entry| entry.value)
1972 }
1973
1974 pub fn get_setting(&self, name: String) -> Option<&str> {
1976 self.settings
1977 .get(&SettingKey { name })
1978 .map(|entry| &*entry.value)
1979 }
1980
    /// Returns the persisted builtin-migration shard id, if one has been set.
    ///
    /// Panics if the stored setting is not a parseable `ShardId`.
    pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
        self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
    }
1985
    /// Persists `shard_id` as the builtin-migration shard setting.
    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
        self.set_setting(
            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
            Some(shard_id.to_string()),
        )
    }
1992
    /// Returns the persisted expression-cache shard id, if one has been set.
    ///
    /// Panics if the stored setting is not a parseable `ShardId`.
    pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
        self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
            .map(|shard_id| shard_id.parse().expect("valid ShardId"))
    }
1997
    /// Persists `shard_id` as the expression-cache shard setting.
    pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
        self.set_setting(
            EXPRESSION_CACHE_SHARD_KEY.to_string(),
            Some(shard_id.to_string()),
        )
    }
2004
2005 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
2011 self.set_config(
2012 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
2013 Some(
2014 value
2015 .as_millis()
2016 .try_into()
2017 .expect("max wait fits into u64"),
2018 ),
2019 )
2020 }
2021
2022 pub fn set_0dt_deployment_ddl_check_interval(
2029 &mut self,
2030 value: Duration,
2031 ) -> Result<(), CatalogError> {
2032 self.set_config(
2033 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2034 Some(
2035 value
2036 .as_millis()
2037 .try_into()
2038 .expect("ddl check interval fits into u64"),
2039 ),
2040 )
2041 }
2042
    /// Persists whether a 0dt deployment should panic after its timeout
    /// expires, stored as `0`/`1`.
    pub fn set_enable_0dt_deployment_panic_after_timeout(
        &mut self,
        value: bool,
    ) -> Result<(), CatalogError> {
        self.set_config(
            ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
            Some(u64::from(value)),
        )
    }
2057
    /// Removes any persisted 0dt deployment max-wait override.
    pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
        self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
    }
2066
    /// Removes any persisted 0dt deployment DDL check interval override.
    pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
        self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
    }
2076
    /// Removes any persisted panic-after-timeout override for 0dt
    /// deployments.
    pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
        self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
    }
2086
    /// Records that system configuration parameters have been synced at least
    /// once, stored as the flag value `1`.
    pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
        self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
    }
2091
2092 pub fn update_comment(
2093 &mut self,
2094 object_id: CommentObjectId,
2095 sub_component: Option<usize>,
2096 comment: Option<String>,
2097 ) -> Result<(), CatalogError> {
2098 let key = CommentKey {
2099 object_id,
2100 sub_component,
2101 };
2102 let value = comment.map(|c| CommentValue { comment: c });
2103 self.comments.set(key, value, self.op_id)?;
2104
2105 Ok(())
2106 }
2107
2108 pub fn drop_comments(
2109 &mut self,
2110 object_ids: &BTreeSet<CommentObjectId>,
2111 ) -> Result<(), CatalogError> {
2112 if object_ids.is_empty() {
2113 return Ok(());
2114 }
2115
2116 self.comments
2117 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2118 Ok(())
2119 }
2120
2121 pub fn update_source_references(
2122 &mut self,
2123 source_id: CatalogItemId,
2124 references: Vec<SourceReference>,
2125 updated_at: u64,
2126 ) -> Result<(), CatalogError> {
2127 let key = SourceReferencesKey { source_id };
2128 let value = SourceReferencesValue {
2129 references,
2130 updated_at,
2131 };
2132 self.source_references.set(key, Some(value), self.op_id)?;
2133 Ok(())
2134 }
2135
2136 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2138 let key = ServerConfigurationKey {
2139 name: name.to_string(),
2140 };
2141 let value = ServerConfigurationValue { value };
2142 self.system_configurations
2143 .set(key, Some(value), self.op_id)?;
2144 Ok(())
2145 }
2146
2147 pub fn remove_system_config(&mut self, name: &str) {
2149 let key = ServerConfigurationKey {
2150 name: name.to_string(),
2151 };
2152 self.system_configurations
2153 .set(key, None, self.op_id)
2154 .expect("cannot have uniqueness violation");
2155 }
2156
    /// Deletes every system configuration parameter.
    pub fn clear_system_configs(&mut self) {
        self.system_configurations.delete(|_k, _v| true, self.op_id);
    }
2161
2162 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2163 match self.configs.insert(
2164 ConfigKey { key: key.clone() },
2165 ConfigValue { value },
2166 self.op_id,
2167 ) {
2168 Ok(_) => Ok(()),
2169 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2170 }
2171 }
2172
2173 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2174 self.clusters
2175 .items()
2176 .into_iter()
2177 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2178 }
2179
2180 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2181 self.cluster_replicas
2182 .items()
2183 .into_iter()
2184 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2185 }
2186
2187 pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2188 self.databases
2189 .items()
2190 .into_iter()
2191 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2192 }
2193
2194 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2195 self.roles
2196 .items()
2197 .into_iter()
2198 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2199 }
2200
2201 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2202 self.network_policies
2203 .items()
2204 .into_iter()
2205 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2206 }
2207
2208 pub fn get_system_object_mappings(
2209 &self,
2210 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2211 self.system_gid_mapping
2212 .items()
2213 .into_iter()
2214 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2215 }
2216
2217 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2218 self.schemas
2219 .items()
2220 .into_iter()
2221 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2222 }
2223
2224 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2225 self.system_configurations
2226 .items()
2227 .into_iter()
2228 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2229 }
2230
2231 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2232 let key = SchemaKey { id: *id };
2233 self.schemas
2234 .get(&key)
2235 .map(|v| DurableType::from_key_value(key, v.clone()))
2236 }
2237
2238 pub fn get_introspection_source_indexes(
2239 &self,
2240 cluster_id: ClusterId,
2241 ) -> BTreeMap<&str, (GlobalId, u32)> {
2242 self.introspection_sources
2243 .items()
2244 .into_iter()
2245 .filter(|(k, _v)| k.cluster_id == cluster_id)
2246 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2247 .collect()
2248 }
2249
2250 pub fn get_catalog_content_version(&self) -> Option<&str> {
2251 self.settings
2252 .get(&SettingKey {
2253 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2254 })
2255 .map(|value| &*value.value)
2256 }
2257
2258 pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2259 self.settings
2260 .get(&SettingKey {
2261 name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2262 })
2263 .map(|value| value.value.clone())
2264 }
2265
    /// Returns all state updates produced by the current operation and then
    /// commits that operation by advancing the op id.
    #[must_use]
    pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
        let updates = self.get_op_updates();
        self.commit_op();
        updates
    }
2277
    /// Collects the `StateUpdate`s produced by the current (uncommitted)
    /// operation, i.e. all pending changes stamped with `self.op_id`.
    fn get_op_updates(&self) -> Vec<StateUpdate> {
        /// Extracts this op's pending `(kind, diff)` pairs from a
        /// table-backed collection.
        fn get_collection_op_updates<'a, T>(
            table_txn: &'a TableTransaction<T::Key, T::Value>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T::Value: Ord + Clone + Debug,
            T: DurableType,
        {
            table_txn
                .pending
                .iter()
                .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
                .filter_map(move |(k, v)| {
                    // Only surface changes made by the current op.
                    if v.ts == op {
                        let key = k.clone();
                        let value = v.value.clone();
                        let diff = v.diff.clone().try_into().expect("invalid diff");
                        let update = DurableType::from_key_value(key, value);
                        let kind = kind_fn(update);
                        Some((kind, diff))
                    } else {
                        None
                    }
                })
        }

        /// Same as `get_collection_op_updates`, but for append-only
        /// collections stored as flat `(key, diff, ts)` triples with unit
        /// values.
        fn get_large_collection_op_updates<'a, T>(
            collection: &'a Vec<(T::Key, Diff, Timestamp)>,
            kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
            op: Timestamp,
        ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
        where
            T::Key: Ord + Eq + Clone + Debug,
            T: DurableType<Value = ()>,
        {
            collection.iter().filter_map(move |(k, diff, ts)| {
                if *ts == op {
                    let key = k.clone();
                    let diff = diff.clone().try_into().expect("invalid diff");
                    let update = DurableType::from_key_value(key, ());
                    let kind = kind_fn(update);
                    Some((kind, diff))
                } else {
                    None
                }
            })
        }

        // Exhaustively destructure `self` so that adding a new collection to
        // `Transaction` is a compile error here until it is handled below.
        let Transaction {
            durable_catalog: _,
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            network_policies,
            cluster_replicas,
            introspection_sources,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            source_references,
            system_privileges,
            audit_log_updates,
            storage_collection_metadata,
            unfinalized_shards,
            id_allocator: _,
            configs: _,
            settings: _,
            txn_wal_shard: _,
            upper,
            op_id: _,
        } = &self;

        let updates = std::iter::empty()
            .chain(get_collection_op_updates(
                roles,
                StateUpdateKind::Role,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                role_auth,
                StateUpdateKind::RoleAuth,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                databases,
                StateUpdateKind::Database,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                schemas,
                StateUpdateKind::Schema,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                default_privileges,
                StateUpdateKind::DefaultPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_privileges,
                StateUpdateKind::SystemPrivilege,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_configurations,
                StateUpdateKind::SystemConfiguration,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                clusters,
                StateUpdateKind::Cluster,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                network_policies,
                StateUpdateKind::NetworkPolicy,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                introspection_sources,
                StateUpdateKind::IntrospectionSourceIndex,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                cluster_replicas,
                StateUpdateKind::ClusterReplica,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                system_gid_mapping,
                StateUpdateKind::SystemObjectMapping,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                items,
                StateUpdateKind::Item,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                comments,
                StateUpdateKind::Comment,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                source_references,
                StateUpdateKind::SourceReferences,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                storage_collection_metadata,
                StateUpdateKind::StorageCollectionMetadata,
                self.op_id,
            ))
            .chain(get_collection_op_updates(
                unfinalized_shards,
                StateUpdateKind::UnfinalizedShard,
                self.op_id,
            ))
            .chain(get_large_collection_op_updates(
                audit_log_updates,
                StateUpdateKind::AuditLog,
                self.op_id,
            ))
            // Every update is anchored at the transaction's `upper` timestamp.
            .map(|(kind, diff)| StateUpdate {
                kind,
                ts: upper.clone(),
                diff,
            })
            .collect();

        updates
    }
2458
    /// Reports whether the underlying durable catalog is in savepoint mode.
    pub fn is_savepoint(&self) -> bool {
        self.durable_catalog.is_savepoint()
    }
2462
    /// Commits the current operation by advancing to the next op id; later
    /// changes in this transaction are stamped with the new id.
    fn commit_op(&mut self) {
        self.op_id += 1;
    }
2466
    /// Returns the current operation id within this transaction.
    pub fn op_id(&self) -> Timestamp {
        self.op_id
    }
2470
    /// Returns the transaction's `upper` timestamp, the one pending updates
    /// are stamped with.
    pub fn upper(&self) -> mz_repr::Timestamp {
        self.upper
    }
2474
2475 pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2476 let audit_log_updates = self
2477 .audit_log_updates
2478 .into_iter()
2479 .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2480 .collect();
2481
2482 let txn_batch = TransactionBatch {
2483 databases: self.databases.pending(),
2484 schemas: self.schemas.pending(),
2485 items: self.items.pending(),
2486 comments: self.comments.pending(),
2487 roles: self.roles.pending(),
2488 role_auth: self.role_auth.pending(),
2489 clusters: self.clusters.pending(),
2490 cluster_replicas: self.cluster_replicas.pending(),
2491 network_policies: self.network_policies.pending(),
2492 introspection_sources: self.introspection_sources.pending(),
2493 id_allocator: self.id_allocator.pending(),
2494 configs: self.configs.pending(),
2495 source_references: self.source_references.pending(),
2496 settings: self.settings.pending(),
2497 system_gid_mapping: self.system_gid_mapping.pending(),
2498 system_configurations: self.system_configurations.pending(),
2499 default_privileges: self.default_privileges.pending(),
2500 system_privileges: self.system_privileges.pending(),
2501 storage_collection_metadata: self.storage_collection_metadata.pending(),
2502 unfinalized_shards: self.unfinalized_shards.pending(),
2503 txn_wal_shard: self.txn_wal_shard.pending(),
2504 audit_log_updates,
2505 upper: self.upper,
2506 };
2507 (txn_batch, self.durable_catalog)
2508 }
2509
2510 #[mz_ore::instrument(level = "debug")]
2522 pub(crate) async fn commit_internal(
2523 self,
2524 commit_ts: mz_repr::Timestamp,
2525 ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2526 let (mut txn_batch, durable_catalog) = self.into_parts();
2527 let TransactionBatch {
2528 databases,
2529 schemas,
2530 items,
2531 comments,
2532 roles,
2533 role_auth,
2534 clusters,
2535 cluster_replicas,
2536 network_policies,
2537 introspection_sources,
2538 id_allocator,
2539 configs,
2540 source_references,
2541 settings,
2542 system_gid_mapping,
2543 system_configurations,
2544 default_privileges,
2545 system_privileges,
2546 storage_collection_metadata,
2547 unfinalized_shards,
2548 txn_wal_shard,
2549 audit_log_updates,
2550 upper: _,
2551 } = &mut txn_batch;
2552 differential_dataflow::consolidation::consolidate_updates(databases);
2555 differential_dataflow::consolidation::consolidate_updates(schemas);
2556 differential_dataflow::consolidation::consolidate_updates(items);
2557 differential_dataflow::consolidation::consolidate_updates(comments);
2558 differential_dataflow::consolidation::consolidate_updates(roles);
2559 differential_dataflow::consolidation::consolidate_updates(role_auth);
2560 differential_dataflow::consolidation::consolidate_updates(clusters);
2561 differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2562 differential_dataflow::consolidation::consolidate_updates(network_policies);
2563 differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2564 differential_dataflow::consolidation::consolidate_updates(id_allocator);
2565 differential_dataflow::consolidation::consolidate_updates(configs);
2566 differential_dataflow::consolidation::consolidate_updates(settings);
2567 differential_dataflow::consolidation::consolidate_updates(source_references);
2568 differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2569 differential_dataflow::consolidation::consolidate_updates(system_configurations);
2570 differential_dataflow::consolidation::consolidate_updates(default_privileges);
2571 differential_dataflow::consolidation::consolidate_updates(system_privileges);
2572 differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2573 differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2574 differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2575 differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2576
2577 let upper = durable_catalog
2578 .commit_transaction(txn_batch, commit_ts)
2579 .await?;
2580 Ok((durable_catalog, upper))
2581 }
2582
    /// Commits and consumes the transaction, then syncs the durable state up
    /// to the resulting upper.
    ///
    /// Panics if the caller has not consumed all op updates before
    /// committing; leftover updates indicate a logic error.
    #[mz_ore::instrument(level = "debug")]
    pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
        let op_updates = self.get_op_updates();
        assert!(
            op_updates.is_empty(),
            "unconsumed transaction updates: {op_updates:?}"
        );

        let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
        let updates = durable_storage.sync_updates(upper).await?;
        // After a read-write commit, every synced update should carry the
        // commit timestamp; anything else means writes landed outside this
        // transaction. The check is skipped for read-only catalogs.
        soft_assert_no_log!(
            durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
            "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
        );
        Ok(())
    }
2620}
2621
2622use crate::durable::async_trait;
2623
2624use super::objects::{RoleAuthKey, RoleAuthValue};
2625
2626#[async_trait]
2627impl StorageTxn<mz_repr::Timestamp> for Transaction<'_> {
2628 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2629 self.storage_collection_metadata
2630 .items()
2631 .into_iter()
2632 .map(
2633 |(
2634 StorageCollectionMetadataKey { id },
2635 StorageCollectionMetadataValue { shard },
2636 )| { (*id, shard.clone()) },
2637 )
2638 .collect()
2639 }
2640
2641 fn insert_collection_metadata(
2642 &mut self,
2643 metadata: BTreeMap<GlobalId, ShardId>,
2644 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2645 for (id, shard) in metadata {
2646 self.storage_collection_metadata
2647 .insert(
2648 StorageCollectionMetadataKey { id },
2649 StorageCollectionMetadataValue {
2650 shard: shard.clone(),
2651 },
2652 self.op_id,
2653 )
2654 .map_err(|err| match err {
2655 DurableCatalogError::DuplicateKey => {
2656 StorageError::CollectionMetadataAlreadyExists(id)
2657 }
2658 DurableCatalogError::UniquenessViolation => {
2659 StorageError::PersistShardAlreadyInUse(shard)
2660 }
2661 err => StorageError::Generic(anyhow::anyhow!(err)),
2662 })?;
2663 }
2664 Ok(())
2665 }
2666
2667 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2668 let ks: Vec<_> = ids
2669 .into_iter()
2670 .map(|id| StorageCollectionMetadataKey { id })
2671 .collect();
2672 self.storage_collection_metadata
2673 .delete_by_keys(ks, self.op_id)
2674 .into_iter()
2675 .map(
2676 |(
2677 StorageCollectionMetadataKey { id },
2678 StorageCollectionMetadataValue { shard },
2679 )| (id, shard),
2680 )
2681 .collect()
2682 }
2683
2684 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2685 self.unfinalized_shards
2686 .items()
2687 .into_iter()
2688 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2689 .collect()
2690 }
2691
2692 fn insert_unfinalized_shards(
2693 &mut self,
2694 s: BTreeSet<ShardId>,
2695 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2696 for shard in s {
2697 match self
2698 .unfinalized_shards
2699 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2700 {
2701 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2703 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2704 };
2705 }
2706 Ok(())
2707 }
2708
2709 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2710 let ks: Vec<_> = shards
2711 .into_iter()
2712 .map(|shard| UnfinalizedShardKey { shard })
2713 .collect();
2714 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2715 }
2716
2717 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2718 self.txn_wal_shard
2719 .values()
2720 .iter()
2721 .next()
2722 .map(|TxnWalShardValue { shard }| *shard)
2723 }
2724
2725 fn write_txn_wal_shard(
2726 &mut self,
2727 shard: ShardId,
2728 ) -> Result<(), StorageError<mz_repr::Timestamp>> {
2729 self.txn_wal_shard
2730 .insert((), TxnWalShardValue { shard }, self.op_id)
2731 .map_err(|err| match err {
2732 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2733 err => StorageError::Generic(anyhow::anyhow!(err)),
2734 })
2735 }
2736}
2737
/// The output of [`Transaction::into_parts`]: every staged catalog change,
/// expressed as `(key, value, diff)` triples in their protobuf encodings,
/// ready to be durably committed.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    // Audit log entries are append-only; only the key carries data.
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// The upper frontier of the transaction that produced this batch.
    pub(crate) upper: mz_repr::Timestamp,
}
2790
impl TransactionBatch {
    /// Reports whether this batch stages no changes at all.
    ///
    /// `self` is destructured exhaustively (rather than checking fields ad
    /// hoc) so that adding a new field to `TransactionBatch` without
    /// updating this method becomes a compile error.
    pub fn is_empty(&self) -> bool {
        let TransactionBatch {
            databases,
            schemas,
            items,
            comments,
            roles,
            role_auth,
            clusters,
            cluster_replicas,
            network_policies,
            introspection_sources,
            id_allocator,
            configs,
            settings,
            source_references,
            system_gid_mapping,
            system_configurations,
            default_privileges,
            system_privileges,
            storage_collection_metadata,
            unfinalized_shards,
            txn_wal_shard,
            audit_log_updates,
            // `upper` always exists; it does not represent a staged change.
            upper: _,
        } = self;
        databases.is_empty()
            && schemas.is_empty()
            && items.is_empty()
            && comments.is_empty()
            && roles.is_empty()
            && role_auth.is_empty()
            && clusters.is_empty()
            && cluster_replicas.is_empty()
            && network_policies.is_empty()
            && introspection_sources.is_empty()
            && id_allocator.is_empty()
            && configs.is_empty()
            && settings.is_empty()
            && source_references.is_empty()
            && system_gid_mapping.is_empty()
            && system_configurations.is_empty()
            && default_privileges.is_empty()
            && system_privileges.is_empty()
            && storage_collection_metadata.is_empty()
            && unfinalized_shards.is_empty()
            && txn_wal_shard.is_empty()
            && audit_log_updates.is_empty()
    }
}
2842
/// A single change to a value staged within a transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value being inserted or retracted.
    value: V,
    /// The intra-transaction timestamp (op id) at which the change was staged.
    ts: Timestamp,
    /// The change's diff; call sites in this file only stage `+1` and `-1`.
    diff: Diff,
}
2849
/// Helper trait that lets uniqueness verification index values by a unique
/// `name` field instead of falling back to a quadratic pairwise scan.
trait UniqueName {
    /// Whether `unique_name` returns a meaningful, uniquely-identifying name.
    const HAS_UNIQUE_NAME: bool;
    /// The value's unique name, or `""` when `HAS_UNIQUE_NAME` is false.
    fn unique_name(&self) -> &str;
}
2858
mod unique_name {
    use crate::durable::objects::*;

    /// Implements `UniqueName` for value types that expose a `name` field
    /// used as their uniqueness key.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    /// Implements `UniqueName` for value types without a name-based
    /// uniqueness key; `unique_name` returns the empty string and must not
    /// be relied upon.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                        ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    #[cfg(test)]
    mod test {
        // `String` values are used in unit tests of `TableTransaction`.
        impl_no_unique_name!(String,);
    }
}
2920
/// An in-memory overlay of staged changes on top of a snapshot of a single
/// durable catalog collection.
#[derive(Debug)]
struct TableTransaction<K, V> {
    /// The contents of the collection when the transaction began.
    initial: BTreeMap<K, V>,
    /// Staged updates per key; each key's updates are kept in timestamp
    /// (op id) order.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    /// Optional predicate returning true when two values may not coexist in
    /// the collection (a uniqueness violation).
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
2938
impl<K, V> TableTransaction<K, V>
where
    K: Ord + Eq + Clone + Debug,
    V: Ord + Clone + Debug + UniqueName,
{
    /// Creates a transaction over a proto-encoded snapshot, with no
    /// uniqueness constraint on values.
    ///
    /// Returns an error if any snapshot entry fails proto decoding.
    fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: None,
        })
    }

    /// Like [`Self::new`], but additionally enforces `uniqueness_violation`
    /// across the collection's values on every mutation.
    fn new_with_uniqueness_fn<KP, VP>(
        initial: BTreeMap<KP, VP>,
        uniqueness_violation: fn(a: &V, b: &V) -> bool,
    ) -> Result<Self, TryFromProtoError>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        let initial = initial
            .into_iter()
            .map(RustType::from_proto)
            .collect::<Result<_, _>>()?;

        Ok(Self {
            initial,
            pending: BTreeMap::new(),
            uniqueness_violation: Some(uniqueness_violation),
        })
    }

    /// Consumes the transaction, returning its staged updates as
    /// proto-encoded `(key, value, diff)` triples, consolidated per key so
    /// that self-cancelling updates disappear.
    fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
    where
        K: RustType<KP>,
        V: RustType<VP>,
    {
        soft_assert_no_log!(self.verify().is_ok());
        self.pending
            .into_iter()
            .flat_map(|(k, v)| {
                // Consolidate each key's updates independently; the key is
                // constant within the group so only (value, diff) matters.
                let mut v: Vec<_> = v
                    .into_iter()
                    .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
                    .collect();
                differential_dataflow::consolidation::consolidate(&mut v);
                v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
            })
            .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
            .collect()
    }

    /// Checks the uniqueness invariant over the collection's current
    /// (initial + pending) values.
    fn verify(&self) -> Result<(), DurableCatalogError> {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            let items = self.values();
            if V::HAS_UNIQUE_NAME {
                // Values keyed by a unique name only need to be compared
                // against the (at most one) other value sharing that name.
                // If several values share a name, only the last survives in
                // the index, but any earlier duplicate will find it there.
                let by_name: BTreeMap<_, _> = items
                    .iter()
                    .enumerate()
                    // `v` is the enumeration index, `vi` the value.
                    .map(|(v, vi)| (vi.unique_name(), (v, vi)))
                    .collect();
                for (i, vi) in items.iter().enumerate() {
                    if let Some((j, vj)) = by_name.get(vi.unique_name()) {
                        if i != *j && uniqueness_violation(vi, *vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            } else {
                // No usable name: fall back to the quadratic pairwise scan.
                for (i, vi) in items.iter().enumerate() {
                    for (j, vj) in items.iter().enumerate() {
                        if i != j && uniqueness_violation(vi, vj) {
                            return Err(DurableCatalogError::UniquenessViolation);
                        }
                    }
                }
            }
        }
        // Each key's staged updates must be in non-decreasing timestamp
        // order; the mutation helpers only ever append at the current op id.
        soft_assert_no_log!(
            self.pending
                .values()
                .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
            "pending should be sorted by timestamp: {:?}",
            self.pending
        );
        Ok(())
    }

    /// Like [`Self::verify`], but only compares the values at the given
    /// `keys` against all items, which is cheaper when only a few keys
    /// changed.
    fn verify_keys<'a>(
        &self,
        keys: impl IntoIterator<Item = &'a K>,
    ) -> Result<(), DurableCatalogError>
    where
        K: 'a,
    {
        if let Some(uniqueness_violation) = self.uniqueness_violation {
            // Keys with no current value need no checking.
            let entries: Vec<_> = keys
                .into_iter()
                .filter_map(|key| self.get(key).map(|value| (key, value)))
                .collect();
            for (ki, vi) in self.items() {
                for (kj, vj) in &entries {
                    if ki != *kj && uniqueness_violation(vi, vj) {
                        return Err(DurableCatalogError::UniquenessViolation);
                    }
                }
            }
        }
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Invokes `f` on every (key, value) in the collection's current logical
    /// state: keys touched by pending updates are resolved via [`Self::get`],
    /// then untouched initial keys are visited as-is.
    fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
        let mut seen = BTreeSet::new();
        for k in self.pending.keys() {
            seen.insert(k);
            let v = self.get(k);
            // A key whose pending updates net to a deletion has no current
            // value and is skipped.
            if let Some(v) = v {
                f(k, v);
            }
        }
        for (k, v) in self.initial.iter() {
            if !seen.contains(k) {
                f(k, v);
            }
        }
    }

    /// Returns the current value for `k`, resolving the initial snapshot
    /// value (diff +1) together with any pending diffs for the key.
    fn get(&self, k: &K) -> Option<&V> {
        let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
        let mut updates = Vec::with_capacity(pending.len() + 1);
        if let Some(initial) = self.initial.get(k) {
            updates.push((initial, Diff::ONE));
        }
        updates.extend(
            pending
                .into_iter()
                .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
        );

        differential_dataflow::consolidation::consolidate(&mut updates);
        // A key resolves to at most one live value; more indicates a broken
        // invariant in the staged updates.
        assert!(updates.len() <= 1);
        updates.into_iter().next().map(|(v, _)| v)
    }

    /// Returns the current logical state as an owned map (test helper).
    #[cfg(test)]
    fn items_cloned(&self) -> BTreeMap<K, V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.clone(), v.clone());
        });
        items
    }

    /// Returns the current logical state as proto-encoded key/value pairs.
    fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
    where
        K: RustType<KP>,
        V: RustType<VP>,
        KP: Ord,
    {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k.into_proto(), v.into_proto());
        });
        items
    }

    /// Returns the current logical state as a map of borrowed key/value
    /// pairs.
    fn items(&self) -> BTreeMap<&K, &V> {
        let mut items = BTreeMap::new();
        self.for_values(|k, v| {
            items.insert(k, v);
        });
        items
    }

    /// Returns the set of current values (borrowed).
    fn values(&self) -> BTreeSet<&V> {
        let mut items = BTreeSet::new();
        self.for_values(|_, v| {
            items.insert(v);
        });
        items
    }

    /// Returns the number of entries in the current logical state.
    fn len(&self) -> usize {
        let mut count = 0;
        self.for_values(|_, _| {
            count += 1;
        });
        count
    }

    /// Like [`Self::for_values`], but collects updates produced by `f` into
    /// a scratch map and appends them to `pending` afterwards, since
    /// `pending` cannot be mutated while it is being iterated.
    fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
        &mut self,
        mut f: F,
    ) {
        let mut pending = BTreeMap::new();
        self.for_values(|k, v| f(&mut pending, k, v));
        for (k, updates) in pending {
            self.pending.entry(k).or_default().extend(updates);
        }
    }

    /// Stages an insertion of `(k, v)` at timestamp `ts`.
    ///
    /// Fails without staging anything if `k` already has a value or if `v`
    /// violates uniqueness against any current value.
    fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
        let mut violation = None;
        self.for_values(|for_k, for_v| {
            if &k == for_k {
                violation = Some(DurableCatalogError::DuplicateKey);
            }
            if let Some(uniqueness_violation) = self.uniqueness_violation {
                if uniqueness_violation(for_v, &v) {
                    violation = Some(DurableCatalogError::UniquenessViolation);
                }
            }
        });
        if let Some(violation) = violation {
            return Err(violation);
        }
        self.pending.entry(k).or_default().push(TransactionUpdate {
            value: v,
            ts,
            diff: Diff::ONE,
        });
        soft_assert_no_log!(self.verify().is_ok());
        Ok(())
    }

    /// Applies `f` to every current entry, staging a retraction of the old
    /// value and an insertion of the replacement (at `ts`) whenever `f`
    /// returns `Some`. Returns the number of entries changed.
    ///
    /// On a uniqueness violation, all staged updates from this call are
    /// rolled back by restoring the pre-call `pending` map.
    fn update<F: Fn(&K, &V) -> Option<V>>(
        &mut self,
        f: F,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let mut changed = Diff::ZERO;
        let mut keys = BTreeSet::new();
        // Snapshot for rollback if verification fails below.
        let pending = self.pending.clone();
        self.for_values_mut(|p, k, v| {
            if let Some(next) = f(k, v) {
                changed += Diff::ONE;
                keys.insert(k.clone());
                let updates = p.entry(k.clone()).or_default();
                updates.push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                updates.push(TransactionUpdate {
                    value: next,
                    ts,
                    diff: Diff::ONE,
                });
            }
        });
        if let Err(err) = self.verify_keys(&keys) {
            self.pending = pending;
            Err(err)
        } else {
            Ok(changed)
        }
    }

    /// Stages an update of key `k` to value `v` at `ts`. Returns whether the
    /// key existed; when the value is already equal, nothing is staged.
    fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
        if let Some(cur_v) = self.get(&k) {
            if v != *cur_v {
                self.set(k, Some(v), ts)?;
            }
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Stages updates for each `(k, v)` whose key currently exists; keys
    /// without a current value are skipped. Returns the number of keys that
    /// existed (no-op updates, where the value was already equal, are
    /// counted but not staged).
    fn update_by_keys(
        &mut self,
        kvs: impl IntoIterator<Item = (K, V)>,
        ts: Timestamp,
    ) -> Result<Diff, DurableCatalogError> {
        let kvs: Vec<_> = kvs
            .into_iter()
            .filter_map(|(k, v)| match self.get(&k) {
                // Record whether this is a no-op (value unchanged).
                Some(cur_v) => Some((*cur_v == v, k, v)),
                None => None,
            })
            .collect();
        let changed = kvs.len();
        let changed =
            Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
        let kvs = kvs
            .into_iter()
            .filter(|(no_op, _, _)| !no_op)
            .map(|(_, k, v)| (k, Some(v)))
            .collect();
        self.set_many(kvs, ts)?;
        Ok(changed)
    }

    /// Stages `k` to be `v` (`None` deletes the key) at `ts`, returning the
    /// previous value.
    ///
    /// On a uniqueness violation, the staged updates for `k` are rolled
    /// back by truncating the key's pending vector to its prior length.
    fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
        let prev = self.get(&k).cloned();
        let entry = self.pending.entry(k.clone()).or_default();
        let restore_len = entry.len();

        match (v, prev.clone()) {
            // Replace: retract the old value, insert the new one.
            (Some(v), Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            // Fresh insert.
            (Some(v), None) => {
                entry.push(TransactionUpdate {
                    value: v,
                    ts,
                    diff: Diff::ONE,
                });
            }
            // Delete.
            (None, Some(prev)) => {
                entry.push(TransactionUpdate {
                    value: prev,
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
            // Deleting an absent key is a no-op.
            (None, None) => {}
        }

        if let Err(err) = self.verify_keys([&k]) {
            let pending = self.pending.get_mut(&k).expect("inserted above");
            pending.truncate(restore_len);
            Err(err)
        } else {
            Ok(prev)
        }
    }

    /// Batched [`Self::set`]: stages every entry of `kvs` at `ts`, returning
    /// the previous value per key. On a uniqueness violation, every key's
    /// staged updates are rolled back.
    fn set_many(
        &mut self,
        kvs: BTreeMap<K, Option<V>>,
        ts: Timestamp,
    ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
        if kvs.is_empty() {
            return Ok(BTreeMap::new());
        }

        let mut prevs = BTreeMap::new();
        // Pre-staging lengths of each key's pending vector, for rollback.
        let mut restores = BTreeMap::new();

        for (k, v) in kvs {
            let prev = self.get(&k).cloned();
            let entry = self.pending.entry(k.clone()).or_default();
            restores.insert(k.clone(), entry.len());

            match (v, prev.clone()) {
                // Replace: retract the old value, insert the new one.
                (Some(v), Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                // Fresh insert.
                (Some(v), None) => {
                    entry.push(TransactionUpdate {
                        value: v,
                        ts,
                        diff: Diff::ONE,
                    });
                }
                // Delete.
                (None, Some(prev)) => {
                    entry.push(TransactionUpdate {
                        value: prev,
                        ts,
                        diff: Diff::MINUS_ONE,
                    });
                }
                // Deleting an absent key is a no-op.
                (None, None) => {}
            }

            prevs.insert(k, prev);
        }

        if let Err(err) = self.verify_keys(prevs.keys()) {
            for (k, restore_len) in restores {
                let pending = self.pending.get_mut(&k).expect("inserted above");
                pending.truncate(restore_len);
            }
            Err(err)
        } else {
            Ok(prevs)
        }
    }

    /// Stages a deletion (at `ts`) of every entry for which `f` returns
    /// true, returning the deleted entries.
    fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
        let mut deleted = Vec::new();
        self.for_values_mut(|p, k, v| {
            if f(k, v) {
                deleted.push((k.clone(), v.clone()));
                p.entry(k.clone()).or_default().push(TransactionUpdate {
                    value: v.clone(),
                    ts,
                    diff: Diff::MINUS_ONE,
                });
            }
        });
        soft_assert_no_log!(self.verify().is_ok());
        deleted
    }

    /// Stages a deletion of key `k` at `ts`, returning the previous value if
    /// the key existed.
    fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
        self.set(k, None, ts)
            .expect("deleting an entry cannot violate uniqueness")
    }

    /// Stages deletions of the given keys at `ts`, returning the entries
    /// that actually existed.
    fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
        let kvs = ks.into_iter().map(|k| (k, None)).collect();
        let prevs = self
            .set_many(kvs, ts)
            .expect("deleting entries cannot violate uniqueness");
        prevs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }
}
3472
3473#[cfg(test)]
3474#[allow(clippy::unwrap_used)]
3475mod tests {
3476 use super::*;
3477
3478 use mz_ore::now::SYSTEM_TIME;
3479 use mz_ore::{assert_none, assert_ok};
3480 use mz_persist_client::cache::PersistClientCache;
3481 use mz_persist_types::PersistLocation;
3482 use semver::Version;
3483
3484 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3485 use crate::memory;
3486
3487 #[mz_ore::test]
3488 fn test_table_transaction_simple() {
3489 fn uniqueness_violation(a: &String, b: &String) -> bool {
3490 a == b
3491 }
3492 let mut table = TableTransaction::new_with_uniqueness_fn(
3493 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3494 uniqueness_violation,
3495 )
3496 .unwrap();
3497
3498 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3501 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3502 assert!(
3503 table
3504 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3505 .is_err()
3506 );
3507 assert!(
3508 table
3509 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3510 .is_err()
3511 );
3512 }
3513
3514 #[mz_ore::test]
3515 fn test_table_transaction() {
3516 fn uniqueness_violation(a: &String, b: &String) -> bool {
3517 a == b
3518 }
3519 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3520
3521 fn commit(
3522 table: &mut BTreeMap<Vec<u8>, String>,
3523 mut pending: Vec<(Vec<u8>, String, Diff)>,
3524 ) {
3525 pending.sort_by(|a, b| a.2.cmp(&b.2));
3527 for (k, v, diff) in pending {
3528 if diff == Diff::MINUS_ONE {
3529 let prev = table.remove(&k);
3530 assert_eq!(prev, Some(v));
3531 } else if diff == Diff::ONE {
3532 let prev = table.insert(k, v);
3533 assert_eq!(prev, None);
3534 } else {
3535 panic!("unexpected diff: {diff}");
3536 }
3537 }
3538 }
3539
3540 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3541 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3542 let mut table_txn =
3543 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3544 assert_eq!(table_txn.items_cloned(), table);
3545 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3546 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3547 assert_eq!(
3548 table_txn.items_cloned(),
3549 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3550 );
3551 assert_eq!(
3552 table_txn
3553 .update(|_k, _v| Some("v3".to_string()), 2)
3554 .unwrap(),
3555 Diff::ONE
3556 );
3557
3558 table_txn
3560 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3561 .unwrap_err();
3562
3563 table_txn
3564 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3565 .unwrap();
3566 assert_eq!(
3567 table_txn.items_cloned(),
3568 BTreeMap::from([
3569 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3570 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3571 ])
3572 );
3573 let err = table_txn
3574 .update(|_k, _v| Some("v1".to_string()), 5)
3575 .unwrap_err();
3576 assert!(
3577 matches!(err, DurableCatalogError::UniquenessViolation),
3578 "unexpected err: {err:?}"
3579 );
3580 let pending = table_txn.pending();
3581 assert_eq!(
3582 pending,
3583 vec![
3584 (
3585 1i64.to_le_bytes().to_vec(),
3586 "v1".to_string(),
3587 Diff::MINUS_ONE
3588 ),
3589 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3590 (
3591 2i64.to_le_bytes().to_vec(),
3592 "v2".to_string(),
3593 Diff::MINUS_ONE
3594 ),
3595 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3596 ]
3597 );
3598 commit(&mut table, pending);
3599 assert_eq!(
3600 table,
3601 BTreeMap::from([
3602 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3603 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3604 ])
3605 );
3606
3607 let mut table_txn =
3608 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3609 assert_eq!(
3611 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3612 1
3613 );
3614 table_txn
3615 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3616 .unwrap();
3617 table_txn
3619 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3620 .unwrap_err();
3621 table_txn
3623 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3624 .unwrap_err();
3625 assert_eq!(
3626 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3627 1
3628 );
3629 table_txn
3631 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3632 .unwrap();
3633 table_txn
3634 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3635 .unwrap();
3636 let pending = table_txn.pending();
3637 assert_eq!(
3638 pending,
3639 vec![
3640 (
3641 1i64.to_le_bytes().to_vec(),
3642 "v3".to_string(),
3643 Diff::MINUS_ONE
3644 ),
3645 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3646 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3647 ]
3648 );
3649 commit(&mut table, pending);
3650 assert_eq!(
3651 table,
3652 BTreeMap::from([
3653 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3654 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3655 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3656 ])
3657 );
3658
3659 let mut table_txn =
3660 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3661 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3662 table_txn
3663 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3664 .unwrap();
3665
3666 commit(&mut table, table_txn.pending());
3667 assert_eq!(
3668 table,
3669 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3670 );
3671
3672 let mut table_txn =
3673 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3674 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3675 table_txn
3676 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3677 .unwrap();
3678 commit(&mut table, table_txn.pending());
3679 assert_eq!(
3680 table,
3681 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3682 );
3683
3684 let mut table_txn =
3686 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3687 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3688 table_txn
3689 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3690 .unwrap();
3691 table_txn
3692 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3693 .unwrap_err();
3694 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3695 table_txn
3696 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3697 .unwrap();
3698 commit(&mut table, table_txn.pending());
3699 assert_eq!(
3700 table.clone().into_iter().collect::<Vec<_>>(),
3701 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3702 );
3703
3704 let mut table_txn =
3706 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3707 table_txn
3709 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3710 .unwrap_err();
3711 table_txn
3712 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3713 .unwrap();
3714 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3715 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3716 let pending = table_txn.pending();
3717 assert_eq!(
3718 pending,
3719 vec![
3720 (
3721 1i64.to_le_bytes().to_vec(),
3722 "v5".to_string(),
3723 Diff::MINUS_ONE
3724 ),
3725 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3726 ]
3727 );
3728 commit(&mut table, pending);
3729 assert_eq!(
3730 table,
3731 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3732 );
3733
3734 let mut table_txn =
3736 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3737 table_txn
3738 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3739 .unwrap();
3740 let pending = table_txn.pending::<Vec<u8>, String>();
3741 assert!(pending.is_empty());
3742
3743 let mut table_txn =
3745 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3746 table_txn
3748 .set_many(
3749 BTreeMap::from([
3750 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3751 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3752 ]),
3753 0,
3754 )
3755 .unwrap_err();
3756 table_txn
3757 .set_many(
3758 BTreeMap::from([
3759 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3760 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3761 ]),
3762 1,
3763 )
3764 .unwrap();
3765 table_txn
3766 .set_many(
3767 BTreeMap::from([
3768 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3769 (3i64.to_le_bytes().to_vec(), None),
3770 ]),
3771 2,
3772 )
3773 .unwrap();
3774 let pending = table_txn.pending();
3775 assert_eq!(
3776 pending,
3777 vec![
3778 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3779 (
3780 3i64.to_le_bytes().to_vec(),
3781 "v6".to_string(),
3782 Diff::MINUS_ONE
3783 ),
3784 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3785 ]
3786 );
3787 commit(&mut table, pending);
3788 assert_eq!(
3789 table,
3790 BTreeMap::from([
3791 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3792 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3793 ])
3794 );
3795
3796 let mut table_txn =
3798 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3799 table_txn
3800 .set_many(
3801 BTreeMap::from([
3802 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3803 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3804 ]),
3805 0,
3806 )
3807 .unwrap();
3808 let pending = table_txn.pending::<Vec<u8>, String>();
3809 assert!(pending.is_empty());
3810 commit(&mut table, pending);
3811 assert_eq!(
3812 table,
3813 BTreeMap::from([
3814 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3815 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3816 ])
3817 );
3818
3819 let mut table_txn =
3821 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3822 table_txn
3824 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3825 .unwrap_err();
3826 assert!(
3827 table_txn
3828 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3829 .unwrap()
3830 );
3831 assert!(
3832 !table_txn
3833 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3834 .unwrap()
3835 );
3836 let pending = table_txn.pending();
3837 assert_eq!(
3838 pending,
3839 vec![
3840 (
3841 1i64.to_le_bytes().to_vec(),
3842 "v6".to_string(),
3843 Diff::MINUS_ONE
3844 ),
3845 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3846 ]
3847 );
3848 commit(&mut table, pending);
3849 assert_eq!(
3850 table,
3851 BTreeMap::from([
3852 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3853 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3854 ])
3855 );
3856
3857 let mut table_txn =
3859 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3860 assert!(
3861 table_txn
3862 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3863 .unwrap()
3864 );
3865 let pending = table_txn.pending::<Vec<u8>, String>();
3866 assert!(pending.is_empty());
3867 commit(&mut table, pending);
3868 assert_eq!(
3869 table,
3870 BTreeMap::from([
3871 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3872 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3873 ])
3874 );
3875
3876 let mut table_txn =
3878 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3879 table_txn
3881 .update_by_keys(
3882 [
3883 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3884 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3885 ],
3886 0,
3887 )
3888 .unwrap_err();
3889 let n = table_txn
3890 .update_by_keys(
3891 [
3892 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3893 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3894 ],
3895 1,
3896 )
3897 .unwrap();
3898 assert_eq!(n, Diff::ONE);
3899 let n = table_txn
3900 .update_by_keys(
3901 [
3902 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3903 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3904 ],
3905 2,
3906 )
3907 .unwrap();
3908 assert_eq!(n, Diff::ZERO);
3909 let pending = table_txn.pending();
3910 assert_eq!(
3911 pending,
3912 vec![
3913 (
3914 1i64.to_le_bytes().to_vec(),
3915 "v8".to_string(),
3916 Diff::MINUS_ONE
3917 ),
3918 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
3919 ]
3920 );
3921 commit(&mut table, pending);
3922 assert_eq!(
3923 table,
3924 BTreeMap::from([
3925 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3926 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3927 ])
3928 );
3929
3930 let mut table_txn =
3932 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3933 let n = table_txn
3934 .update_by_keys(
3935 [
3936 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3937 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
3938 ],
3939 0,
3940 )
3941 .unwrap();
3942 assert_eq!(n, Diff::from(2));
3943 let pending = table_txn.pending::<Vec<u8>, String>();
3944 assert!(pending.is_empty());
3945 commit(&mut table, pending);
3946 assert_eq!(
3947 table,
3948 BTreeMap::from([
3949 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3950 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3951 ])
3952 );
3953
3954 let mut table_txn =
3956 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3957 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
3958 assert_eq!(prev, Some("v9".to_string()));
3959 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
3960 assert_none!(prev);
3961 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
3962 assert_none!(prev);
3963 let pending = table_txn.pending();
3964 assert_eq!(
3965 pending,
3966 vec![(
3967 1i64.to_le_bytes().to_vec(),
3968 "v9".to_string(),
3969 Diff::MINUS_ONE
3970 ),]
3971 );
3972 commit(&mut table, pending);
3973 assert_eq!(
3974 table,
3975 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
3976 );
3977
3978 let mut table_txn =
3980 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3981 let prevs = table_txn.delete_by_keys(
3982 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3983 0,
3984 );
3985 assert_eq!(
3986 prevs,
3987 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
3988 );
3989 let prevs = table_txn.delete_by_keys(
3990 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3991 1,
3992 );
3993 assert_eq!(prevs, vec![]);
3994 let prevs = table_txn.delete_by_keys(
3995 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
3996 2,
3997 );
3998 assert_eq!(prevs, vec![]);
3999 let pending = table_txn.pending();
4000 assert_eq!(
4001 pending,
4002 vec![(
4003 42i64.to_le_bytes().to_vec(),
4004 "v7".to_string(),
4005 Diff::MINUS_ONE
4006 ),]
4007 );
4008 commit(&mut table, pending);
4009 assert_eq!(table, BTreeMap::new());
4010 }
4011
4012 #[mz_ore::test(tokio::test)]
4013 #[cfg_attr(miri, ignore)] async fn test_savepoint() {
4015 const VERSION: Version = Version::new(26, 0, 0);
4016 let mut persist_cache = PersistClientCache::new_no_metrics();
4017 persist_cache.cfg.build_version = VERSION;
4018 let persist_client = persist_cache
4019 .open(PersistLocation::new_in_mem())
4020 .await
4021 .unwrap();
4022 let state_builder = TestCatalogStateBuilder::new(persist_client)
4023 .with_default_deploy_generation()
4024 .with_version(VERSION);
4025
4026 let _ = state_builder
4028 .clone()
4029 .unwrap_build()
4030 .await
4031 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4032 .await
4033 .unwrap()
4034 .0;
4035 let mut savepoint_state = state_builder
4036 .unwrap_build()
4037 .await
4038 .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4039 .await
4040 .unwrap()
4041 .0;
4042
4043 let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4044 assert!(!initial_snapshot.is_empty());
4045
4046 let db_name = "db";
4047 let db_owner = RoleId::User(42);
4048 let db_privileges = Vec::new();
4049 let mut txn = savepoint_state.transaction().await.unwrap();
4050 let (db_id, db_oid) = txn
4051 .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4052 .unwrap();
4053 let commit_ts = txn.upper();
4054 txn.commit_internal(commit_ts).await.unwrap();
4055 let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4056 let update = updates.into_element();
4057
4058 assert_eq!(update.diff, StateDiff::Addition);
4059
4060 let db = match update.kind {
4061 memory::objects::StateUpdateKind::Database(db) => db,
4062 update => panic!("unexpected update: {update:?}"),
4063 };
4064
4065 assert_eq!(db_id, db.id);
4066 assert_eq!(db_oid, db.oid);
4067 assert_eq!(db_name, db.name);
4068 assert_eq!(db_owner, db.owner_id);
4069 assert_eq!(db_privileges, db.privileges);
4070 }
4071
4072 #[mz_ore::test]
4073 fn test_allocate_introspection_source_index_id() {
4074 let cluster_variant: u8 = 0b0000_0001;
4075 let cluster_id_inner: u64 =
4076 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4077 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4078
4079 let cluster_id = ClusterId::System(cluster_id_inner);
4080 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4081
4082 let introspection_source_index_id: u64 =
4083 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4084
4085 {
4087 let mut cluster_variant_mask = 0xFF << 56;
4088 cluster_variant_mask &= introspection_source_index_id;
4089 cluster_variant_mask >>= 56;
4090 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4091 }
4092
4093 {
4095 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4096 cluster_id_inner_mask &= introspection_source_index_id;
4097 cluster_id_inner_mask >>= 8;
4098 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4099 }
4100
4101 {
4103 let mut log_variant_mask = 0xFF;
4104 log_variant_mask &= introspection_source_index_id;
4105 assert_eq!(
4106 log_variant_mask,
4107 u64::from(timely_messages_received_log_variant)
4108 );
4109 }
4110
4111 let (catalog_item_id, global_id) =
4112 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4113
4114 assert_eq!(
4115 catalog_item_id,
4116 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4117 );
4118 assert_eq!(
4119 global_id,
4120 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4121 );
4122 }
4123}