1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34 CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35 RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51 AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52 ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53 ClusterReplicaValue, ClusterSystemConfiguration, ClusterSystemConfigurationKey,
54 ClusterSystemConfigurationValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey,
55 ConfigValue, Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey,
56 DefaultPrivilegesValue, DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
57 IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
58 ReplicaConfig, ReplicaSystemConfiguration, ReplicaSystemConfigurationKey,
59 ReplicaSystemConfigurationValue, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
60 ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
61 SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
62 StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
63 SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
64};
65use crate::durable::{
66 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
67 DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
68 EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
69 SCHEMA_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
70 SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
71 USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
72};
73use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
74
/// Timestamp type used within this module: it is the type of [`Transaction`]'s `op_id` field and
/// of the timestamp component of every pending audit log update.
type Timestamp = u64;
76
/// A set of pending catalog changes layered on top of a [`DurableCatalogState`].
///
/// Each catalog collection is held in its own `TableTransaction`; the methods on this type
/// mutate those tables and tag every mutation with the current `op_id`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    /// The durable catalog this transaction operates on. Excluded from the `Debug` output.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    /// Databases; `new` makes names unique across the table.
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    /// Schemas; `new` makes names unique per `database_id`.
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    /// Items; `new` installs a uniqueness check on schema, name and item type.
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    /// Roles; `new` makes names unique across the table.
    roles: TableTransaction<RoleKey, RoleValue>,
    /// Password hashes of roles, keyed by role ID.
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    /// Clusters; `new` makes names unique across the table.
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    /// Cluster replicas; `new` makes names unique per `cluster_id`.
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    /// Named ID allocators, each storing the next ID to hand out.
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    /// Populated from the snapshot's `system_object_mappings` collection.
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    cluster_system_configurations:
        TableTransaction<ClusterSystemConfigurationKey, ClusterSystemConfigurationValue>,
    replica_system_configurations:
        TableTransaction<ReplicaSystemConfigurationKey, ReplicaSystemConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    /// Network policies; `new` makes names unique across the table.
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    /// Pending audit log updates as `(key, diff, op_id)` triples. Kept as a plain vector rather
    /// than a `TableTransaction`.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// The timestamp handed to `new`.
    // NOTE(review): presumably the upper of the durable catalog at snapshot time — confirm.
    upper: mz_repr::Timestamp,
    /// ID of the current operation; starts at `0` and is attached to every table mutation.
    op_id: Timestamp,
}
120
121impl<'a> Transaction<'a> {
122 pub fn new(
123 durable_catalog: &'a mut dyn DurableCatalogState,
124 Snapshot {
125 databases,
126 schemas,
127 roles,
128 role_auth,
129 items,
130 comments,
131 clusters,
132 network_policies,
133 cluster_replicas,
134 introspection_sources,
135 id_allocator,
136 configs,
137 settings,
138 source_references,
139 system_object_mappings,
140 system_configurations,
141 cluster_system_configurations,
142 replica_system_configurations,
143 default_privileges,
144 system_privileges,
145 storage_collection_metadata,
146 unfinalized_shards,
147 txn_wal_shard,
148 }: Snapshot,
149 upper: mz_repr::Timestamp,
150 ) -> Result<Transaction<'a>, CatalogError> {
151 Ok(Transaction {
152 durable_catalog,
153 databases: TableTransaction::new_with_uniqueness_fn(
154 databases,
155 |a: &DatabaseValue, b| a.name == b.name,
156 )?,
157 schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
158 a.database_id == b.database_id && a.name == b.name
159 })?,
160 items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
161 a.schema_id == b.schema_id && a.name == b.name && {
162 let a_type = a.item_type();
164 let b_type = b.item_type();
165 (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
166 || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
167 || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
168 }
169 })?,
170 comments: TableTransaction::new(comments)?,
171 roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
172 a.name == b.name
173 })?,
174 role_auth: TableTransaction::new(role_auth)?,
175 clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
176 a.name == b.name
177 })?,
178 network_policies: TableTransaction::new_with_uniqueness_fn(
179 network_policies,
180 |a: &NetworkPolicyValue, b| a.name == b.name,
181 )?,
182 cluster_replicas: TableTransaction::new_with_uniqueness_fn(
183 cluster_replicas,
184 |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
185 )?,
186 introspection_sources: TableTransaction::new(introspection_sources)?,
187 id_allocator: TableTransaction::new(id_allocator)?,
188 configs: TableTransaction::new(configs)?,
189 settings: TableTransaction::new(settings)?,
190 source_references: TableTransaction::new(source_references)?,
191 system_gid_mapping: TableTransaction::new(system_object_mappings)?,
192 system_configurations: TableTransaction::new(system_configurations)?,
193 cluster_system_configurations: TableTransaction::new(cluster_system_configurations)?,
194 replica_system_configurations: TableTransaction::new(replica_system_configurations)?,
195 default_privileges: TableTransaction::new(default_privileges)?,
196 system_privileges: TableTransaction::new(system_privileges)?,
197 storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
198 unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
199 txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
203 audit_log_updates: Vec::new(),
204 upper,
205 op_id: 0,
206 })
207 }
208
209 pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
210 let key = ItemKey { id: *id };
211 self.items
212 .get(&key)
213 .map(|v| DurableType::from_key_value(key, v.clone()))
214 }
215
216 pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
217 self.items
218 .items()
219 .into_iter()
220 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
221 .sorted_by_key(|Item { id, .. }| *id)
222 }
223
224 pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
225 self.insert_audit_log_events([event]);
226 }
227
228 pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
229 let events = events
230 .into_iter()
231 .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
232 self.audit_log_updates.extend(events);
233 }
234
235 pub fn insert_user_database(
236 &mut self,
237 database_name: &str,
238 owner_id: RoleId,
239 privileges: Vec<MzAclItem>,
240 temporary_oids: &HashSet<u32>,
241 ) -> Result<(DatabaseId, u32), CatalogError> {
242 let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
243 let id = DatabaseId::User(id);
244 let oid = self.allocate_oid(temporary_oids)?;
245 self.insert_database(id, database_name, owner_id, privileges, oid)?;
246 Ok((id, oid))
247 }
248
249 pub(crate) fn insert_database(
250 &mut self,
251 id: DatabaseId,
252 database_name: &str,
253 owner_id: RoleId,
254 privileges: Vec<MzAclItem>,
255 oid: u32,
256 ) -> Result<u32, CatalogError> {
257 match self.databases.insert(
258 DatabaseKey { id },
259 DatabaseValue {
260 name: database_name.to_string(),
261 owner_id,
262 privileges,
263 oid,
264 },
265 self.op_id,
266 ) {
267 Ok(_) => Ok(oid),
268 Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
269 }
270 }
271
272 pub fn insert_user_schema(
273 &mut self,
274 database_id: DatabaseId,
275 schema_name: &str,
276 owner_id: RoleId,
277 privileges: Vec<MzAclItem>,
278 temporary_oids: &HashSet<u32>,
279 ) -> Result<(SchemaId, u32), CatalogError> {
280 let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
281 let id = SchemaId::User(id);
282 let oid = self.allocate_oid(temporary_oids)?;
283 self.insert_schema(
284 id,
285 Some(database_id),
286 schema_name.to_string(),
287 owner_id,
288 privileges,
289 oid,
290 )?;
291 Ok((id, oid))
292 }
293
294 pub fn insert_system_schema(
295 &mut self,
296 schema_id: u64,
297 schema_name: &str,
298 owner_id: RoleId,
299 privileges: Vec<MzAclItem>,
300 oid: u32,
301 ) -> Result<(), CatalogError> {
302 let id = SchemaId::System(schema_id);
303 self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
304 }
305
306 pub(crate) fn insert_schema(
307 &mut self,
308 schema_id: SchemaId,
309 database_id: Option<DatabaseId>,
310 schema_name: String,
311 owner_id: RoleId,
312 privileges: Vec<MzAclItem>,
313 oid: u32,
314 ) -> Result<(), CatalogError> {
315 match self.schemas.insert(
316 SchemaKey { id: schema_id },
317 SchemaValue {
318 database_id,
319 name: schema_name.clone(),
320 owner_id,
321 privileges,
322 oid,
323 },
324 self.op_id,
325 ) {
326 Ok(_) => Ok(()),
327 Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
328 }
329 }
330
331 pub fn insert_builtin_role(
332 &mut self,
333 id: RoleId,
334 name: String,
335 attributes: RoleAttributesRaw,
336 membership: RoleMembership,
337 vars: RoleVars,
338 oid: u32,
339 ) -> Result<RoleId, CatalogError> {
340 soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
341 self.insert_role(id, name, attributes, membership, vars, oid)?;
342 Ok(id)
343 }
344
345 pub fn insert_user_role(
346 &mut self,
347 name: String,
348 attributes: RoleAttributesRaw,
349 membership: RoleMembership,
350 vars: RoleVars,
351 temporary_oids: &HashSet<u32>,
352 ) -> Result<(RoleId, u32), CatalogError> {
353 let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
354 let id = RoleId::User(id);
355 let oid = self.allocate_oid(temporary_oids)?;
356 self.insert_role(id, name, attributes, membership, vars, oid)?;
357 Ok((id, oid))
358 }
359
360 fn insert_role(
361 &mut self,
362 id: RoleId,
363 name: String,
364 attributes: RoleAttributesRaw,
365 membership: RoleMembership,
366 vars: RoleVars,
367 oid: u32,
368 ) -> Result<(), CatalogError> {
369 if let Some(ref password) = attributes.password {
370 let hash = mz_auth::hash::scram256_hash(
371 password,
372 &attributes
373 .scram_iterations
374 .or_else(|| {
375 soft_panic_or_log!(
376 "Hash iterations must be set if a password is provided."
377 );
378 None
379 })
380 .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
383 )
384 .expect("password hash should be valid");
385 match self.role_auth.insert(
386 RoleAuthKey { role_id: id },
387 RoleAuthValue {
388 password_hash: Some(hash),
389 updated_at: SYSTEM_TIME(),
390 },
391 self.op_id,
392 ) {
393 Ok(_) => {}
394 Err(_) => {
395 return Err(SqlCatalogError::RoleAlreadyExists(name).into());
396 }
397 }
398 }
399
400 match self.roles.insert(
401 RoleKey { id },
402 RoleValue {
403 name: name.clone(),
404 attributes: attributes.into(),
405 membership,
406 vars,
407 oid,
408 },
409 self.op_id,
410 ) {
411 Ok(_) => Ok(()),
412 Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
413 }
414 }
415
416 pub fn insert_user_cluster(
418 &mut self,
419 cluster_id: ClusterId,
420 cluster_name: &str,
421 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
422 owner_id: RoleId,
423 privileges: Vec<MzAclItem>,
424 config: ClusterConfig,
425 temporary_oids: &HashSet<u32>,
426 ) -> Result<(), CatalogError> {
427 self.insert_cluster(
428 cluster_id,
429 cluster_name,
430 introspection_source_indexes,
431 owner_id,
432 privileges,
433 config,
434 temporary_oids,
435 )
436 }
437
438 pub fn insert_system_cluster(
440 &mut self,
441 cluster_name: &str,
442 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
443 privileges: Vec<MzAclItem>,
444 owner_id: RoleId,
445 config: ClusterConfig,
446 temporary_oids: &HashSet<u32>,
447 ) -> Result<ClusterId, CatalogError> {
448 let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
449 let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
450 self.insert_cluster(
451 cluster_id,
452 cluster_name,
453 introspection_source_indexes,
454 owner_id,
455 privileges,
456 config,
457 temporary_oids,
458 )?;
459 Ok(cluster_id)
460 }
461
462 fn insert_cluster(
463 &mut self,
464 cluster_id: ClusterId,
465 cluster_name: &str,
466 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
467 owner_id: RoleId,
468 privileges: Vec<MzAclItem>,
469 config: ClusterConfig,
470 temporary_oids: &HashSet<u32>,
471 ) -> Result<(), CatalogError> {
472 if let Err(_) = self.clusters.insert(
473 ClusterKey { id: cluster_id },
474 ClusterValue {
475 name: cluster_name.to_string(),
476 owner_id,
477 privileges,
478 config,
479 },
480 self.op_id,
481 ) {
482 return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
483 };
484
485 let amount = usize_to_u64(introspection_source_indexes.len());
486 let oids = self.allocate_oids(amount, temporary_oids)?;
487 let introspection_source_indexes: Vec<_> = introspection_source_indexes
488 .into_iter()
489 .zip_eq(oids)
490 .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
491 .collect();
492 for (builtin, item_id, index_id, oid) in introspection_source_indexes {
493 let introspection_source_index = IntrospectionSourceIndex {
494 cluster_id,
495 name: builtin.name.to_string(),
496 item_id,
497 index_id,
498 oid,
499 };
500 let (key, value) = introspection_source_index.into_key_value();
501 self.introspection_sources
502 .insert(key, value, self.op_id)
503 .expect("no uniqueness violation");
504 }
505
506 Ok(())
507 }
508
509 pub fn rename_cluster(
510 &mut self,
511 cluster_id: ClusterId,
512 cluster_name: &str,
513 cluster_to_name: &str,
514 ) -> Result<(), CatalogError> {
515 let key = ClusterKey { id: cluster_id };
516
517 match self.clusters.update(
518 |k, v| {
519 if *k == key {
520 let mut value = v.clone();
521 value.name = cluster_to_name.to_string();
522 Some(value)
523 } else {
524 None
525 }
526 },
527 self.op_id,
528 )? {
529 Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
530 Diff::ONE => Ok(()),
531 n => panic!(
532 "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
533 ),
534 }
535 }
536
537 pub fn rename_cluster_replica(
538 &mut self,
539 replica_id: ReplicaId,
540 replica_name: &QualifiedReplica,
541 replica_to_name: &str,
542 ) -> Result<(), CatalogError> {
543 let key = ClusterReplicaKey { id: replica_id };
544
545 match self.cluster_replicas.update(
546 |k, v| {
547 if *k == key {
548 let mut value = v.clone();
549 value.name = replica_to_name.to_string();
550 Some(value)
551 } else {
552 None
553 }
554 },
555 self.op_id,
556 )? {
557 Diff::ZERO => {
558 Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
559 }
560 Diff::ONE => Ok(()),
561 n => panic!(
562 "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
563 ),
564 }
565 }
566
567 pub fn insert_cluster_replica(
568 &mut self,
569 cluster_id: ClusterId,
570 replica_name: &str,
571 config: ReplicaConfig,
572 owner_id: RoleId,
573 ) -> Result<ReplicaId, CatalogError> {
574 let replica_id = match cluster_id {
575 ClusterId::System(_) => self.allocate_system_replica_id()?,
576 ClusterId::User(_) => self.allocate_user_replica_id()?,
577 };
578 self.insert_cluster_replica_with_id(
579 cluster_id,
580 replica_id,
581 replica_name,
582 config,
583 owner_id,
584 )?;
585 Ok(replica_id)
586 }
587
588 pub(crate) fn insert_cluster_replica_with_id(
589 &mut self,
590 cluster_id: ClusterId,
591 replica_id: ReplicaId,
592 replica_name: &str,
593 config: ReplicaConfig,
594 owner_id: RoleId,
595 ) -> Result<(), CatalogError> {
596 if let Err(_) = self.cluster_replicas.insert(
597 ClusterReplicaKey { id: replica_id },
598 ClusterReplicaValue {
599 cluster_id,
600 name: replica_name.into(),
601 config,
602 owner_id,
603 },
604 self.op_id,
605 ) {
606 let cluster = self
607 .clusters
608 .get(&ClusterKey { id: cluster_id })
609 .expect("cluster exists");
610 return Err(SqlCatalogError::DuplicateReplica(
611 replica_name.to_string(),
612 cluster.name.to_string(),
613 )
614 .into());
615 };
616 Ok(())
617 }
618
619 pub fn insert_user_network_policy(
620 &mut self,
621 name: String,
622 rules: Vec<NetworkPolicyRule>,
623 privileges: Vec<MzAclItem>,
624 owner_id: RoleId,
625 temporary_oids: &HashSet<u32>,
626 ) -> Result<NetworkPolicyId, CatalogError> {
627 let oid = self.allocate_oid(temporary_oids)?;
628 let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
629 let id = NetworkPolicyId::User(id);
630 self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
631 }
632
633 pub fn insert_network_policy(
634 &mut self,
635 id: NetworkPolicyId,
636 name: String,
637 rules: Vec<NetworkPolicyRule>,
638 privileges: Vec<MzAclItem>,
639 owner_id: RoleId,
640 oid: u32,
641 ) -> Result<NetworkPolicyId, CatalogError> {
642 match self.network_policies.insert(
643 NetworkPolicyKey { id },
644 NetworkPolicyValue {
645 name: name.clone(),
646 rules,
647 privileges,
648 owner_id,
649 oid,
650 },
651 self.op_id,
652 ) {
653 Ok(_) => Ok(id),
654 Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
655 }
656 }
657
658 pub fn update_introspection_source_index_gids(
663 &mut self,
664 mappings: impl Iterator<
665 Item = (
666 ClusterId,
667 impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
668 ),
669 >,
670 ) -> Result<(), CatalogError> {
671 for (cluster_id, updates) in mappings {
672 for (name, item_id, index_id, oid) in updates {
673 let introspection_source_index = IntrospectionSourceIndex {
674 cluster_id,
675 name,
676 item_id,
677 index_id,
678 oid,
679 };
680 let (key, value) = introspection_source_index.into_key_value();
681
682 let prev = self
683 .introspection_sources
684 .set(key, Some(value), self.op_id)?;
685 if prev.is_none() {
686 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
687 "{index_id}"
688 ))
689 .into());
690 }
691 }
692 }
693 Ok(())
694 }
695
696 pub fn insert_user_item(
697 &mut self,
698 id: CatalogItemId,
699 global_id: GlobalId,
700 schema_id: SchemaId,
701 item_name: &str,
702 create_sql: String,
703 owner_id: RoleId,
704 privileges: Vec<MzAclItem>,
705 temporary_oids: &HashSet<u32>,
706 versions: BTreeMap<RelationVersion, GlobalId>,
707 ) -> Result<u32, CatalogError> {
708 let oid = self.allocate_oid(temporary_oids)?;
709 self.insert_item(
710 id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
711 )?;
712 Ok(oid)
713 }
714
715 pub fn insert_item(
716 &mut self,
717 id: CatalogItemId,
718 oid: u32,
719 global_id: GlobalId,
720 schema_id: SchemaId,
721 item_name: &str,
722 create_sql: String,
723 owner_id: RoleId,
724 privileges: Vec<MzAclItem>,
725 extra_versions: BTreeMap<RelationVersion, GlobalId>,
726 ) -> Result<(), CatalogError> {
727 match self.items.insert(
728 ItemKey { id },
729 ItemValue {
730 schema_id,
731 name: item_name.to_string(),
732 create_sql,
733 owner_id,
734 privileges,
735 oid,
736 global_id,
737 extra_versions,
738 },
739 self.op_id,
740 ) {
741 Ok(_) => Ok(()),
742 Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
743 }
744 }
745
746 pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
747 Ok(self.get_and_increment_id_by(key, 1)?.into_element())
748 }
749
750 pub fn get_and_increment_id_by(
751 &mut self,
752 key: String,
753 amount: u64,
754 ) -> Result<Vec<u64>, CatalogError> {
755 assert!(
756 key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
757 "system item IDs cannot be allocated outside of bootstrap"
758 );
759
760 let current_id = self
761 .id_allocator
762 .items()
763 .get(&IdAllocKey { name: key.clone() })
764 .unwrap_or_else(|| panic!("{key} id allocator missing"))
765 .next_id;
766 let next_id = current_id
767 .checked_add(amount)
768 .ok_or(SqlCatalogError::IdExhaustion)?;
769 let prev = self.id_allocator.set(
770 IdAllocKey { name: key },
771 Some(IdAllocValue { next_id }),
772 self.op_id,
773 )?;
774 assert_eq!(
775 prev,
776 Some(IdAllocValue {
777 next_id: current_id
778 })
779 );
780 Ok((current_id..next_id).collect())
781 }
782
783 pub fn allocate_system_item_ids(
784 &mut self,
785 amount: u64,
786 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
787 assert!(
788 !self.durable_catalog.is_bootstrap_complete(),
789 "we can only allocate system item IDs during bootstrap"
790 );
791 Ok(self
792 .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
793 .into_iter()
794 .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
796 .collect())
797 }
798
799 pub fn allocate_introspection_source_index_id(
831 cluster_id: &ClusterId,
832 log_variant: LogVariant,
833 ) -> (CatalogItemId, GlobalId) {
834 let cluster_variant: u8 = match cluster_id {
835 ClusterId::System(_) => 1,
836 ClusterId::User(_) => 2,
837 };
838 let cluster_id: u64 = cluster_id.inner_id();
839 const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
840 assert_eq!(
841 CLUSTER_ID_MASK & cluster_id,
842 0,
843 "invalid cluster ID: {cluster_id}"
844 );
845 let log_variant: u8 = match log_variant {
846 LogVariant::Timely(TimelyLog::Operates) => 1,
847 LogVariant::Timely(TimelyLog::Channels) => 2,
848 LogVariant::Timely(TimelyLog::Elapsed) => 3,
849 LogVariant::Timely(TimelyLog::Histogram) => 4,
850 LogVariant::Timely(TimelyLog::Addresses) => 5,
851 LogVariant::Timely(TimelyLog::Parks) => 6,
852 LogVariant::Timely(TimelyLog::MessagesSent) => 7,
853 LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
854 LogVariant::Timely(TimelyLog::Reachability) => 9,
855 LogVariant::Timely(TimelyLog::BatchesSent) => 10,
856 LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
857 LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
858 LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
859 LogVariant::Differential(DifferentialLog::Sharing) => 14,
860 LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
861 LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
862 LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
863 LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
864 LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
865 LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
866 LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
867 LogVariant::Compute(ComputeLog::PeekDuration) => 22,
868 LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
869 LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
870 LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
871 LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
872 LogVariant::Compute(ComputeLog::ErrorCount) => 28,
873 LogVariant::Compute(ComputeLog::HydrationTime) => 29,
874 LogVariant::Compute(ComputeLog::LirMapping) => 30,
875 LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
876 LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
877 LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
878 };
879
880 let mut id: u64 = u64::from(cluster_variant) << 56;
881 id |= cluster_id << 8;
882 id |= u64::from(log_variant);
883
884 (
885 CatalogItemId::IntrospectionSourceIndex(id),
886 GlobalId::IntrospectionSourceIndex(id),
887 )
888 }
889
890 pub fn allocate_user_item_ids(
891 &mut self,
892 amount: u64,
893 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
894 Ok(self
895 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
896 .into_iter()
897 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
899 .collect())
900 }
901
902 pub fn allocate_user_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
903 let id = self.get_and_increment_id(USER_REPLICA_ID_ALLOC_KEY.to_string())?;
904 Ok(ReplicaId::User(id))
905 }
906
907 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
908 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
909 Ok(ReplicaId::System(id))
910 }
911
912 pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
913 self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
914 }
915
916 #[mz_ore::instrument]
919 fn allocate_oids(
920 &mut self,
921 amount: u64,
922 temporary_oids: &HashSet<u32>,
923 ) -> Result<Vec<u32>, CatalogError> {
924 struct UserOid(u32);
927
928 impl UserOid {
929 fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
930 if oid < FIRST_USER_OID {
931 Err(anyhow!("invalid user OID {oid}"))
932 } else {
933 Ok(UserOid(oid))
934 }
935 }
936 }
937
938 impl std::ops::AddAssign<u32> for UserOid {
939 fn add_assign(&mut self, rhs: u32) {
940 let (res, overflow) = self.0.overflowing_add(rhs);
941 self.0 = if overflow { FIRST_USER_OID + res } else { res };
942 }
943 }
944
945 if amount > u32::MAX.into() {
946 return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
947 }
948
949 let mut allocated_oids = HashSet::with_capacity(
955 self.databases.len()
956 + self.schemas.len()
957 + self.roles.len()
958 + self.items.len()
959 + self.introspection_sources.len()
960 + temporary_oids.len(),
961 );
962 self.databases.for_values(|_, value| {
963 allocated_oids.insert(value.oid);
964 });
965 self.schemas.for_values(|_, value| {
966 allocated_oids.insert(value.oid);
967 });
968 self.roles.for_values(|_, value| {
969 allocated_oids.insert(value.oid);
970 });
971 self.items.for_values(|_, value| {
972 allocated_oids.insert(value.oid);
973 });
974 self.introspection_sources.for_values(|_, value| {
975 allocated_oids.insert(value.oid);
976 });
977
978 let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);
979
980 let start_oid: u32 = self
981 .id_allocator
982 .items()
983 .get(&IdAllocKey {
984 name: OID_ALLOC_KEY.to_string(),
985 })
986 .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
987 .next_id
988 .try_into()
989 .expect("we should never persist an oid outside of the u32 range");
990 let mut current_oid = UserOid::new(start_oid)
991 .expect("we should never persist an oid outside of user OID range");
992 let mut oids = Vec::new();
993 while oids.len() < u64_to_usize(amount) {
994 if !is_allocated(current_oid.0) {
995 oids.push(current_oid.0);
996 }
997 current_oid += 1;
998
999 if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
1000 return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
1002 }
1003 }
1004
1005 let next_id = current_oid.0;
1006 let prev = self.id_allocator.set(
1007 IdAllocKey {
1008 name: OID_ALLOC_KEY.to_string(),
1009 },
1010 Some(IdAllocValue {
1011 next_id: next_id.into(),
1012 }),
1013 self.op_id,
1014 )?;
1015 assert_eq!(
1016 prev,
1017 Some(IdAllocValue {
1018 next_id: start_oid.into(),
1019 })
1020 );
1021
1022 Ok(oids)
1023 }
1024
1025 pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1028 self.allocate_oids(1, temporary_oids)
1029 .map(|oids| oids.into_element())
1030 }
1031
1032 pub fn current_snapshot(&self) -> Snapshot {
1040 Snapshot {
1041 databases: self.databases.current_items_proto(),
1042 schemas: self.schemas.current_items_proto(),
1043 roles: self.roles.current_items_proto(),
1044 role_auth: self.role_auth.current_items_proto(),
1045 items: self.items.current_items_proto(),
1046 comments: self.comments.current_items_proto(),
1047 clusters: self.clusters.current_items_proto(),
1048 network_policies: self.network_policies.current_items_proto(),
1049 cluster_replicas: self.cluster_replicas.current_items_proto(),
1050 introspection_sources: self.introspection_sources.current_items_proto(),
1051 id_allocator: self.id_allocator.current_items_proto(),
1052 configs: self.configs.current_items_proto(),
1053 settings: self.settings.current_items_proto(),
1054 system_object_mappings: self.system_gid_mapping.current_items_proto(),
1055 system_configurations: self.system_configurations.current_items_proto(),
1056 cluster_system_configurations: self.cluster_system_configurations.current_items_proto(),
1057 replica_system_configurations: self.replica_system_configurations.current_items_proto(),
1058 default_privileges: self.default_privileges.current_items_proto(),
1059 source_references: self.source_references.current_items_proto(),
1060 system_privileges: self.system_privileges.current_items_proto(),
1061 storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
1062 unfinalized_shards: self.unfinalized_shards.current_items_proto(),
1063 txn_wal_shard: self.txn_wal_shard.current_items_proto(),
1064 }
1065 }
1066
1067 pub(crate) fn insert_id_allocator(
1068 &mut self,
1069 name: String,
1070 next_id: u64,
1071 ) -> Result<(), CatalogError> {
1072 match self.id_allocator.insert(
1073 IdAllocKey { name: name.clone() },
1074 IdAllocValue { next_id },
1075 self.op_id,
1076 ) {
1077 Ok(_) => Ok(()),
1078 Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1079 }
1080 }
1081
1082 pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1089 let prev = self
1090 .databases
1091 .set(DatabaseKey { id: *id }, None, self.op_id)?;
1092 if prev.is_some() {
1093 Ok(())
1094 } else {
1095 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1096 }
1097 }
1098
1099 pub fn remove_databases(
1106 &mut self,
1107 databases: &BTreeSet<DatabaseId>,
1108 ) -> Result<(), CatalogError> {
1109 if databases.is_empty() {
1110 return Ok(());
1111 }
1112
1113 let to_remove = databases
1114 .iter()
1115 .map(|id| (DatabaseKey { id: *id }, None))
1116 .collect();
1117 let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1118 prev.retain(|_k, val| val.is_none());
1119
1120 if !prev.is_empty() {
1121 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1122 return Err(SqlCatalogError::UnknownDatabase(err).into());
1123 }
1124
1125 Ok(())
1126 }
1127
1128 pub fn remove_schema(
1135 &mut self,
1136 database_id: &Option<DatabaseId>,
1137 schema_id: &SchemaId,
1138 ) -> Result<(), CatalogError> {
1139 let prev = self
1140 .schemas
1141 .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1142 if prev.is_some() {
1143 Ok(())
1144 } else {
1145 let database_name = match database_id {
1146 Some(id) => format!("{id}."),
1147 None => "".to_string(),
1148 };
1149 Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1150 }
1151 }
1152
1153 pub fn remove_schemas(
1160 &mut self,
1161 schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1162 ) -> Result<(), CatalogError> {
1163 if schemas.is_empty() {
1164 return Ok(());
1165 }
1166
1167 let to_remove = schemas
1168 .iter()
1169 .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1170 .collect();
1171 let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1172 prev.retain(|_k, v| v.is_none());
1173
1174 if !prev.is_empty() {
1175 let err = prev
1176 .keys()
1177 .map(|k| {
1178 let db_spec = schemas.get(&k.id).expect("should_exist");
1179 let db_name = match db_spec {
1180 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1181 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1182 };
1183 format!("{}.{}", db_name, k.id)
1184 })
1185 .join(", ");
1186
1187 return Err(SqlCatalogError::UnknownSchema(err).into());
1188 }
1189
1190 Ok(())
1191 }
1192
1193 pub fn remove_source_references(
1194 &mut self,
1195 source_id: CatalogItemId,
1196 ) -> Result<(), CatalogError> {
1197 let deleted = self
1198 .source_references
1199 .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1200 .is_some();
1201 if deleted {
1202 Ok(())
1203 } else {
1204 Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1205 }
1206 }
1207
1208 pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1215 assert!(
1216 roles.iter().all(|id| id.is_user()),
1217 "cannot delete non-user roles"
1218 );
1219 self.remove_roles(roles)
1220 }
1221
1222 pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1229 if roles.is_empty() {
1230 return Ok(());
1231 }
1232
1233 let to_remove_keys = roles
1234 .iter()
1235 .map(|role_id| RoleKey { id: *role_id })
1236 .collect::<Vec<_>>();
1237
1238 let to_remove_roles = to_remove_keys
1239 .iter()
1240 .map(|role_key| (role_key.clone(), None))
1241 .collect();
1242
1243 let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1244
1245 let to_remove_role_auth = to_remove_keys
1246 .iter()
1247 .map(|role_key| {
1248 (
1249 RoleAuthKey {
1250 role_id: role_key.id,
1251 },
1252 None,
1253 )
1254 })
1255 .collect();
1256
1257 let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1258
1259 prev.retain(|_k, v| v.is_none());
1260 if !prev.is_empty() {
1261 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1262 return Err(SqlCatalogError::UnknownRole(err).into());
1263 }
1264
1265 role_auth_prev.retain(|_k, v| v.is_none());
1266 Ok(())
1270 }
1271
1272 pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1279 if clusters.is_empty() {
1280 return Ok(());
1281 }
1282
1283 let to_remove = clusters
1284 .iter()
1285 .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1286 .collect();
1287 let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1288
1289 prev.retain(|_k, v| v.is_none());
1290 if !prev.is_empty() {
1291 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1292 return Err(SqlCatalogError::UnknownCluster(err).into());
1293 }
1294
1295 self.cluster_replicas
1301 .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1302 self.introspection_sources
1303 .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1304
1305 Ok(())
1306 }
1307
1308 pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1315 let deleted = self
1316 .cluster_replicas
1317 .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1318 .is_some();
1319 if deleted {
1320 Ok(())
1321 } else {
1322 Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1323 }
1324 }
1325
1326 pub fn remove_cluster_replicas(
1333 &mut self,
1334 replicas: &BTreeSet<ReplicaId>,
1335 ) -> Result<(), CatalogError> {
1336 if replicas.is_empty() {
1337 return Ok(());
1338 }
1339
1340 let to_remove = replicas
1341 .iter()
1342 .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1343 .collect();
1344 let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1345
1346 prev.retain(|_k, v| v.is_none());
1347 if !prev.is_empty() {
1348 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1349 return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1350 }
1351
1352 Ok(())
1353 }
1354
1355 pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1362 let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1363 if prev.is_some() {
1364 Ok(())
1365 } else {
1366 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1367 }
1368 }
1369
1370 pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1377 if ids.is_empty() {
1378 return Ok(());
1379 }
1380
1381 let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1382 let n = self.items.delete_by_keys(ks, self.op_id).len();
1383 if n == ids.len() {
1384 Ok(())
1385 } else {
1386 let item_ids = self.items.items().keys().map(|k| k.id).collect();
1387 let mut unknown = ids.difference(&item_ids);
1388 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1389 }
1390 }
1391
1392 pub fn remove_system_object_mappings(
1399 &mut self,
1400 descriptions: BTreeSet<SystemObjectDescription>,
1401 ) -> Result<(), CatalogError> {
1402 if descriptions.is_empty() {
1403 return Ok(());
1404 }
1405
1406 let ks: Vec<_> = descriptions
1407 .clone()
1408 .into_iter()
1409 .map(|desc| GidMappingKey {
1410 schema_name: desc.schema_name,
1411 object_type: desc.object_type,
1412 object_name: desc.object_name,
1413 })
1414 .collect();
1415 let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1416
1417 if n == descriptions.len() {
1418 Ok(())
1419 } else {
1420 let item_descriptions = self
1421 .system_gid_mapping
1422 .items()
1423 .keys()
1424 .map(|k| SystemObjectDescription {
1425 schema_name: k.schema_name.clone(),
1426 object_type: k.object_type.clone(),
1427 object_name: k.object_name.clone(),
1428 })
1429 .collect();
1430 let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1431 format!(
1432 "{} {}.{}",
1433 desc.object_type, desc.schema_name, desc.object_name
1434 )
1435 });
1436 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1437 }
1438 }
1439
1440 pub fn remove_introspection_source_indexes(
1447 &mut self,
1448 introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1449 ) -> Result<(), CatalogError> {
1450 if introspection_source_indexes.is_empty() {
1451 return Ok(());
1452 }
1453
1454 let ks: Vec<_> = introspection_source_indexes
1455 .clone()
1456 .into_iter()
1457 .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1458 .collect();
1459 let n = self
1460 .introspection_sources
1461 .delete_by_keys(ks, self.op_id)
1462 .len();
1463 if n == introspection_source_indexes.len() {
1464 Ok(())
1465 } else {
1466 let txn_indexes = self
1467 .introspection_sources
1468 .items()
1469 .keys()
1470 .map(|k| (k.cluster_id, k.name.clone()))
1471 .collect();
1472 let mut unknown = introspection_source_indexes
1473 .difference(&txn_indexes)
1474 .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1475 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1476 }
1477 }
1478
1479 pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1486 let updated =
1487 self.items
1488 .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1489 if updated {
1490 Ok(())
1491 } else {
1492 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1493 }
1494 }
1495
1496 pub fn update_items(
1504 &mut self,
1505 items: BTreeMap<CatalogItemId, Item>,
1506 ) -> Result<(), CatalogError> {
1507 if items.is_empty() {
1508 return Ok(());
1509 }
1510
1511 let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1512 let kvs: Vec<_> = items
1513 .clone()
1514 .into_iter()
1515 .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1516 .collect();
1517 let n = self.items.update_by_keys(kvs, self.op_id)?;
1518 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1519 if n == update_ids.len() {
1520 Ok(())
1521 } else {
1522 let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1523 let mut unknown = update_ids.difference(&item_ids);
1524 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1525 }
1526 }
1527
1528 pub fn update_role(
1536 &mut self,
1537 id: RoleId,
1538 role: Role,
1539 password: PasswordAction,
1540 ) -> Result<(), CatalogError> {
1541 let key = RoleKey { id };
1542 if self.roles.get(&key).is_some() {
1543 let auth_key = RoleAuthKey { role_id: id };
1544
1545 match password {
1546 PasswordAction::Set(new_password) => {
1547 let hash = mz_auth::hash::scram256_hash(
1548 &new_password.password,
1549 &new_password.scram_iterations,
1550 )
1551 .expect("password hash should be valid");
1552 let value = RoleAuthValue {
1553 password_hash: Some(hash),
1554 updated_at: SYSTEM_TIME(),
1555 };
1556
1557 if self.role_auth.get(&auth_key).is_some() {
1558 self.role_auth
1559 .update_by_key(auth_key.clone(), value, self.op_id)?;
1560 } else {
1561 self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1562 }
1563 }
1564 PasswordAction::Clear => {
1565 let value = RoleAuthValue {
1566 password_hash: None,
1567 updated_at: SYSTEM_TIME(),
1568 };
1569 if self.role_auth.get(&auth_key).is_some() {
1570 self.role_auth
1571 .update_by_key(auth_key.clone(), value, self.op_id)?;
1572 }
1573 }
1574 PasswordAction::NoChange => {}
1575 }
1576
1577 self.roles
1578 .update_by_key(key, role.into_key_value().1, self.op_id)?;
1579
1580 Ok(())
1581 } else {
1582 Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1583 }
1584 }
1585
1586 pub fn update_roles_without_auth(
1597 &mut self,
1598 roles: BTreeMap<RoleId, Role>,
1599 ) -> Result<(), CatalogError> {
1600 if roles.is_empty() {
1601 return Ok(());
1602 }
1603
1604 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1605 let kvs: Vec<_> = roles
1606 .into_iter()
1607 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1608 .collect();
1609 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1610 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1611
1612 if n == update_role_ids.len() {
1613 Ok(())
1614 } else {
1615 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1616 let mut unknown = update_role_ids.difference(&role_ids);
1617 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1618 }
1619 }
1620
1621 pub fn update_system_object_mappings(
1626 &mut self,
1627 mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1628 ) -> Result<(), CatalogError> {
1629 if mappings.is_empty() {
1630 return Ok(());
1631 }
1632
1633 let n = self.system_gid_mapping.update(
1634 |_k, v| {
1635 if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1636 let (_, new_value) = mapping.clone().into_key_value();
1637 Some(new_value)
1638 } else {
1639 None
1640 }
1641 },
1642 self.op_id,
1643 )?;
1644
1645 if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1646 != mappings.len()
1647 {
1648 let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1649 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1650 }
1651
1652 Ok(())
1653 }
1654
1655 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1662 let updated = self.clusters.update_by_key(
1663 ClusterKey { id },
1664 cluster.into_key_value().1,
1665 self.op_id,
1666 )?;
1667 if updated {
1668 Ok(())
1669 } else {
1670 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1671 }
1672 }
1673
1674 pub fn update_cluster_replica(
1681 &mut self,
1682 replica_id: ReplicaId,
1683 replica: ClusterReplica,
1684 ) -> Result<(), CatalogError> {
1685 let updated = self.cluster_replicas.update_by_key(
1686 ClusterReplicaKey { id: replica_id },
1687 replica.into_key_value().1,
1688 self.op_id,
1689 )?;
1690 if updated {
1691 Ok(())
1692 } else {
1693 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1694 }
1695 }
1696
1697 pub fn update_database(
1704 &mut self,
1705 id: DatabaseId,
1706 database: Database,
1707 ) -> Result<(), CatalogError> {
1708 let updated = self.databases.update_by_key(
1709 DatabaseKey { id },
1710 database.into_key_value().1,
1711 self.op_id,
1712 )?;
1713 if updated {
1714 Ok(())
1715 } else {
1716 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1717 }
1718 }
1719
1720 pub fn update_schema(
1727 &mut self,
1728 schema_id: SchemaId,
1729 schema: Schema,
1730 ) -> Result<(), CatalogError> {
1731 let updated = self.schemas.update_by_key(
1732 SchemaKey { id: schema_id },
1733 schema.into_key_value().1,
1734 self.op_id,
1735 )?;
1736 if updated {
1737 Ok(())
1738 } else {
1739 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1740 }
1741 }
1742
1743 pub fn update_network_policy(
1750 &mut self,
1751 id: NetworkPolicyId,
1752 network_policy: NetworkPolicy,
1753 ) -> Result<(), CatalogError> {
1754 let updated = self.network_policies.update_by_key(
1755 NetworkPolicyKey { id },
1756 network_policy.into_key_value().1,
1757 self.op_id,
1758 )?;
1759 if updated {
1760 Ok(())
1761 } else {
1762 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1763 }
1764 }
1765 pub fn remove_network_policies(
1772 &mut self,
1773 network_policies: &BTreeSet<NetworkPolicyId>,
1774 ) -> Result<(), CatalogError> {
1775 if network_policies.is_empty() {
1776 return Ok(());
1777 }
1778
1779 let to_remove = network_policies
1780 .iter()
1781 .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1782 .collect();
1783 let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1784 assert!(
1785 prev.iter().all(|(k, _)| k.id.is_user()),
1786 "cannot delete non-user network policy"
1787 );
1788
1789 prev.retain(|_k, v| v.is_none());
1790 if !prev.is_empty() {
1791 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1792 return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1793 }
1794
1795 Ok(())
1796 }
1797 pub fn set_default_privilege(
1801 &mut self,
1802 role_id: RoleId,
1803 database_id: Option<DatabaseId>,
1804 schema_id: Option<SchemaId>,
1805 object_type: ObjectType,
1806 grantee: RoleId,
1807 privileges: Option<AclMode>,
1808 ) -> Result<(), CatalogError> {
1809 self.default_privileges.set(
1810 DefaultPrivilegesKey {
1811 role_id,
1812 database_id,
1813 schema_id,
1814 object_type,
1815 grantee,
1816 },
1817 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1818 self.op_id,
1819 )?;
1820 Ok(())
1821 }
1822
1823 pub fn set_default_privileges(
1825 &mut self,
1826 default_privileges: Vec<DefaultPrivilege>,
1827 ) -> Result<(), CatalogError> {
1828 if default_privileges.is_empty() {
1829 return Ok(());
1830 }
1831
1832 let default_privileges = default_privileges
1833 .into_iter()
1834 .map(DurableType::into_key_value)
1835 .map(|(k, v)| (k, Some(v)))
1836 .collect();
1837 self.default_privileges
1838 .set_many(default_privileges, self.op_id)?;
1839 Ok(())
1840 }
1841
1842 pub fn set_system_privilege(
1846 &mut self,
1847 grantee: RoleId,
1848 grantor: RoleId,
1849 acl_mode: Option<AclMode>,
1850 ) -> Result<(), CatalogError> {
1851 self.system_privileges.set(
1852 SystemPrivilegesKey { grantee, grantor },
1853 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1854 self.op_id,
1855 )?;
1856 Ok(())
1857 }
1858
1859 pub fn set_system_privileges(
1861 &mut self,
1862 system_privileges: Vec<MzAclItem>,
1863 ) -> Result<(), CatalogError> {
1864 if system_privileges.is_empty() {
1865 return Ok(());
1866 }
1867
1868 let system_privileges = system_privileges
1869 .into_iter()
1870 .map(DurableType::into_key_value)
1871 .map(|(k, v)| (k, Some(v)))
1872 .collect();
1873 self.system_privileges
1874 .set_many(system_privileges, self.op_id)?;
1875 Ok(())
1876 }
1877
1878 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1880 self.settings.set(
1881 SettingKey { name },
1882 value.map(|value| SettingValue { value }),
1883 self.op_id,
1884 )?;
1885 Ok(())
1886 }
1887
1888 pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1889 self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1890 }
1891
1892 pub fn insert_introspection_source_indexes(
1894 &mut self,
1895 introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1896 temporary_oids: &HashSet<u32>,
1897 ) -> Result<(), CatalogError> {
1898 if introspection_source_indexes.is_empty() {
1899 return Ok(());
1900 }
1901
1902 let amount = usize_to_u64(introspection_source_indexes.len());
1903 let oids = self.allocate_oids(amount, temporary_oids)?;
1904 let introspection_source_indexes: Vec<_> = introspection_source_indexes
1905 .into_iter()
1906 .zip_eq(oids)
1907 .map(
1908 |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1909 cluster_id,
1910 name,
1911 item_id,
1912 index_id,
1913 oid,
1914 },
1915 )
1916 .collect();
1917
1918 for introspection_source_index in introspection_source_indexes {
1919 let (key, value) = introspection_source_index.into_key_value();
1920 self.introspection_sources.insert(key, value, self.op_id)?;
1921 }
1922
1923 Ok(())
1924 }
1925
1926 pub fn set_system_object_mappings(
1928 &mut self,
1929 mappings: Vec<SystemObjectMapping>,
1930 ) -> Result<(), CatalogError> {
1931 if mappings.is_empty() {
1932 return Ok(());
1933 }
1934
1935 let mappings = mappings
1936 .into_iter()
1937 .map(DurableType::into_key_value)
1938 .map(|(k, v)| (k, Some(v)))
1939 .collect();
1940 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1941 Ok(())
1942 }
1943
1944 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1946 if replicas.is_empty() {
1947 return Ok(());
1948 }
1949
1950 let replicas = replicas
1951 .into_iter()
1952 .map(DurableType::into_key_value)
1953 .map(|(k, v)| (k, Some(v)))
1954 .collect();
1955 self.cluster_replicas.set_many(replicas, self.op_id)?;
1956 Ok(())
1957 }
1958
1959 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1961 match value {
1962 Some(value) => {
1963 let config = Config { key, value };
1964 let (key, value) = config.into_key_value();
1965 self.configs.set(key, Some(value), self.op_id)?;
1966 }
1967 None => {
1968 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1969 }
1970 }
1971 Ok(())
1972 }
1973
1974 pub fn get_config(&self, key: String) -> Option<u64> {
1976 self.configs
1977 .get(&ConfigKey { key })
1978 .map(|entry| entry.value)
1979 }
1980
1981 pub fn get_setting(&self, name: String) -> Option<&str> {
1983 self.settings
1984 .get(&SettingKey { name })
1985 .map(|entry| &*entry.value)
1986 }
1987
1988 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1989 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1990 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1991 }
1992
1993 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1994 self.set_setting(
1995 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1996 Some(shard_id.to_string()),
1997 )
1998 }
1999
2000 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
2001 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
2002 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
2003 }
2004
2005 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
2006 self.set_setting(
2007 EXPRESSION_CACHE_SHARD_KEY.to_string(),
2008 Some(shard_id.to_string()),
2009 )
2010 }
2011
2012 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
2018 self.set_config(
2019 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
2020 Some(
2021 value
2022 .as_millis()
2023 .try_into()
2024 .expect("max wait fits into u64"),
2025 ),
2026 )
2027 }
2028
2029 pub fn set_0dt_deployment_ddl_check_interval(
2036 &mut self,
2037 value: Duration,
2038 ) -> Result<(), CatalogError> {
2039 self.set_config(
2040 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2041 Some(
2042 value
2043 .as_millis()
2044 .try_into()
2045 .expect("ddl check interval fits into u64"),
2046 ),
2047 )
2048 }
2049
2050 pub fn set_enable_0dt_deployment_panic_after_timeout(
2056 &mut self,
2057 value: bool,
2058 ) -> Result<(), CatalogError> {
2059 self.set_config(
2060 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2061 Some(u64::from(value)),
2062 )
2063 }
2064
2065 pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2071 self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2072 }
2073
2074 pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2081 self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2082 }
2083
2084 pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2091 self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2092 }
2093
2094 pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2096 self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2097 }
2098
2099 pub fn update_comment(
2100 &mut self,
2101 object_id: CommentObjectId,
2102 sub_component: Option<usize>,
2103 comment: Option<String>,
2104 ) -> Result<(), CatalogError> {
2105 let key = CommentKey {
2106 object_id,
2107 sub_component,
2108 };
2109 let value = comment.map(|c| CommentValue { comment: c });
2110 self.comments.set(key, value, self.op_id)?;
2111
2112 Ok(())
2113 }
2114
2115 pub fn drop_comments(
2116 &mut self,
2117 object_ids: &BTreeSet<CommentObjectId>,
2118 ) -> Result<(), CatalogError> {
2119 if object_ids.is_empty() {
2120 return Ok(());
2121 }
2122
2123 self.comments
2124 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2125 Ok(())
2126 }
2127
2128 pub fn update_source_references(
2129 &mut self,
2130 source_id: CatalogItemId,
2131 references: Vec<SourceReference>,
2132 updated_at: u64,
2133 ) -> Result<(), CatalogError> {
2134 let key = SourceReferencesKey { source_id };
2135 let value = SourceReferencesValue {
2136 references,
2137 updated_at,
2138 };
2139 self.source_references.set(key, Some(value), self.op_id)?;
2140 Ok(())
2141 }
2142
2143 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2145 let key = ServerConfigurationKey {
2146 name: name.to_string(),
2147 };
2148 let value = ServerConfigurationValue { value };
2149 self.system_configurations
2150 .set(key, Some(value), self.op_id)?;
2151 Ok(())
2152 }
2153
2154 pub fn remove_system_config(&mut self, name: &str) {
2156 let key = ServerConfigurationKey {
2157 name: name.to_string(),
2158 };
2159 self.system_configurations
2160 .set(key, None, self.op_id)
2161 .expect("cannot have uniqueness violation");
2162 }
2163
2164 pub fn clear_system_configs(&mut self) {
2166 self.system_configurations.delete(|_k, _v| true, self.op_id);
2167 }
2168
2169 pub fn get_cluster_system_configurations(
2171 &self,
2172 ) -> impl Iterator<Item = ClusterSystemConfiguration> + use<'_> {
2173 self.cluster_system_configurations
2174 .items()
2175 .into_iter()
2176 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2177 }
2178
2179 pub fn upsert_cluster_system_config(
2182 &mut self,
2183 cluster_id: ClusterId,
2184 name: &str,
2185 value: String,
2186 ) -> Result<(), CatalogError> {
2187 let key = ClusterSystemConfigurationKey {
2188 cluster_id,
2189 name: name.to_string(),
2190 };
2191 let value = ClusterSystemConfigurationValue { value };
2192 self.cluster_system_configurations
2193 .set(key, Some(value), self.op_id)?;
2194 Ok(())
2195 }
2196
2197 pub fn remove_cluster_system_config(&mut self, cluster_id: ClusterId, name: &str) {
2200 let key = ClusterSystemConfigurationKey {
2201 cluster_id,
2202 name: name.to_string(),
2203 };
2204 self.cluster_system_configurations
2205 .set(key, None, self.op_id)
2206 .expect("cannot have uniqueness violation");
2207 }
2208
2209 pub fn get_replica_system_configurations(
2211 &self,
2212 ) -> impl Iterator<Item = ReplicaSystemConfiguration> + use<'_> {
2213 self.replica_system_configurations
2214 .items()
2215 .into_iter()
2216 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2217 }
2218
2219 pub fn upsert_replica_system_config(
2222 &mut self,
2223 replica_id: ReplicaId,
2224 name: &str,
2225 value: String,
2226 ) -> Result<(), CatalogError> {
2227 let key = ReplicaSystemConfigurationKey {
2228 replica_id,
2229 name: name.to_string(),
2230 };
2231 let value = ReplicaSystemConfigurationValue { value };
2232 self.replica_system_configurations
2233 .set(key, Some(value), self.op_id)?;
2234 Ok(())
2235 }
2236
2237 pub fn remove_replica_system_config(&mut self, replica_id: ReplicaId, name: &str) {
2240 let key = ReplicaSystemConfigurationKey {
2241 replica_id,
2242 name: name.to_string(),
2243 };
2244 self.replica_system_configurations
2245 .set(key, None, self.op_id)
2246 .expect("cannot have uniqueness violation");
2247 }
2248
2249 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2250 match self.configs.insert(
2251 ConfigKey { key: key.clone() },
2252 ConfigValue { value },
2253 self.op_id,
2254 ) {
2255 Ok(_) => Ok(()),
2256 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2257 }
2258 }
2259
2260 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2261 self.clusters
2262 .items()
2263 .into_iter()
2264 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2265 }
2266
2267 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2268 self.cluster_replicas
2269 .items()
2270 .into_iter()
2271 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2272 }
2273
2274 pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2275 self.databases
2276 .items()
2277 .into_iter()
2278 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2279 }
2280
2281 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2282 self.roles
2283 .items()
2284 .into_iter()
2285 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2286 }
2287
2288 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2289 self.network_policies
2290 .items()
2291 .into_iter()
2292 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2293 }
2294
2295 pub fn get_system_object_mappings(
2296 &self,
2297 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2298 self.system_gid_mapping
2299 .items()
2300 .into_iter()
2301 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2302 }
2303
2304 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2305 self.schemas
2306 .items()
2307 .into_iter()
2308 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2309 }
2310
2311 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2312 self.system_configurations
2313 .items()
2314 .into_iter()
2315 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2316 }
2317
2318 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2319 let key = SchemaKey { id: *id };
2320 self.schemas
2321 .get(&key)
2322 .map(|v| DurableType::from_key_value(key, v.clone()))
2323 }
2324
2325 pub fn get_introspection_source_indexes(
2326 &self,
2327 cluster_id: ClusterId,
2328 ) -> BTreeMap<&str, (GlobalId, u32)> {
2329 self.introspection_sources
2330 .items()
2331 .into_iter()
2332 .filter(|(k, _v)| k.cluster_id == cluster_id)
2333 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2334 .collect()
2335 }
2336
2337 pub fn get_catalog_content_version(&self) -> Option<&str> {
2338 self.settings
2339 .get(&SettingKey {
2340 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2341 })
2342 .map(|value| &*value.value)
2343 }
2344
2345 pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2346 self.settings
2347 .get(&SettingKey {
2348 name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2349 })
2350 .map(|value| value.value.clone())
2351 }
2352
2353 #[must_use]
2359 pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2360 let updates = self.get_op_updates();
2361 self.commit_op();
2362 updates
2363 }
2364
2365 fn get_op_updates(&self) -> Vec<StateUpdate> {
2366 fn get_collection_op_updates<'a, T>(
2367 table_txn: &'a TableTransaction<T::Key, T::Value>,
2368 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2369 op: Timestamp,
2370 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2371 where
2372 T::Key: Ord + Eq + Clone + Debug,
2373 T::Value: Ord + Clone + Debug,
2374 T: DurableType,
2375 {
2376 table_txn
2377 .pending
2378 .iter()
2379 .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2380 .filter_map(move |(k, v)| {
2381 if v.ts == op {
2382 let key = k.clone();
2383 let value = v.value.clone();
2384 let diff = v.diff.clone().try_into().expect("invalid diff");
2385 let update = DurableType::from_key_value(key, value);
2386 let kind = kind_fn(update);
2387 Some((kind, diff))
2388 } else {
2389 None
2390 }
2391 })
2392 }
2393
2394 fn get_large_collection_op_updates<'a, T>(
2395 collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2396 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2397 op: Timestamp,
2398 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2399 where
2400 T::Key: Ord + Eq + Clone + Debug,
2401 T: DurableType<Value = ()>,
2402 {
2403 collection.iter().filter_map(move |(k, diff, ts)| {
2404 if *ts == op {
2405 let key = k.clone();
2406 let diff = diff.clone().try_into().expect("invalid diff");
2407 let update = DurableType::from_key_value(key, ());
2408 let kind = kind_fn(update);
2409 Some((kind, diff))
2410 } else {
2411 None
2412 }
2413 })
2414 }
2415
2416 let Transaction {
2417 durable_catalog: _,
2418 databases,
2419 schemas,
2420 items,
2421 comments,
2422 roles,
2423 role_auth,
2424 clusters,
2425 network_policies,
2426 cluster_replicas,
2427 introspection_sources,
2428 system_gid_mapping,
2429 system_configurations,
2430 cluster_system_configurations,
2431 replica_system_configurations,
2432 default_privileges,
2433 source_references,
2434 system_privileges,
2435 audit_log_updates,
2436 storage_collection_metadata,
2437 unfinalized_shards,
2438 id_allocator: _,
2440 configs: _,
2441 settings: _,
2442 txn_wal_shard: _,
2443 upper,
2444 op_id: _,
2445 } = &self;
2446
2447 let updates = std::iter::empty()
2448 .chain(get_collection_op_updates(
2449 roles,
2450 StateUpdateKind::Role,
2451 self.op_id,
2452 ))
2453 .chain(get_collection_op_updates(
2454 role_auth,
2455 StateUpdateKind::RoleAuth,
2456 self.op_id,
2457 ))
2458 .chain(get_collection_op_updates(
2459 databases,
2460 StateUpdateKind::Database,
2461 self.op_id,
2462 ))
2463 .chain(get_collection_op_updates(
2464 schemas,
2465 StateUpdateKind::Schema,
2466 self.op_id,
2467 ))
2468 .chain(get_collection_op_updates(
2469 default_privileges,
2470 StateUpdateKind::DefaultPrivilege,
2471 self.op_id,
2472 ))
2473 .chain(get_collection_op_updates(
2474 system_privileges,
2475 StateUpdateKind::SystemPrivilege,
2476 self.op_id,
2477 ))
2478 .chain(get_collection_op_updates(
2479 system_configurations,
2480 StateUpdateKind::SystemConfiguration,
2481 self.op_id,
2482 ))
2483 .chain(get_collection_op_updates(
2484 cluster_system_configurations,
2485 StateUpdateKind::ClusterSystemConfiguration,
2486 self.op_id,
2487 ))
2488 .chain(get_collection_op_updates(
2489 replica_system_configurations,
2490 StateUpdateKind::ReplicaSystemConfiguration,
2491 self.op_id,
2492 ))
2493 .chain(get_collection_op_updates(
2494 clusters,
2495 StateUpdateKind::Cluster,
2496 self.op_id,
2497 ))
2498 .chain(get_collection_op_updates(
2499 network_policies,
2500 StateUpdateKind::NetworkPolicy,
2501 self.op_id,
2502 ))
2503 .chain(get_collection_op_updates(
2504 introspection_sources,
2505 StateUpdateKind::IntrospectionSourceIndex,
2506 self.op_id,
2507 ))
2508 .chain(get_collection_op_updates(
2509 cluster_replicas,
2510 StateUpdateKind::ClusterReplica,
2511 self.op_id,
2512 ))
2513 .chain(get_collection_op_updates(
2514 system_gid_mapping,
2515 StateUpdateKind::SystemObjectMapping,
2516 self.op_id,
2517 ))
2518 .chain(get_collection_op_updates(
2519 items,
2520 StateUpdateKind::Item,
2521 self.op_id,
2522 ))
2523 .chain(get_collection_op_updates(
2524 comments,
2525 StateUpdateKind::Comment,
2526 self.op_id,
2527 ))
2528 .chain(get_collection_op_updates(
2529 source_references,
2530 StateUpdateKind::SourceReferences,
2531 self.op_id,
2532 ))
2533 .chain(get_collection_op_updates(
2534 storage_collection_metadata,
2535 StateUpdateKind::StorageCollectionMetadata,
2536 self.op_id,
2537 ))
2538 .chain(get_collection_op_updates(
2539 unfinalized_shards,
2540 StateUpdateKind::UnfinalizedShard,
2541 self.op_id,
2542 ))
2543 .chain(get_large_collection_op_updates(
2544 audit_log_updates,
2545 StateUpdateKind::AuditLog,
2546 self.op_id,
2547 ))
2548 .map(|(kind, diff)| StateUpdate {
2549 kind,
2550 ts: upper.clone(),
2551 diff,
2552 })
2553 .collect();
2554
2555 updates
2556 }
2557
2558 pub fn is_savepoint(&self) -> bool {
2559 self.durable_catalog.is_savepoint()
2560 }
2561
2562 fn commit_op(&mut self) {
2563 self.op_id += 1;
2564 }
2565
2566 pub fn op_id(&self) -> Timestamp {
2567 self.op_id
2568 }
2569
2570 pub fn upper(&self) -> mz_repr::Timestamp {
2571 self.upper
2572 }
2573
2574 pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2575 let audit_log_updates = self
2576 .audit_log_updates
2577 .into_iter()
2578 .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2579 .collect();
2580
2581 let txn_batch = TransactionBatch {
2582 databases: self.databases.pending(),
2583 schemas: self.schemas.pending(),
2584 items: self.items.pending(),
2585 comments: self.comments.pending(),
2586 roles: self.roles.pending(),
2587 role_auth: self.role_auth.pending(),
2588 clusters: self.clusters.pending(),
2589 cluster_replicas: self.cluster_replicas.pending(),
2590 network_policies: self.network_policies.pending(),
2591 introspection_sources: self.introspection_sources.pending(),
2592 id_allocator: self.id_allocator.pending(),
2593 configs: self.configs.pending(),
2594 source_references: self.source_references.pending(),
2595 settings: self.settings.pending(),
2596 system_gid_mapping: self.system_gid_mapping.pending(),
2597 system_configurations: self.system_configurations.pending(),
2598 cluster_system_configurations: self.cluster_system_configurations.pending(),
2599 replica_system_configurations: self.replica_system_configurations.pending(),
2600 default_privileges: self.default_privileges.pending(),
2601 system_privileges: self.system_privileges.pending(),
2602 storage_collection_metadata: self.storage_collection_metadata.pending(),
2603 unfinalized_shards: self.unfinalized_shards.pending(),
2604 txn_wal_shard: self.txn_wal_shard.pending(),
2605 audit_log_updates,
2606 upper: self.upper,
2607 };
2608 (txn_batch, self.durable_catalog)
2609 }
2610
2611 #[mz_ore::instrument(level = "debug")]
2623 pub(crate) async fn commit_internal(
2624 self,
2625 commit_ts: mz_repr::Timestamp,
2626 ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2627 let (mut txn_batch, durable_catalog) = self.into_parts();
2628 let TransactionBatch {
2629 databases,
2630 schemas,
2631 items,
2632 comments,
2633 roles,
2634 role_auth,
2635 clusters,
2636 cluster_replicas,
2637 network_policies,
2638 introspection_sources,
2639 id_allocator,
2640 configs,
2641 source_references,
2642 settings,
2643 system_gid_mapping,
2644 system_configurations,
2645 cluster_system_configurations,
2646 replica_system_configurations,
2647 default_privileges,
2648 system_privileges,
2649 storage_collection_metadata,
2650 unfinalized_shards,
2651 txn_wal_shard,
2652 audit_log_updates,
2653 upper: _,
2654 } = &mut txn_batch;
2655 differential_dataflow::consolidation::consolidate_updates(databases);
2658 differential_dataflow::consolidation::consolidate_updates(schemas);
2659 differential_dataflow::consolidation::consolidate_updates(items);
2660 differential_dataflow::consolidation::consolidate_updates(comments);
2661 differential_dataflow::consolidation::consolidate_updates(roles);
2662 differential_dataflow::consolidation::consolidate_updates(role_auth);
2663 differential_dataflow::consolidation::consolidate_updates(clusters);
2664 differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2665 differential_dataflow::consolidation::consolidate_updates(network_policies);
2666 differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2667 differential_dataflow::consolidation::consolidate_updates(id_allocator);
2668 differential_dataflow::consolidation::consolidate_updates(configs);
2669 differential_dataflow::consolidation::consolidate_updates(settings);
2670 differential_dataflow::consolidation::consolidate_updates(source_references);
2671 differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2672 differential_dataflow::consolidation::consolidate_updates(system_configurations);
2673 differential_dataflow::consolidation::consolidate_updates(cluster_system_configurations);
2674 differential_dataflow::consolidation::consolidate_updates(replica_system_configurations);
2675 differential_dataflow::consolidation::consolidate_updates(default_privileges);
2676 differential_dataflow::consolidation::consolidate_updates(system_privileges);
2677 differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2678 differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2679 differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2680 differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2681
2682 let upper = durable_catalog
2683 .commit_transaction(txn_batch, commit_ts)
2684 .await?;
2685 Ok((durable_catalog, upper))
2686 }
2687
2688 #[mz_ore::instrument(level = "debug")]
2705 pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2706 let op_updates = self.get_op_updates();
2707 assert!(
2708 op_updates.is_empty(),
2709 "unconsumed transaction updates: {op_updates:?}"
2710 );
2711
2712 let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2713 let updates = durable_storage.sync_updates(upper).await?;
2715 soft_assert_no_log!(
2720 durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2721 "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2722 );
2723 Ok(())
2724 }
2725}
2726
2727use crate::durable::async_trait;
2728
2729use super::objects::{RoleAuthKey, RoleAuthValue};
2730
2731#[async_trait]
2732impl StorageTxn for Transaction<'_> {
2733 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2734 self.storage_collection_metadata
2735 .items()
2736 .into_iter()
2737 .map(
2738 |(
2739 StorageCollectionMetadataKey { id },
2740 StorageCollectionMetadataValue { shard },
2741 )| { (*id, shard.clone()) },
2742 )
2743 .collect()
2744 }
2745
2746 fn insert_collection_metadata(
2747 &mut self,
2748 metadata: BTreeMap<GlobalId, ShardId>,
2749 ) -> Result<(), StorageError> {
2750 for (id, shard) in metadata {
2751 self.storage_collection_metadata
2752 .insert(
2753 StorageCollectionMetadataKey { id },
2754 StorageCollectionMetadataValue {
2755 shard: shard.clone(),
2756 },
2757 self.op_id,
2758 )
2759 .map_err(|err| match err {
2760 DurableCatalogError::DuplicateKey => {
2761 StorageError::CollectionMetadataAlreadyExists(id)
2762 }
2763 DurableCatalogError::UniquenessViolation => {
2764 StorageError::PersistShardAlreadyInUse(shard)
2765 }
2766 err => StorageError::Generic(anyhow::anyhow!(err)),
2767 })?;
2768 }
2769 Ok(())
2770 }
2771
2772 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2773 let ks: Vec<_> = ids
2774 .into_iter()
2775 .map(|id| StorageCollectionMetadataKey { id })
2776 .collect();
2777 self.storage_collection_metadata
2778 .delete_by_keys(ks, self.op_id)
2779 .into_iter()
2780 .map(
2781 |(
2782 StorageCollectionMetadataKey { id },
2783 StorageCollectionMetadataValue { shard },
2784 )| (id, shard),
2785 )
2786 .collect()
2787 }
2788
2789 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2790 self.unfinalized_shards
2791 .items()
2792 .into_iter()
2793 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2794 .collect()
2795 }
2796
2797 fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError> {
2798 for shard in s {
2799 match self
2800 .unfinalized_shards
2801 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2802 {
2803 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2805 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2806 };
2807 }
2808 Ok(())
2809 }
2810
2811 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2812 let ks: Vec<_> = shards
2813 .into_iter()
2814 .map(|shard| UnfinalizedShardKey { shard })
2815 .collect();
2816 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2817 }
2818
2819 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2820 self.txn_wal_shard
2821 .values()
2822 .iter()
2823 .next()
2824 .map(|TxnWalShardValue { shard }| *shard)
2825 }
2826
2827 fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError> {
2828 self.txn_wal_shard
2829 .insert((), TxnWalShardValue { shard }, self.op_id)
2830 .map_err(|err| match err {
2831 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2832 err => StorageError::Generic(anyhow::anyhow!(err)),
2833 })
2834 }
2835}
2836
/// The pending changes of a transaction, in proto form.
///
/// Each collection is a list of `(key, value, diff)` triples. `Transaction::into_parts`
/// builds a batch from the transaction's pending updates, and `commit_internal` consolidates
/// every list before passing the batch to the durable catalog.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) cluster_system_configurations: Vec<(
        proto::ClusterSystemConfigurationKey,
        proto::ClusterSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) replica_system_configurations: Vec<(
        proto::ReplicaSystemConfigurationKey,
        proto::ReplicaSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    // The next two collections use `()` for the value and the key respectively.
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    /// Copied from the transaction's `upper` by `into_parts`; ignored by `is_empty`.
    pub(crate) upper: mz_repr::Timestamp,
}
2899
2900impl TransactionBatch {
2901 pub fn is_empty(&self) -> bool {
2902 let TransactionBatch {
2903 databases,
2904 schemas,
2905 items,
2906 comments,
2907 roles,
2908 role_auth,
2909 clusters,
2910 cluster_replicas,
2911 network_policies,
2912 introspection_sources,
2913 id_allocator,
2914 configs,
2915 settings,
2916 source_references,
2917 system_gid_mapping,
2918 system_configurations,
2919 cluster_system_configurations,
2920 replica_system_configurations,
2921 default_privileges,
2922 system_privileges,
2923 storage_collection_metadata,
2924 unfinalized_shards,
2925 txn_wal_shard,
2926 audit_log_updates,
2927 upper: _,
2928 } = self;
2929 databases.is_empty()
2930 && schemas.is_empty()
2931 && items.is_empty()
2932 && comments.is_empty()
2933 && roles.is_empty()
2934 && role_auth.is_empty()
2935 && clusters.is_empty()
2936 && cluster_replicas.is_empty()
2937 && network_policies.is_empty()
2938 && introspection_sources.is_empty()
2939 && id_allocator.is_empty()
2940 && configs.is_empty()
2941 && settings.is_empty()
2942 && source_references.is_empty()
2943 && system_gid_mapping.is_empty()
2944 && system_configurations.is_empty()
2945 && cluster_system_configurations.is_empty()
2946 && replica_system_configurations.is_empty()
2947 && default_privileges.is_empty()
2948 && system_privileges.is_empty()
2949 && storage_collection_metadata.is_empty()
2950 && unfinalized_shards.is_empty()
2951 && txn_wal_shard.is_empty()
2952 && audit_log_updates.is_empty()
2953 }
2954}
2955
/// A single pending change to one key of a [`TableTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    /// The value being added or retracted.
    value: V,
    /// The timestamp passed to the table operation that recorded this update
    /// (callers in this file pass the transaction's `op_id`).
    ts: Timestamp,
    /// `Diff::ONE` for an addition, `Diff::MINUS_ONE` for a retraction.
    diff: Diff,
}
2962
/// Lets a value type expose a name that `TableTransaction::verify` can use to narrow down
/// which pairs of values to test for uniqueness violations.
trait UniqueName {
    /// `true` if the type has a unique name; `verify` only uses the name-indexed check
    /// when this is set.
    const HAS_UNIQUE_NAME: bool;
    /// Returns the unique name. Types with `HAS_UNIQUE_NAME == false` return `""`.
    fn unique_name(&self) -> &str;
}
2971
2972mod unique_name {
2973 use crate::durable::objects::*;
2974
2975 macro_rules! impl_unique_name {
2976 ($($t:ty),* $(,)?) => {
2977 $(
2978 impl crate::durable::transaction::UniqueName for $t {
2979 const HAS_UNIQUE_NAME: bool = true;
2980 fn unique_name(&self) -> &str {
2981 &self.name
2982 }
2983 }
2984 )*
2985 };
2986 }
2987
2988 macro_rules! impl_no_unique_name {
2989 ($($t:ty),* $(,)?) => {
2990 $(
2991 impl crate::durable::transaction::UniqueName for $t {
2992 const HAS_UNIQUE_NAME: bool = false;
2993 fn unique_name(&self) -> &str {
2994 ""
2995 }
2996 }
2997 )*
2998 };
2999 }
3000
3001 impl_unique_name! {
3002 ClusterReplicaValue,
3003 ClusterValue,
3004 DatabaseValue,
3005 ItemValue,
3006 NetworkPolicyValue,
3007 RoleValue,
3008 SchemaValue,
3009 }
3010
3011 impl_no_unique_name!(
3012 (),
3013 ClusterIntrospectionSourceIndexValue,
3014 ClusterSystemConfigurationValue,
3015 CommentValue,
3016 ConfigValue,
3017 DefaultPrivilegesValue,
3018 GidMappingValue,
3019 IdAllocValue,
3020 ReplicaSystemConfigurationValue,
3021 ServerConfigurationValue,
3022 SettingValue,
3023 SourceReferencesValue,
3024 StorageCollectionMetadataValue,
3025 SystemPrivilegesValue,
3026 TxnWalShardValue,
3027 RoleAuthValue,
3028 );
3029
3030 #[cfg(test)]
3031 mod test {
3032 impl_no_unique_name!(String,);
3033 }
3034}
3035
/// An in-memory view of one keyed collection that layers pending changes on top of an
/// initial snapshot.
///
/// Reads combine `initial` with `pending`; writes only ever append to `pending`. Besides
/// key uniqueness (enforced by `insert`), an optional `uniqueness_violation` function can
/// reject values that conflict with each other.
#[derive(Debug)]
struct TableTransaction<K, V> {
    /// The contents of the collection when the table transaction was created.
    initial: BTreeMap<K, V>,
    /// Changes made since creation, per key. `verify` soft-asserts that each list is
    /// ordered by timestamp.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    /// Returns `true` when two values must not coexist in the collection. `None` disables
    /// the check.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
3053
3054impl<K, V> TableTransaction<K, V>
3055where
3056 K: Ord + Eq + Clone + Debug,
3057 V: Ord + Clone + Debug + UniqueName,
3058{
3059 fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
3066 where
3067 K: RustType<KP>,
3068 V: RustType<VP>,
3069 {
3070 let initial = initial
3071 .into_iter()
3072 .map(RustType::from_proto)
3073 .collect::<Result<_, _>>()?;
3074
3075 Ok(Self {
3076 initial,
3077 pending: BTreeMap::new(),
3078 uniqueness_violation: None,
3079 })
3080 }
3081
3082 fn new_with_uniqueness_fn<KP, VP>(
3085 initial: BTreeMap<KP, VP>,
3086 uniqueness_violation: fn(a: &V, b: &V) -> bool,
3087 ) -> Result<Self, TryFromProtoError>
3088 where
3089 K: RustType<KP>,
3090 V: RustType<VP>,
3091 {
3092 let initial = initial
3093 .into_iter()
3094 .map(RustType::from_proto)
3095 .collect::<Result<_, _>>()?;
3096
3097 Ok(Self {
3098 initial,
3099 pending: BTreeMap::new(),
3100 uniqueness_violation: Some(uniqueness_violation),
3101 })
3102 }
3103
3104 fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
3107 where
3108 K: RustType<KP>,
3109 V: RustType<VP>,
3110 {
3111 soft_assert_no_log!(self.verify().is_ok());
3112 self.pending
3115 .into_iter()
3116 .flat_map(|(k, v)| {
3117 let mut v: Vec<_> = v
3118 .into_iter()
3119 .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
3120 .collect();
3121 differential_dataflow::consolidation::consolidate(&mut v);
3122 v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
3123 })
3124 .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
3125 .collect()
3126 }
3127
3128 fn verify(&self) -> Result<(), DurableCatalogError> {
3133 if let Some(uniqueness_violation) = self.uniqueness_violation {
3134 let items = self.values();
3136 if V::HAS_UNIQUE_NAME {
3137 let by_name: BTreeMap<_, _> = items
3138 .iter()
3139 .enumerate()
3140 .map(|(v, vi)| (vi.unique_name(), (v, vi)))
3141 .collect();
3142 for (i, vi) in items.iter().enumerate() {
3143 if let Some((j, vj)) = by_name.get(vi.unique_name()) {
3144 if i != *j && uniqueness_violation(vi, *vj) {
3145 return Err(DurableCatalogError::UniquenessViolation);
3146 }
3147 }
3148 }
3149 } else {
3150 for (i, vi) in items.iter().enumerate() {
3151 for (j, vj) in items.iter().enumerate() {
3152 if i != j && uniqueness_violation(vi, vj) {
3153 return Err(DurableCatalogError::UniquenessViolation);
3154 }
3155 }
3156 }
3157 }
3158 }
3159 soft_assert_no_log!(
3160 self.pending
3161 .values()
3162 .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3163 "pending should be sorted by timestamp: {:?}",
3164 self.pending
3165 );
3166 Ok(())
3167 }
3168
3169 fn verify_keys<'a>(
3174 &self,
3175 keys: impl IntoIterator<Item = &'a K>,
3176 ) -> Result<(), DurableCatalogError>
3177 where
3178 K: 'a,
3179 {
3180 if let Some(uniqueness_violation) = self.uniqueness_violation {
3181 let entries: Vec<_> = keys
3182 .into_iter()
3183 .filter_map(|key| self.get(key).map(|value| (key, value)))
3184 .collect();
3185 for (ki, vi) in self.items() {
3187 for (kj, vj) in &entries {
3188 if ki != *kj && uniqueness_violation(vi, vj) {
3189 return Err(DurableCatalogError::UniquenessViolation);
3190 }
3191 }
3192 }
3193 }
3194 soft_assert_no_log!(self.verify().is_ok());
3195 Ok(())
3196 }
3197
3198 fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3201 let mut seen = BTreeSet::new();
3202 for k in self.pending.keys() {
3203 seen.insert(k);
3204 let v = self.get(k);
3205 if let Some(v) = v {
3208 f(k, v);
3209 }
3210 }
3211 for (k, v) in self.initial.iter() {
3212 if !seen.contains(k) {
3214 f(k, v);
3215 }
3216 }
3217 }
3218
3219 fn get(&self, k: &K) -> Option<&V> {
3221 let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3222 let mut updates = Vec::with_capacity(pending.len() + 1);
3223 if let Some(initial) = self.initial.get(k) {
3224 updates.push((initial, Diff::ONE));
3225 }
3226 updates.extend(
3227 pending
3228 .into_iter()
3229 .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3230 );
3231
3232 differential_dataflow::consolidation::consolidate(&mut updates);
3233 assert!(updates.len() <= 1);
3234 updates.into_iter().next().map(|(v, _)| v)
3235 }
3236
/// Returns an owned copy of the table's current contents. Test-only.
#[cfg(test)]
fn items_cloned(&self) -> BTreeMap<K, V> {
    self.items()
        .into_iter()
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
3249
3250 fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3254 where
3255 K: RustType<KP>,
3256 V: RustType<VP>,
3257 KP: Ord,
3258 {
3259 let mut items = BTreeMap::new();
3260 self.for_values(|k, v| {
3261 items.insert(k.into_proto(), v.into_proto());
3262 });
3263 items
3264 }
3265
3266 fn items(&self) -> BTreeMap<&K, &V> {
3269 let mut items = BTreeMap::new();
3270 self.for_values(|k, v| {
3271 items.insert(k, v);
3272 });
3273 items
3274 }
3275
3276 fn values(&self) -> BTreeSet<&V> {
3278 let mut items = BTreeSet::new();
3279 self.for_values(|_, v| {
3280 items.insert(v);
3281 });
3282 items
3283 }
3284
3285 fn len(&self) -> usize {
3287 let mut count = 0;
3288 self.for_values(|_, _| {
3289 count += 1;
3290 });
3291 count
3292 }
3293
3294 fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3298 &mut self,
3299 mut f: F,
3300 ) {
3301 let mut pending = BTreeMap::new();
3302 self.for_values(|k, v| f(&mut pending, k, v));
3303 for (k, updates) in pending {
3304 self.pending.entry(k).or_default().extend(updates);
3305 }
3306 }
3307
3308 fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3312 let mut violation = None;
3313 self.for_values(|for_k, for_v| {
3314 if &k == for_k {
3315 violation = Some(DurableCatalogError::DuplicateKey);
3316 }
3317 if let Some(uniqueness_violation) = self.uniqueness_violation {
3318 if uniqueness_violation(for_v, &v) {
3319 violation = Some(DurableCatalogError::UniquenessViolation);
3320 }
3321 }
3322 });
3323 if let Some(violation) = violation {
3324 return Err(violation);
3325 }
3326 self.pending.entry(k).or_default().push(TransactionUpdate {
3327 value: v,
3328 ts,
3329 diff: Diff::ONE,
3330 });
3331 soft_assert_no_log!(self.verify().is_ok());
3332 Ok(())
3333 }
3334
3335 fn update<F: Fn(&K, &V) -> Option<V>>(
3344 &mut self,
3345 f: F,
3346 ts: Timestamp,
3347 ) -> Result<Diff, DurableCatalogError> {
3348 let mut changed = Diff::ZERO;
3349 let mut keys = BTreeSet::new();
3350 let pending = self.pending.clone();
3352 self.for_values_mut(|p, k, v| {
3353 if let Some(next) = f(k, v) {
3354 changed += Diff::ONE;
3355 keys.insert(k.clone());
3356 let updates = p.entry(k.clone()).or_default();
3357 updates.push(TransactionUpdate {
3358 value: v.clone(),
3359 ts,
3360 diff: Diff::MINUS_ONE,
3361 });
3362 updates.push(TransactionUpdate {
3363 value: next,
3364 ts,
3365 diff: Diff::ONE,
3366 });
3367 }
3368 });
3369 if let Err(err) = self.verify_keys(&keys) {
3371 self.pending = pending;
3372 Err(err)
3373 } else {
3374 Ok(changed)
3375 }
3376 }
3377
3378 fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3383 if let Some(cur_v) = self.get(&k) {
3384 if v != *cur_v {
3385 self.set(k, Some(v), ts)?;
3386 }
3387 Ok(true)
3388 } else {
3389 Ok(false)
3390 }
3391 }
3392
3393 fn update_by_keys(
3398 &mut self,
3399 kvs: impl IntoIterator<Item = (K, V)>,
3400 ts: Timestamp,
3401 ) -> Result<Diff, DurableCatalogError> {
3402 let kvs: Vec<_> = kvs
3403 .into_iter()
3404 .filter_map(|(k, v)| match self.get(&k) {
3405 Some(cur_v) => Some((*cur_v == v, k, v)),
3407 None => None,
3408 })
3409 .collect();
3410 let changed = kvs.len();
3411 let changed =
3412 Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3413 let kvs = kvs
3414 .into_iter()
3415 .filter(|(no_op, _, _)| !no_op)
3417 .map(|(_, k, v)| (k, Some(v)))
3418 .collect();
3419 self.set_many(kvs, ts)?;
3420 Ok(changed)
3421 }
3422
3423 fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3430 let prev = self.get(&k).cloned();
3431 let entry = self.pending.entry(k.clone()).or_default();
3432 let restore_len = entry.len();
3433
3434 match (v, prev.clone()) {
3435 (Some(v), Some(prev)) => {
3436 entry.push(TransactionUpdate {
3437 value: prev,
3438 ts,
3439 diff: Diff::MINUS_ONE,
3440 });
3441 entry.push(TransactionUpdate {
3442 value: v,
3443 ts,
3444 diff: Diff::ONE,
3445 });
3446 }
3447 (Some(v), None) => {
3448 entry.push(TransactionUpdate {
3449 value: v,
3450 ts,
3451 diff: Diff::ONE,
3452 });
3453 }
3454 (None, Some(prev)) => {
3455 entry.push(TransactionUpdate {
3456 value: prev,
3457 ts,
3458 diff: Diff::MINUS_ONE,
3459 });
3460 }
3461 (None, None) => {}
3462 }
3463
3464 if let Err(err) = self.verify_keys([&k]) {
3466 let pending = self.pending.get_mut(&k).expect("inserted above");
3469 pending.truncate(restore_len);
3470 Err(err)
3471 } else {
3472 Ok(prev)
3473 }
3474 }
3475
3476 fn set_many(
3481 &mut self,
3482 kvs: BTreeMap<K, Option<V>>,
3483 ts: Timestamp,
3484 ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3485 if kvs.is_empty() {
3486 return Ok(BTreeMap::new());
3487 }
3488
3489 let mut prevs = BTreeMap::new();
3490 let mut restores = BTreeMap::new();
3491
3492 for (k, v) in kvs {
3493 let prev = self.get(&k).cloned();
3494 let entry = self.pending.entry(k.clone()).or_default();
3495 restores.insert(k.clone(), entry.len());
3496
3497 match (v, prev.clone()) {
3498 (Some(v), Some(prev)) => {
3499 entry.push(TransactionUpdate {
3500 value: prev,
3501 ts,
3502 diff: Diff::MINUS_ONE,
3503 });
3504 entry.push(TransactionUpdate {
3505 value: v,
3506 ts,
3507 diff: Diff::ONE,
3508 });
3509 }
3510 (Some(v), None) => {
3511 entry.push(TransactionUpdate {
3512 value: v,
3513 ts,
3514 diff: Diff::ONE,
3515 });
3516 }
3517 (None, Some(prev)) => {
3518 entry.push(TransactionUpdate {
3519 value: prev,
3520 ts,
3521 diff: Diff::MINUS_ONE,
3522 });
3523 }
3524 (None, None) => {}
3525 }
3526
3527 prevs.insert(k, prev);
3528 }
3529
3530 if let Err(err) = self.verify_keys(prevs.keys()) {
3532 for (k, restore_len) in restores {
3533 let pending = self.pending.get_mut(&k).expect("inserted above");
3536 pending.truncate(restore_len);
3537 }
3538 Err(err)
3539 } else {
3540 Ok(prevs)
3541 }
3542 }
3543
3544 fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3550 let mut deleted = Vec::new();
3551 self.for_values_mut(|p, k, v| {
3552 if f(k, v) {
3553 deleted.push((k.clone(), v.clone()));
3554 p.entry(k.clone()).or_default().push(TransactionUpdate {
3555 value: v.clone(),
3556 ts,
3557 diff: Diff::MINUS_ONE,
3558 });
3559 }
3560 });
3561 soft_assert_no_log!(self.verify().is_ok());
3562 deleted
3563 }
3564
3565 fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3569 self.set(k, None, ts)
3570 .expect("deleting an entry cannot violate uniqueness")
3571 }
3572
3573 fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3577 let kvs = ks.into_iter().map(|k| (k, None)).collect();
3578 let prevs = self
3579 .set_many(kvs, ts)
3580 .expect("deleting entries cannot violate uniqueness");
3581 prevs
3582 .into_iter()
3583 .filter_map(|(k, v)| v.map(|v| (k, v)))
3584 .collect()
3585 }
3586}
3587
3588#[cfg(test)]
3589#[allow(clippy::unwrap_used)]
3590mod tests {
3591 use super::*;
3592
3593 use mz_ore::now::SYSTEM_TIME;
3594 use mz_ore::{assert_none, assert_ok};
3595 use mz_persist_client::cache::PersistClientCache;
3596 use mz_persist_types::PersistLocation;
3597 use semver::Version;
3598
3599 use crate::durable::{TestCatalogStateBuilder, test_bootstrap_args};
3600 use crate::memory;
3601
3602 #[mz_ore::test]
3603 fn test_table_transaction_simple() {
3604 fn uniqueness_violation(a: &String, b: &String) -> bool {
3605 a == b
3606 }
3607 let mut table = TableTransaction::new_with_uniqueness_fn(
3608 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3609 uniqueness_violation,
3610 )
3611 .unwrap();
3612
3613 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3616 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3617 assert!(
3618 table
3619 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3620 .is_err()
3621 );
3622 assert!(
3623 table
3624 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3625 .is_err()
3626 );
3627 }
3628
3629 #[mz_ore::test]
3630 fn test_table_transaction() {
3631 fn uniqueness_violation(a: &String, b: &String) -> bool {
3632 a == b
3633 }
3634 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3635
3636 fn commit(
3637 table: &mut BTreeMap<Vec<u8>, String>,
3638 mut pending: Vec<(Vec<u8>, String, Diff)>,
3639 ) {
3640 pending.sort_by(|a, b| a.2.cmp(&b.2));
3642 for (k, v, diff) in pending {
3643 if diff == Diff::MINUS_ONE {
3644 let prev = table.remove(&k);
3645 assert_eq!(prev, Some(v));
3646 } else if diff == Diff::ONE {
3647 let prev = table.insert(k, v);
3648 assert_eq!(prev, None);
3649 } else {
3650 panic!("unexpected diff: {diff}");
3651 }
3652 }
3653 }
3654
3655 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3656 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3657 let mut table_txn =
3658 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3659 assert_eq!(table_txn.items_cloned(), table);
3660 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3661 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3662 assert_eq!(
3663 table_txn.items_cloned(),
3664 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3665 );
3666 assert_eq!(
3667 table_txn
3668 .update(|_k, _v| Some("v3".to_string()), 2)
3669 .unwrap(),
3670 Diff::ONE
3671 );
3672
3673 table_txn
3675 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3676 .unwrap_err();
3677
3678 table_txn
3679 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3680 .unwrap();
3681 assert_eq!(
3682 table_txn.items_cloned(),
3683 BTreeMap::from([
3684 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3685 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3686 ])
3687 );
3688 let err = table_txn
3689 .update(|_k, _v| Some("v1".to_string()), 5)
3690 .unwrap_err();
3691 assert!(
3692 matches!(err, DurableCatalogError::UniquenessViolation),
3693 "unexpected err: {err:?}"
3694 );
3695 let pending = table_txn.pending();
3696 assert_eq!(
3697 pending,
3698 vec![
3699 (
3700 1i64.to_le_bytes().to_vec(),
3701 "v1".to_string(),
3702 Diff::MINUS_ONE
3703 ),
3704 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3705 (
3706 2i64.to_le_bytes().to_vec(),
3707 "v2".to_string(),
3708 Diff::MINUS_ONE
3709 ),
3710 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3711 ]
3712 );
3713 commit(&mut table, pending);
3714 assert_eq!(
3715 table,
3716 BTreeMap::from([
3717 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3718 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3719 ])
3720 );
3721
3722 let mut table_txn =
3723 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3724 assert_eq!(
3726 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3727 1
3728 );
3729 table_txn
3730 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3731 .unwrap();
3732 table_txn
3734 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3735 .unwrap_err();
3736 table_txn
3738 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3739 .unwrap_err();
3740 assert_eq!(
3741 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3742 1
3743 );
3744 table_txn
3746 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3747 .unwrap();
3748 table_txn
3749 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3750 .unwrap();
3751 let pending = table_txn.pending();
3752 assert_eq!(
3753 pending,
3754 vec![
3755 (
3756 1i64.to_le_bytes().to_vec(),
3757 "v3".to_string(),
3758 Diff::MINUS_ONE
3759 ),
3760 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3761 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3762 ]
3763 );
3764 commit(&mut table, pending);
3765 assert_eq!(
3766 table,
3767 BTreeMap::from([
3768 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3769 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3770 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3771 ])
3772 );
3773
3774 let mut table_txn =
3775 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3776 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3777 table_txn
3778 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3779 .unwrap();
3780
3781 commit(&mut table, table_txn.pending());
3782 assert_eq!(
3783 table,
3784 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3785 );
3786
3787 let mut table_txn =
3788 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3789 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3790 table_txn
3791 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3792 .unwrap();
3793 commit(&mut table, table_txn.pending());
3794 assert_eq!(
3795 table,
3796 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3797 );
3798
3799 let mut table_txn =
3801 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3802 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3803 table_txn
3804 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3805 .unwrap();
3806 table_txn
3807 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3808 .unwrap_err();
3809 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3810 table_txn
3811 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3812 .unwrap();
3813 commit(&mut table, table_txn.pending());
3814 assert_eq!(
3815 table.clone().into_iter().collect::<Vec<_>>(),
3816 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3817 );
3818
3819 let mut table_txn =
3821 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3822 table_txn
3824 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3825 .unwrap_err();
3826 table_txn
3827 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3828 .unwrap();
3829 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3830 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3831 let pending = table_txn.pending();
3832 assert_eq!(
3833 pending,
3834 vec![
3835 (
3836 1i64.to_le_bytes().to_vec(),
3837 "v5".to_string(),
3838 Diff::MINUS_ONE
3839 ),
3840 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3841 ]
3842 );
3843 commit(&mut table, pending);
3844 assert_eq!(
3845 table,
3846 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3847 );
3848
3849 let mut table_txn =
3851 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3852 table_txn
3853 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3854 .unwrap();
3855 let pending = table_txn.pending::<Vec<u8>, String>();
3856 assert!(pending.is_empty());
3857
3858 let mut table_txn =
3860 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3861 table_txn
3863 .set_many(
3864 BTreeMap::from([
3865 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3866 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3867 ]),
3868 0,
3869 )
3870 .unwrap_err();
3871 table_txn
3872 .set_many(
3873 BTreeMap::from([
3874 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3875 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3876 ]),
3877 1,
3878 )
3879 .unwrap();
3880 table_txn
3881 .set_many(
3882 BTreeMap::from([
3883 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3884 (3i64.to_le_bytes().to_vec(), None),
3885 ]),
3886 2,
3887 )
3888 .unwrap();
3889 let pending = table_txn.pending();
3890 assert_eq!(
3891 pending,
3892 vec![
3893 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3894 (
3895 3i64.to_le_bytes().to_vec(),
3896 "v6".to_string(),
3897 Diff::MINUS_ONE
3898 ),
3899 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3900 ]
3901 );
3902 commit(&mut table, pending);
3903 assert_eq!(
3904 table,
3905 BTreeMap::from([
3906 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3907 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3908 ])
3909 );
3910
3911 let mut table_txn =
3913 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3914 table_txn
3915 .set_many(
3916 BTreeMap::from([
3917 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3918 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3919 ]),
3920 0,
3921 )
3922 .unwrap();
3923 let pending = table_txn.pending::<Vec<u8>, String>();
3924 assert!(pending.is_empty());
3925 commit(&mut table, pending);
3926 assert_eq!(
3927 table,
3928 BTreeMap::from([
3929 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3930 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3931 ])
3932 );
3933
3934 let mut table_txn =
3936 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3937 table_txn
3939 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3940 .unwrap_err();
3941 assert!(
3942 table_txn
3943 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3944 .unwrap()
3945 );
3946 assert!(
3947 !table_txn
3948 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3949 .unwrap()
3950 );
3951 let pending = table_txn.pending();
3952 assert_eq!(
3953 pending,
3954 vec![
3955 (
3956 1i64.to_le_bytes().to_vec(),
3957 "v6".to_string(),
3958 Diff::MINUS_ONE
3959 ),
3960 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3961 ]
3962 );
3963 commit(&mut table, pending);
3964 assert_eq!(
3965 table,
3966 BTreeMap::from([
3967 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3968 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3969 ])
3970 );
3971
3972 let mut table_txn =
3974 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3975 assert!(
3976 table_txn
3977 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3978 .unwrap()
3979 );
3980 let pending = table_txn.pending::<Vec<u8>, String>();
3981 assert!(pending.is_empty());
3982 commit(&mut table, pending);
3983 assert_eq!(
3984 table,
3985 BTreeMap::from([
3986 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3987 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3988 ])
3989 );
3990
3991 let mut table_txn =
3993 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3994 table_txn
3996 .update_by_keys(
3997 [
3998 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3999 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
4000 ],
4001 0,
4002 )
4003 .unwrap_err();
4004 let n = table_txn
4005 .update_by_keys(
4006 [
4007 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4008 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
4009 ],
4010 1,
4011 )
4012 .unwrap();
4013 assert_eq!(n, Diff::ONE);
4014 let n = table_txn
4015 .update_by_keys(
4016 [
4017 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
4018 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
4019 ],
4020 2,
4021 )
4022 .unwrap();
4023 assert_eq!(n, Diff::ZERO);
4024 let pending = table_txn.pending();
4025 assert_eq!(
4026 pending,
4027 vec![
4028 (
4029 1i64.to_le_bytes().to_vec(),
4030 "v8".to_string(),
4031 Diff::MINUS_ONE
4032 ),
4033 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
4034 ]
4035 );
4036 commit(&mut table, pending);
4037 assert_eq!(
4038 table,
4039 BTreeMap::from([
4040 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4041 (42i64.to_le_bytes().to_vec(), "v7".to_string())
4042 ])
4043 );
4044
4045 let mut table_txn =
4047 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4048 let n = table_txn
4049 .update_by_keys(
4050 [
4051 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4052 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
4053 ],
4054 0,
4055 )
4056 .unwrap();
4057 assert_eq!(n, Diff::from(2));
4058 let pending = table_txn.pending::<Vec<u8>, String>();
4059 assert!(pending.is_empty());
4060 commit(&mut table, pending);
4061 assert_eq!(
4062 table,
4063 BTreeMap::from([
4064 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4065 (42i64.to_le_bytes().to_vec(), "v7".to_string())
4066 ])
4067 );
4068
4069 let mut table_txn =
4071 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4072 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
4073 assert_eq!(prev, Some("v9".to_string()));
4074 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
4075 assert_none!(prev);
4076 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
4077 assert_none!(prev);
4078 let pending = table_txn.pending();
4079 assert_eq!(
4080 pending,
4081 vec![(
4082 1i64.to_le_bytes().to_vec(),
4083 "v9".to_string(),
4084 Diff::MINUS_ONE
4085 ),]
4086 );
4087 commit(&mut table, pending);
4088 assert_eq!(
4089 table,
4090 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
4091 );
4092
4093 let mut table_txn =
4095 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4096 let prevs = table_txn.delete_by_keys(
4097 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4098 0,
4099 );
4100 assert_eq!(
4101 prevs,
4102 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
4103 );
4104 let prevs = table_txn.delete_by_keys(
4105 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4106 1,
4107 );
4108 assert_eq!(prevs, vec![]);
4109 let prevs = table_txn.delete_by_keys(
4110 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4111 2,
4112 );
4113 assert_eq!(prevs, vec![]);
4114 let pending = table_txn.pending();
4115 assert_eq!(
4116 pending,
4117 vec![(
4118 42i64.to_le_bytes().to_vec(),
4119 "v7".to_string(),
4120 Diff::MINUS_ONE
4121 ),]
4122 );
4123 commit(&mut table, pending);
4124 assert_eq!(table, BTreeMap::new());
4125 }
4126
4127 #[mz_ore::test(tokio::test)]
4128 #[cfg_attr(miri, ignore)] async fn test_savepoint() {
4130 const VERSION: Version = Version::new(26, 0, 0);
4131 let mut persist_cache = PersistClientCache::new_no_metrics();
4132 persist_cache.cfg.build_version = VERSION;
4133 let persist_client = persist_cache
4134 .open(PersistLocation::new_in_mem())
4135 .await
4136 .unwrap();
4137 let state_builder = TestCatalogStateBuilder::new(persist_client)
4138 .with_default_deploy_generation()
4139 .with_version(VERSION);
4140
4141 let _ = state_builder
4143 .clone()
4144 .unwrap_build()
4145 .await
4146 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4147 .await
4148 .unwrap()
4149 .0;
4150 let mut savepoint_state = state_builder
4151 .unwrap_build()
4152 .await
4153 .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4154 .await
4155 .unwrap()
4156 .0;
4157
4158 let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4159 assert!(!initial_snapshot.is_empty());
4160
4161 let db_name = "db";
4162 let db_owner = RoleId::User(42);
4163 let db_privileges = Vec::new();
4164 let mut txn = savepoint_state.transaction().await.unwrap();
4165 let (db_id, db_oid) = txn
4166 .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4167 .unwrap();
4168 let commit_ts = txn.upper();
4169 txn.commit_internal(commit_ts).await.unwrap();
4170 let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4171 let update = updates.into_element();
4172
4173 assert_eq!(update.diff, StateDiff::Addition);
4174
4175 let db = match update.kind {
4176 memory::objects::StateUpdateKind::Database(db) => db,
4177 update => panic!("unexpected update: {update:?}"),
4178 };
4179
4180 assert_eq!(db_id, db.id);
4181 assert_eq!(db_oid, db.oid);
4182 assert_eq!(db_name, db.name);
4183 assert_eq!(db_owner, db.owner_id);
4184 assert_eq!(db_privileges, db.privileges);
4185 }
4186
4187 #[mz_ore::test]
4188 fn test_allocate_introspection_source_index_id() {
4189 let cluster_variant: u8 = 0b0000_0001;
4190 let cluster_id_inner: u64 =
4191 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4192 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4193
4194 let cluster_id = ClusterId::System(cluster_id_inner);
4195 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4196
4197 let introspection_source_index_id: u64 =
4198 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4199
4200 {
4202 let mut cluster_variant_mask = 0xFF << 56;
4203 cluster_variant_mask &= introspection_source_index_id;
4204 cluster_variant_mask >>= 56;
4205 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4206 }
4207
4208 {
4210 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4211 cluster_id_inner_mask &= introspection_source_index_id;
4212 cluster_id_inner_mask >>= 8;
4213 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4214 }
4215
4216 {
4218 let mut log_variant_mask = 0xFF;
4219 log_variant_mask &= introspection_source_index_id;
4220 assert_eq!(
4221 log_variant_mask,
4222 u64::from(timely_messages_received_log_variant)
4223 );
4224 }
4225
4226 let (catalog_item_id, global_id) =
4227 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4228
4229 assert_eq!(
4230 catalog_item_id,
4231 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4232 );
4233 assert_eq!(
4234 global_id,
4235 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4236 );
4237 }
4238}