1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12use std::num::NonZeroU32;
13use std::time::Duration;
14
15use anyhow::anyhow;
16use derivative::Derivative;
17use itertools::Itertools;
18use mz_audit_log::VersionedEvent;
19use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::cast::{u64_to_usize, usize_to_u64};
22use mz_ore::collections::{CollectionExt, HashSet};
23use mz_ore::now::SYSTEM_TIME;
24use mz_ore::vec::VecExt;
25use mz_ore::{soft_assert_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_persist_types::ShardId;
27use mz_pgrepr::oid::FIRST_USER_OID;
28use mz_proto::{RustType, TryFromProtoError};
29use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
30use mz_repr::network_policy_id::NetworkPolicyId;
31use mz_repr::role_id::RoleId;
32use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion};
33use mz_sql::catalog::{
34 CatalogError as SqlCatalogError, CatalogItemType, ObjectType, PasswordAction,
35 RoleAttributesRaw, RoleMembership, RoleVars,
36};
37use mz_sql::names::{CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId};
38use mz_sql::plan::NetworkPolicyRule;
39use mz_sql_parser::ast::QualifiedReplica;
40use mz_storage_client::controller::StorageTxn;
41use mz_storage_types::controller::StorageError;
42use tracing::warn;
43
44use crate::builtin::BuiltinLog;
45use crate::durable::initialize::{
46 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY,
47 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
48};
49use crate::durable::objects::serialization::proto;
50use crate::durable::objects::{
51 AuditLogKey, Cluster, ClusterConfig, ClusterIntrospectionSourceIndexKey,
52 ClusterIntrospectionSourceIndexValue, ClusterKey, ClusterReplica, ClusterReplicaKey,
53 ClusterReplicaValue, ClusterSystemConfiguration, ClusterSystemConfigurationKey,
54 ClusterSystemConfigurationValue, ClusterValue, CommentKey, CommentValue, Config, ConfigKey,
55 ConfigValue, Database, DatabaseKey, DatabaseValue, DefaultPrivilegesKey,
56 DefaultPrivilegesValue, DurableType, GidMappingKey, GidMappingValue, IdAllocKey, IdAllocValue,
57 IntrospectionSourceIndex, Item, ItemKey, ItemValue, NetworkPolicyKey, NetworkPolicyValue,
58 ReplicaConfig, ReplicaSystemConfiguration, ReplicaSystemConfigurationKey,
59 ReplicaSystemConfigurationValue, Role, RoleKey, RoleValue, Schema, SchemaKey, SchemaValue,
60 ServerConfigurationKey, ServerConfigurationValue, SettingKey, SettingValue, SourceReference,
61 SourceReferencesKey, SourceReferencesValue, StorageCollectionMetadataKey,
62 StorageCollectionMetadataValue, SystemObjectDescription, SystemObjectMapping,
63 SystemPrivilegesKey, SystemPrivilegesValue, TxnWalShardValue, UnfinalizedShardKey,
64};
65use crate::durable::{
66 AUDIT_LOG_ID_ALLOC_KEY, BUILTIN_MIGRATION_SHARD_KEY, CATALOG_CONTENT_VERSION_KEY, CatalogError,
67 DATABASE_ID_ALLOC_KEY, DefaultPrivilege, DurableCatalogError, DurableCatalogState,
68 EXPRESSION_CACHE_SHARD_KEY, MOCK_AUTHENTICATION_NONCE_KEY, NetworkPolicy, OID_ALLOC_KEY,
69 SCHEMA_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY,
70 SYSTEM_REPLICA_ID_ALLOC_KEY, Snapshot, SystemConfiguration, USER_ITEM_ALLOC_KEY,
71 USER_NETWORK_POLICY_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY,
72};
73use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind};
74
/// Timestamp type used inside this module to tag staged changes; see
/// `Transaction::op_id` and `Transaction::audit_log_updates`.
type Timestamp = u64;
76
/// A set of changes being assembled against the durable catalog.
///
/// Holds a mutable borrow of a [`DurableCatalogState`] plus one
/// [`TableTransaction`] per catalog collection. The methods on this type
/// record inserts, updates, and removals in those tables.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Transaction<'a> {
    // Left out of the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    #[derivative(PartialEq = "ignore")]
    durable_catalog: &'a mut dyn DurableCatalogState,
    databases: TableTransaction<DatabaseKey, DatabaseValue>,
    schemas: TableTransaction<SchemaKey, SchemaValue>,
    items: TableTransaction<ItemKey, ItemValue>,
    comments: TableTransaction<CommentKey, CommentValue>,
    roles: TableTransaction<RoleKey, RoleValue>,
    /// Password hashes for roles, keyed by role ID (see `insert_role`).
    role_auth: TableTransaction<RoleAuthKey, RoleAuthValue>,
    clusters: TableTransaction<ClusterKey, ClusterValue>,
    cluster_replicas: TableTransaction<ClusterReplicaKey, ClusterReplicaValue>,
    introspection_sources:
        TableTransaction<ClusterIntrospectionSourceIndexKey, ClusterIntrospectionSourceIndexValue>,
    /// Named ID allocators; each value stores the next ID to hand out.
    id_allocator: TableTransaction<IdAllocKey, IdAllocValue>,
    configs: TableTransaction<ConfigKey, ConfigValue>,
    settings: TableTransaction<SettingKey, SettingValue>,
    /// Seeded from `Snapshot::system_object_mappings`.
    system_gid_mapping: TableTransaction<GidMappingKey, GidMappingValue>,
    system_configurations: TableTransaction<ServerConfigurationKey, ServerConfigurationValue>,
    cluster_system_configurations:
        TableTransaction<ClusterSystemConfigurationKey, ClusterSystemConfigurationValue>,
    replica_system_configurations:
        TableTransaction<ReplicaSystemConfigurationKey, ReplicaSystemConfigurationValue>,
    default_privileges: TableTransaction<DefaultPrivilegesKey, DefaultPrivilegesValue>,
    source_references: TableTransaction<SourceReferencesKey, SourceReferencesValue>,
    system_privileges: TableTransaction<SystemPrivilegesKey, SystemPrivilegesValue>,
    network_policies: TableTransaction<NetworkPolicyKey, NetworkPolicyValue>,
    storage_collection_metadata:
        TableTransaction<StorageCollectionMetadataKey, StorageCollectionMetadataValue>,
    unfinalized_shards: TableTransaction<UnfinalizedShardKey, ()>,
    txn_wal_shard: TableTransaction<(), TxnWalShardValue>,
    /// Audit log entries recorded so far, as `(key, diff, op_id at insertion)`.
    /// Starts empty; it is not seeded from the `Snapshot`.
    audit_log_updates: Vec<(AuditLogKey, Diff, Timestamp)>,
    /// Value handed to `Transaction::new` by the caller.
    // NOTE(review): presumably the catalog upper this transaction was opened
    // at — confirm against the code that commits the transaction.
    upper: mz_repr::Timestamp,
    /// Tag passed along with every table mutation made through this
    /// transaction; initialized to 0 in `Transaction::new`.
    op_id: Timestamp,
}
120
121impl<'a> Transaction<'a> {
122 pub fn new(
123 durable_catalog: &'a mut dyn DurableCatalogState,
124 Snapshot {
125 databases,
126 schemas,
127 roles,
128 role_auth,
129 items,
130 comments,
131 clusters,
132 network_policies,
133 cluster_replicas,
134 introspection_sources,
135 id_allocator,
136 configs,
137 settings,
138 source_references,
139 system_object_mappings,
140 system_configurations,
141 cluster_system_configurations,
142 replica_system_configurations,
143 default_privileges,
144 system_privileges,
145 storage_collection_metadata,
146 unfinalized_shards,
147 txn_wal_shard,
148 }: Snapshot,
149 upper: mz_repr::Timestamp,
150 ) -> Result<Transaction<'a>, CatalogError> {
151 Ok(Transaction {
152 durable_catalog,
153 databases: TableTransaction::new_with_uniqueness_fn(
154 databases,
155 |a: &DatabaseValue, b| a.name == b.name,
156 )?,
157 schemas: TableTransaction::new_with_uniqueness_fn(schemas, |a: &SchemaValue, b| {
158 a.database_id == b.database_id && a.name == b.name
159 })?,
160 items: TableTransaction::new_with_uniqueness_fn(items, |a: &ItemValue, b| {
161 a.schema_id == b.schema_id && a.name == b.name && {
162 let a_type = a.item_type();
164 let b_type = b.item_type();
165 (a_type != CatalogItemType::Type && b_type != CatalogItemType::Type)
166 || (a_type == CatalogItemType::Type && b_type.conflicts_with_type())
167 || (b_type == CatalogItemType::Type && a_type.conflicts_with_type())
168 }
169 })?,
170 comments: TableTransaction::new(comments)?,
171 roles: TableTransaction::new_with_uniqueness_fn(roles, |a: &RoleValue, b| {
172 a.name == b.name
173 })?,
174 role_auth: TableTransaction::new(role_auth)?,
175 clusters: TableTransaction::new_with_uniqueness_fn(clusters, |a: &ClusterValue, b| {
176 a.name == b.name
177 })?,
178 network_policies: TableTransaction::new_with_uniqueness_fn(
179 network_policies,
180 |a: &NetworkPolicyValue, b| a.name == b.name,
181 )?,
182 cluster_replicas: TableTransaction::new_with_uniqueness_fn(
183 cluster_replicas,
184 |a: &ClusterReplicaValue, b| a.cluster_id == b.cluster_id && a.name == b.name,
185 )?,
186 introspection_sources: TableTransaction::new(introspection_sources)?,
187 id_allocator: TableTransaction::new(id_allocator)?,
188 configs: TableTransaction::new(configs)?,
189 settings: TableTransaction::new(settings)?,
190 source_references: TableTransaction::new(source_references)?,
191 system_gid_mapping: TableTransaction::new(system_object_mappings)?,
192 system_configurations: TableTransaction::new(system_configurations)?,
193 cluster_system_configurations: TableTransaction::new(cluster_system_configurations)?,
194 replica_system_configurations: TableTransaction::new(replica_system_configurations)?,
195 default_privileges: TableTransaction::new(default_privileges)?,
196 system_privileges: TableTransaction::new(system_privileges)?,
197 storage_collection_metadata: TableTransaction::new(storage_collection_metadata)?,
198 unfinalized_shards: TableTransaction::new(unfinalized_shards)?,
199 txn_wal_shard: TableTransaction::new(txn_wal_shard)?,
203 audit_log_updates: Vec::new(),
204 upper,
205 op_id: 0,
206 })
207 }
208
209 pub fn get_item(&self, id: &CatalogItemId) -> Option<Item> {
210 let key = ItemKey { id: *id };
211 self.items
212 .get(&key)
213 .map(|v| DurableType::from_key_value(key, v.clone()))
214 }
215
216 pub fn get_items(&self) -> impl Iterator<Item = Item> + use<> {
217 self.items
218 .items()
219 .into_iter()
220 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
221 .sorted_by_key(|Item { id, .. }| *id)
222 }
223
224 pub fn insert_audit_log_event(&mut self, event: VersionedEvent) {
225 self.insert_audit_log_events([event]);
226 }
227
228 pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
229 let events = events
230 .into_iter()
231 .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
232 self.audit_log_updates.extend(events);
233 }
234
235 pub fn insert_user_database(
236 &mut self,
237 database_name: &str,
238 owner_id: RoleId,
239 privileges: Vec<MzAclItem>,
240 temporary_oids: &HashSet<u32>,
241 ) -> Result<(DatabaseId, u32), CatalogError> {
242 let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?;
243 let id = DatabaseId::User(id);
244 let oid = self.allocate_oid(temporary_oids)?;
245 self.insert_database(id, database_name, owner_id, privileges, oid)?;
246 Ok((id, oid))
247 }
248
249 pub(crate) fn insert_database(
250 &mut self,
251 id: DatabaseId,
252 database_name: &str,
253 owner_id: RoleId,
254 privileges: Vec<MzAclItem>,
255 oid: u32,
256 ) -> Result<u32, CatalogError> {
257 match self.databases.insert(
258 DatabaseKey { id },
259 DatabaseValue {
260 name: database_name.to_string(),
261 owner_id,
262 privileges,
263 oid,
264 },
265 self.op_id,
266 ) {
267 Ok(_) => Ok(oid),
268 Err(_) => Err(SqlCatalogError::DatabaseAlreadyExists(database_name.to_owned()).into()),
269 }
270 }
271
272 pub fn insert_user_schema(
273 &mut self,
274 database_id: DatabaseId,
275 schema_name: &str,
276 owner_id: RoleId,
277 privileges: Vec<MzAclItem>,
278 temporary_oids: &HashSet<u32>,
279 ) -> Result<(SchemaId, u32), CatalogError> {
280 let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?;
281 let id = SchemaId::User(id);
282 let oid = self.allocate_oid(temporary_oids)?;
283 self.insert_schema(
284 id,
285 Some(database_id),
286 schema_name.to_string(),
287 owner_id,
288 privileges,
289 oid,
290 )?;
291 Ok((id, oid))
292 }
293
294 pub fn insert_system_schema(
295 &mut self,
296 schema_id: u64,
297 schema_name: &str,
298 owner_id: RoleId,
299 privileges: Vec<MzAclItem>,
300 oid: u32,
301 ) -> Result<(), CatalogError> {
302 let id = SchemaId::System(schema_id);
303 self.insert_schema(id, None, schema_name.to_string(), owner_id, privileges, oid)
304 }
305
306 pub(crate) fn insert_schema(
307 &mut self,
308 schema_id: SchemaId,
309 database_id: Option<DatabaseId>,
310 schema_name: String,
311 owner_id: RoleId,
312 privileges: Vec<MzAclItem>,
313 oid: u32,
314 ) -> Result<(), CatalogError> {
315 match self.schemas.insert(
316 SchemaKey { id: schema_id },
317 SchemaValue {
318 database_id,
319 name: schema_name.clone(),
320 owner_id,
321 privileges,
322 oid,
323 },
324 self.op_id,
325 ) {
326 Ok(_) => Ok(()),
327 Err(_) => Err(SqlCatalogError::SchemaAlreadyExists(schema_name).into()),
328 }
329 }
330
331 pub fn insert_builtin_role(
332 &mut self,
333 id: RoleId,
334 name: String,
335 attributes: RoleAttributesRaw,
336 membership: RoleMembership,
337 vars: RoleVars,
338 oid: u32,
339 ) -> Result<RoleId, CatalogError> {
340 soft_assert_or_log!(id.is_builtin(), "ID {id:?} is not builtin");
341 self.insert_role(id, name, attributes, membership, vars, oid)?;
342 Ok(id)
343 }
344
345 pub fn insert_user_role(
346 &mut self,
347 name: String,
348 attributes: RoleAttributesRaw,
349 membership: RoleMembership,
350 vars: RoleVars,
351 temporary_oids: &HashSet<u32>,
352 ) -> Result<(RoleId, u32), CatalogError> {
353 let id = self.get_and_increment_id(USER_ROLE_ID_ALLOC_KEY.to_string())?;
354 let id = RoleId::User(id);
355 let oid = self.allocate_oid(temporary_oids)?;
356 self.insert_role(id, name, attributes, membership, vars, oid)?;
357 Ok((id, oid))
358 }
359
360 fn insert_role(
361 &mut self,
362 id: RoleId,
363 name: String,
364 attributes: RoleAttributesRaw,
365 membership: RoleMembership,
366 vars: RoleVars,
367 oid: u32,
368 ) -> Result<(), CatalogError> {
369 if let Some(ref password) = attributes.password {
370 let hash = mz_auth::hash::scram256_hash(
371 password,
372 &attributes
373 .scram_iterations
374 .or_else(|| {
375 soft_panic_or_log!(
376 "Hash iterations must be set if a password is provided."
377 );
378 None
379 })
380 .unwrap_or_else(|| NonZeroU32::new(600_000).expect("known valid")),
383 )
384 .expect("password hash should be valid");
385 match self.role_auth.insert(
386 RoleAuthKey { role_id: id },
387 RoleAuthValue {
388 password_hash: Some(hash),
389 updated_at: SYSTEM_TIME(),
390 },
391 self.op_id,
392 ) {
393 Ok(_) => {}
394 Err(_) => {
395 return Err(SqlCatalogError::RoleAlreadyExists(name).into());
396 }
397 }
398 }
399
400 match self.roles.insert(
401 RoleKey { id },
402 RoleValue {
403 name: name.clone(),
404 attributes: attributes.into(),
405 membership,
406 vars,
407 oid,
408 },
409 self.op_id,
410 ) {
411 Ok(_) => Ok(()),
412 Err(_) => Err(SqlCatalogError::RoleAlreadyExists(name).into()),
413 }
414 }
415
    /// Inserts a cluster under the caller-supplied `cluster_id`, together
    /// with its introspection source indexes.
    ///
    /// This is a direct pass-through to `insert_cluster`; all arguments are
    /// forwarded unchanged and its result is returned as-is.
    pub fn insert_user_cluster(
        &mut self,
        cluster_id: ClusterId,
        cluster_name: &str,
        introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
        owner_id: RoleId,
        privileges: Vec<MzAclItem>,
        config: ClusterConfig,
        temporary_oids: &HashSet<u32>,
    ) -> Result<(), CatalogError> {
        self.insert_cluster(
            cluster_id,
            cluster_name,
            introspection_source_indexes,
            owner_id,
            privileges,
            config,
            temporary_oids,
        )
    }
437
438 pub fn insert_system_cluster(
440 &mut self,
441 cluster_name: &str,
442 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
443 privileges: Vec<MzAclItem>,
444 owner_id: RoleId,
445 config: ClusterConfig,
446 temporary_oids: &HashSet<u32>,
447 ) -> Result<ClusterId, CatalogError> {
448 let cluster_id = self.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
449 let cluster_id = ClusterId::system(cluster_id).ok_or(SqlCatalogError::IdExhaustion)?;
450 self.insert_cluster(
451 cluster_id,
452 cluster_name,
453 introspection_source_indexes,
454 owner_id,
455 privileges,
456 config,
457 temporary_oids,
458 )?;
459 Ok(cluster_id)
460 }
461
462 fn insert_cluster(
463 &mut self,
464 cluster_id: ClusterId,
465 cluster_name: &str,
466 introspection_source_indexes: Vec<(&'static BuiltinLog, CatalogItemId, GlobalId)>,
467 owner_id: RoleId,
468 privileges: Vec<MzAclItem>,
469 config: ClusterConfig,
470 temporary_oids: &HashSet<u32>,
471 ) -> Result<(), CatalogError> {
472 if let Err(_) = self.clusters.insert(
473 ClusterKey { id: cluster_id },
474 ClusterValue {
475 name: cluster_name.to_string(),
476 owner_id,
477 privileges,
478 config,
479 },
480 self.op_id,
481 ) {
482 return Err(SqlCatalogError::ClusterAlreadyExists(cluster_name.to_owned()).into());
483 };
484
485 let amount = usize_to_u64(introspection_source_indexes.len());
486 let oids = self.allocate_oids(amount, temporary_oids)?;
487 let introspection_source_indexes: Vec<_> = introspection_source_indexes
488 .into_iter()
489 .zip_eq(oids)
490 .map(|((builtin, item_id, index_id), oid)| (builtin, item_id, index_id, oid))
491 .collect();
492 for (builtin, item_id, index_id, oid) in introspection_source_indexes {
493 let introspection_source_index = IntrospectionSourceIndex {
494 cluster_id,
495 name: builtin.name.to_string(),
496 item_id,
497 index_id,
498 oid,
499 };
500 let (key, value) = introspection_source_index.into_key_value();
501 self.introspection_sources
502 .insert(key, value, self.op_id)
503 .expect("no uniqueness violation");
504 }
505
506 Ok(())
507 }
508
509 pub fn rename_cluster(
510 &mut self,
511 cluster_id: ClusterId,
512 cluster_name: &str,
513 cluster_to_name: &str,
514 ) -> Result<(), CatalogError> {
515 let key = ClusterKey { id: cluster_id };
516
517 match self.clusters.update(
518 |k, v| {
519 if *k == key {
520 let mut value = v.clone();
521 value.name = cluster_to_name.to_string();
522 Some(value)
523 } else {
524 None
525 }
526 },
527 self.op_id,
528 )? {
529 Diff::ZERO => Err(SqlCatalogError::UnknownCluster(cluster_name.to_string()).into()),
530 Diff::ONE => Ok(()),
531 n => panic!(
532 "Expected to update single cluster {cluster_name} ({cluster_id}), updated {n}"
533 ),
534 }
535 }
536
537 pub fn rename_cluster_replica(
538 &mut self,
539 replica_id: ReplicaId,
540 replica_name: &QualifiedReplica,
541 replica_to_name: &str,
542 ) -> Result<(), CatalogError> {
543 let key = ClusterReplicaKey { id: replica_id };
544
545 match self.cluster_replicas.update(
546 |k, v| {
547 if *k == key {
548 let mut value = v.clone();
549 value.name = replica_to_name.to_string();
550 Some(value)
551 } else {
552 None
553 }
554 },
555 self.op_id,
556 )? {
557 Diff::ZERO => {
558 Err(SqlCatalogError::UnknownClusterReplica(replica_name.to_string()).into())
559 }
560 Diff::ONE => Ok(()),
561 n => panic!(
562 "Expected to update single cluster replica {replica_name} ({replica_id}), updated {n}"
563 ),
564 }
565 }
566
567 pub fn insert_cluster_replica_with_id(
568 &mut self,
569 cluster_id: ClusterId,
570 replica_id: ReplicaId,
571 replica_name: &str,
572 config: ReplicaConfig,
573 owner_id: RoleId,
574 ) -> Result<(), CatalogError> {
575 if let Err(_) = self.cluster_replicas.insert(
576 ClusterReplicaKey { id: replica_id },
577 ClusterReplicaValue {
578 cluster_id,
579 name: replica_name.into(),
580 config,
581 owner_id,
582 },
583 self.op_id,
584 ) {
585 let cluster = self
586 .clusters
587 .get(&ClusterKey { id: cluster_id })
588 .expect("cluster exists");
589 return Err(SqlCatalogError::DuplicateReplica(
590 replica_name.to_string(),
591 cluster.name.to_string(),
592 )
593 .into());
594 };
595 Ok(())
596 }
597
598 pub fn insert_user_network_policy(
599 &mut self,
600 name: String,
601 rules: Vec<NetworkPolicyRule>,
602 privileges: Vec<MzAclItem>,
603 owner_id: RoleId,
604 temporary_oids: &HashSet<u32>,
605 ) -> Result<NetworkPolicyId, CatalogError> {
606 let oid = self.allocate_oid(temporary_oids)?;
607 let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?;
608 let id = NetworkPolicyId::User(id);
609 self.insert_network_policy(id, name, rules, privileges, owner_id, oid)
610 }
611
612 pub fn insert_network_policy(
613 &mut self,
614 id: NetworkPolicyId,
615 name: String,
616 rules: Vec<NetworkPolicyRule>,
617 privileges: Vec<MzAclItem>,
618 owner_id: RoleId,
619 oid: u32,
620 ) -> Result<NetworkPolicyId, CatalogError> {
621 match self.network_policies.insert(
622 NetworkPolicyKey { id },
623 NetworkPolicyValue {
624 name: name.clone(),
625 rules,
626 privileges,
627 owner_id,
628 oid,
629 },
630 self.op_id,
631 ) {
632 Ok(_) => Ok(id),
633 Err(_) => Err(SqlCatalogError::NetworkPolicyAlreadyExists(name).into()),
634 }
635 }
636
637 pub fn update_introspection_source_index_gids(
642 &mut self,
643 mappings: impl Iterator<
644 Item = (
645 ClusterId,
646 impl Iterator<Item = (String, CatalogItemId, GlobalId, u32)>,
647 ),
648 >,
649 ) -> Result<(), CatalogError> {
650 for (cluster_id, updates) in mappings {
651 for (name, item_id, index_id, oid) in updates {
652 let introspection_source_index = IntrospectionSourceIndex {
653 cluster_id,
654 name,
655 item_id,
656 index_id,
657 oid,
658 };
659 let (key, value) = introspection_source_index.into_key_value();
660
661 let prev = self
662 .introspection_sources
663 .set(key, Some(value), self.op_id)?;
664 if prev.is_none() {
665 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(format!(
666 "{index_id}"
667 ))
668 .into());
669 }
670 }
671 }
672 Ok(())
673 }
674
675 pub fn insert_user_item(
676 &mut self,
677 id: CatalogItemId,
678 global_id: GlobalId,
679 schema_id: SchemaId,
680 item_name: &str,
681 create_sql: String,
682 owner_id: RoleId,
683 privileges: Vec<MzAclItem>,
684 temporary_oids: &HashSet<u32>,
685 versions: BTreeMap<RelationVersion, GlobalId>,
686 ) -> Result<u32, CatalogError> {
687 let oid = self.allocate_oid(temporary_oids)?;
688 self.insert_item(
689 id, oid, global_id, schema_id, item_name, create_sql, owner_id, privileges, versions,
690 )?;
691 Ok(oid)
692 }
693
694 pub fn insert_item(
695 &mut self,
696 id: CatalogItemId,
697 oid: u32,
698 global_id: GlobalId,
699 schema_id: SchemaId,
700 item_name: &str,
701 create_sql: String,
702 owner_id: RoleId,
703 privileges: Vec<MzAclItem>,
704 extra_versions: BTreeMap<RelationVersion, GlobalId>,
705 ) -> Result<(), CatalogError> {
706 match self.items.insert(
707 ItemKey { id },
708 ItemValue {
709 schema_id,
710 name: item_name.to_string(),
711 create_sql,
712 owner_id,
713 privileges,
714 oid,
715 global_id,
716 extra_versions,
717 },
718 self.op_id,
719 ) {
720 Ok(_) => Ok(()),
721 Err(_) => Err(SqlCatalogError::ItemAlreadyExists(id, item_name.to_owned()).into()),
722 }
723 }
724
725 pub fn get_and_increment_id(&mut self, key: String) -> Result<u64, CatalogError> {
726 Ok(self.get_and_increment_id_by(key, 1)?.into_element())
727 }
728
729 pub fn get_and_increment_id_by(
730 &mut self,
731 key: String,
732 amount: u64,
733 ) -> Result<Vec<u64>, CatalogError> {
734 assert!(
735 key != SYSTEM_ITEM_ALLOC_KEY || !self.durable_catalog.is_bootstrap_complete(),
736 "system item IDs cannot be allocated outside of bootstrap"
737 );
738
739 let current_id = self
740 .id_allocator
741 .items()
742 .get(&IdAllocKey { name: key.clone() })
743 .unwrap_or_else(|| panic!("{key} id allocator missing"))
744 .next_id;
745 let next_id = current_id
746 .checked_add(amount)
747 .ok_or(SqlCatalogError::IdExhaustion)?;
748 let prev = self.id_allocator.set(
749 IdAllocKey { name: key },
750 Some(IdAllocValue { next_id }),
751 self.op_id,
752 )?;
753 assert_eq!(
754 prev,
755 Some(IdAllocValue {
756 next_id: current_id
757 })
758 );
759 Ok((current_id..next_id).collect())
760 }
761
762 pub fn allocate_system_item_ids(
763 &mut self,
764 amount: u64,
765 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
766 assert!(
767 !self.durable_catalog.is_bootstrap_complete(),
768 "we can only allocate system item IDs during bootstrap"
769 );
770 Ok(self
771 .get_and_increment_id_by(SYSTEM_ITEM_ALLOC_KEY.to_string(), amount)?
772 .into_iter()
773 .map(|x| (CatalogItemId::System(x), GlobalId::System(x)))
775 .collect())
776 }
777
    /// Derives the IDs of an introspection source index from its cluster and
    /// log variant, without touching any allocator.
    ///
    /// The returned `CatalogItemId` and `GlobalId` wrap the same `u64`, laid
    /// out as:
    ///
    /// * bits 56..=63: cluster variant (`1` = system, `2` = user);
    /// * bits 8..=55: the cluster's inner ID;
    /// * bits 0..=7: a fixed code for the log variant.
    ///
    /// # Panics
    /// Panics if any of the top 16 bits of the cluster's inner ID is set.
    pub fn allocate_introspection_source_index_id(
        cluster_id: &ClusterId,
        log_variant: LogVariant,
    ) -> (CatalogItemId, GlobalId) {
        let cluster_variant: u8 = match cluster_id {
            ClusterId::System(_) => 1,
            ClusterId::User(_) => 2,
        };
        let cluster_id: u64 = cluster_id.inner_id();
        // The inner ID is shifted left by 8 below and must stay clear of the
        // variant byte, so it has to fit in 48 bits.
        const CLUSTER_ID_MASK: u64 = 0xFFFF << 48;
        assert_eq!(
            CLUSTER_ID_MASK & cluster_id,
            0,
            "invalid cluster ID: {cluster_id}"
        );
        // These codes are part of the returned ID, so changing one changes the
        // ID computed for that log.
        // NOTE(review): no variant maps to 27 — presumably it belonged to a
        // variant that was removed; confirm before reusing it.
        let log_variant: u8 = match log_variant {
            LogVariant::Timely(TimelyLog::Operates) => 1,
            LogVariant::Timely(TimelyLog::Channels) => 2,
            LogVariant::Timely(TimelyLog::Elapsed) => 3,
            LogVariant::Timely(TimelyLog::Histogram) => 4,
            LogVariant::Timely(TimelyLog::Addresses) => 5,
            LogVariant::Timely(TimelyLog::Parks) => 6,
            LogVariant::Timely(TimelyLog::MessagesSent) => 7,
            LogVariant::Timely(TimelyLog::MessagesReceived) => 8,
            LogVariant::Timely(TimelyLog::Reachability) => 9,
            LogVariant::Timely(TimelyLog::BatchesSent) => 10,
            LogVariant::Timely(TimelyLog::BatchesReceived) => 11,
            LogVariant::Differential(DifferentialLog::ArrangementBatches) => 12,
            LogVariant::Differential(DifferentialLog::ArrangementRecords) => 13,
            LogVariant::Differential(DifferentialLog::Sharing) => 14,
            LogVariant::Differential(DifferentialLog::BatcherRecords) => 15,
            LogVariant::Differential(DifferentialLog::BatcherSize) => 16,
            LogVariant::Differential(DifferentialLog::BatcherCapacity) => 17,
            LogVariant::Differential(DifferentialLog::BatcherAllocations) => 18,
            LogVariant::Compute(ComputeLog::DataflowCurrent) => 19,
            LogVariant::Compute(ComputeLog::FrontierCurrent) => 20,
            LogVariant::Compute(ComputeLog::PeekCurrent) => 21,
            LogVariant::Compute(ComputeLog::PeekDuration) => 22,
            LogVariant::Compute(ComputeLog::ImportFrontierCurrent) => 23,
            LogVariant::Compute(ComputeLog::ArrangementHeapSize) => 24,
            LogVariant::Compute(ComputeLog::ArrangementHeapCapacity) => 25,
            LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => 26,
            LogVariant::Compute(ComputeLog::ErrorCount) => 28,
            LogVariant::Compute(ComputeLog::HydrationTime) => 29,
            LogVariant::Compute(ComputeLog::LirMapping) => 30,
            LogVariant::Compute(ComputeLog::DataflowGlobal) => 31,
            LogVariant::Compute(ComputeLog::OperatorHydrationStatus) => 32,
            LogVariant::Compute(ComputeLog::PrometheusMetrics) => 33,
        };

        // Pack: [cluster variant: 8 bits][cluster inner ID: 48 bits][log code: 8 bits].
        let mut id: u64 = u64::from(cluster_variant) << 56;
        id |= cluster_id << 8;
        id |= u64::from(log_variant);

        (
            CatalogItemId::IntrospectionSourceIndex(id),
            GlobalId::IntrospectionSourceIndex(id),
        )
    }
868
869 pub fn allocate_user_item_ids(
870 &mut self,
871 amount: u64,
872 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
873 Ok(self
874 .get_and_increment_id_by(USER_ITEM_ALLOC_KEY.to_string(), amount)?
875 .into_iter()
876 .map(|x| (CatalogItemId::User(x), GlobalId::User(x)))
878 .collect())
879 }
880
881 pub fn allocate_system_replica_id(&mut self) -> Result<ReplicaId, CatalogError> {
882 let id = self.get_and_increment_id(SYSTEM_REPLICA_ID_ALLOC_KEY.to_string())?;
883 Ok(ReplicaId::System(id))
884 }
885
886 pub fn allocate_audit_log_id(&mut self) -> Result<u64, CatalogError> {
887 self.get_and_increment_id(AUDIT_LOG_ID_ALLOC_KEY.to_string())
888 }
889
890 #[mz_ore::instrument]
893 fn allocate_oids(
894 &mut self,
895 amount: u64,
896 temporary_oids: &HashSet<u32>,
897 ) -> Result<Vec<u32>, CatalogError> {
898 struct UserOid(u32);
901
902 impl UserOid {
903 fn new(oid: u32) -> Result<UserOid, anyhow::Error> {
904 if oid < FIRST_USER_OID {
905 Err(anyhow!("invalid user OID {oid}"))
906 } else {
907 Ok(UserOid(oid))
908 }
909 }
910 }
911
912 impl std::ops::AddAssign<u32> for UserOid {
913 fn add_assign(&mut self, rhs: u32) {
914 let (res, overflow) = self.0.overflowing_add(rhs);
915 self.0 = if overflow { FIRST_USER_OID + res } else { res };
916 }
917 }
918
919 if amount > u32::MAX.into() {
920 return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
921 }
922
923 let mut allocated_oids = HashSet::with_capacity(
929 self.databases.len()
930 + self.schemas.len()
931 + self.roles.len()
932 + self.items.len()
933 + self.introspection_sources.len()
934 + temporary_oids.len(),
935 );
936 self.databases.for_values(|_, value| {
937 allocated_oids.insert(value.oid);
938 });
939 self.schemas.for_values(|_, value| {
940 allocated_oids.insert(value.oid);
941 });
942 self.roles.for_values(|_, value| {
943 allocated_oids.insert(value.oid);
944 });
945 self.items.for_values(|_, value| {
946 allocated_oids.insert(value.oid);
947 });
948 self.introspection_sources.for_values(|_, value| {
949 allocated_oids.insert(value.oid);
950 });
951
952 let is_allocated = |oid| allocated_oids.contains(&oid) || temporary_oids.contains(&oid);
953
954 let start_oid: u32 = self
955 .id_allocator
956 .items()
957 .get(&IdAllocKey {
958 name: OID_ALLOC_KEY.to_string(),
959 })
960 .unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
961 .next_id
962 .try_into()
963 .expect("we should never persist an oid outside of the u32 range");
964 let mut current_oid = UserOid::new(start_oid)
965 .expect("we should never persist an oid outside of user OID range");
966 let mut oids = Vec::new();
967 while oids.len() < u64_to_usize(amount) {
968 if !is_allocated(current_oid.0) {
969 oids.push(current_oid.0);
970 }
971 current_oid += 1;
972
973 if current_oid.0 == start_oid && oids.len() < u64_to_usize(amount) {
974 return Err(CatalogError::Catalog(SqlCatalogError::OidExhaustion));
976 }
977 }
978
979 let next_id = current_oid.0;
980 let prev = self.id_allocator.set(
981 IdAllocKey {
982 name: OID_ALLOC_KEY.to_string(),
983 },
984 Some(IdAllocValue {
985 next_id: next_id.into(),
986 }),
987 self.op_id,
988 )?;
989 assert_eq!(
990 prev,
991 Some(IdAllocValue {
992 next_id: start_oid.into(),
993 })
994 );
995
996 Ok(oids)
997 }
998
999 pub fn allocate_oid(&mut self, temporary_oids: &HashSet<u32>) -> Result<u32, CatalogError> {
1002 self.allocate_oids(1, temporary_oids)
1003 .map(|oids| oids.into_element())
1004 }
1005
    /// Builds a `Snapshot` from every table in this transaction by calling
    /// `current_items_proto()` on each.
    ///
    /// Pending audit log updates are not part of the result, since `Snapshot`
    /// has no field for them here.
    // NOTE(review): presumably `current_items_proto()` reflects changes made
    // in this transaction so far, not just the initial contents — confirm
    // against `TableTransaction`.
    pub fn current_snapshot(&self) -> Snapshot {
        Snapshot {
            databases: self.databases.current_items_proto(),
            schemas: self.schemas.current_items_proto(),
            roles: self.roles.current_items_proto(),
            role_auth: self.role_auth.current_items_proto(),
            items: self.items.current_items_proto(),
            comments: self.comments.current_items_proto(),
            clusters: self.clusters.current_items_proto(),
            network_policies: self.network_policies.current_items_proto(),
            cluster_replicas: self.cluster_replicas.current_items_proto(),
            introspection_sources: self.introspection_sources.current_items_proto(),
            id_allocator: self.id_allocator.current_items_proto(),
            configs: self.configs.current_items_proto(),
            settings: self.settings.current_items_proto(),
            system_object_mappings: self.system_gid_mapping.current_items_proto(),
            system_configurations: self.system_configurations.current_items_proto(),
            cluster_system_configurations: self.cluster_system_configurations.current_items_proto(),
            replica_system_configurations: self.replica_system_configurations.current_items_proto(),
            default_privileges: self.default_privileges.current_items_proto(),
            source_references: self.source_references.current_items_proto(),
            system_privileges: self.system_privileges.current_items_proto(),
            storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
            unfinalized_shards: self.unfinalized_shards.current_items_proto(),
            txn_wal_shard: self.txn_wal_shard.current_items_proto(),
        }
    }
1040
1041 pub(crate) fn insert_id_allocator(
1042 &mut self,
1043 name: String,
1044 next_id: u64,
1045 ) -> Result<(), CatalogError> {
1046 match self.id_allocator.insert(
1047 IdAllocKey { name: name.clone() },
1048 IdAllocValue { next_id },
1049 self.op_id,
1050 ) {
1051 Ok(_) => Ok(()),
1052 Err(_) => Err(SqlCatalogError::IdAllocatorAlreadyExists(name).into()),
1053 }
1054 }
1055
1056 pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), CatalogError> {
1063 let prev = self
1064 .databases
1065 .set(DatabaseKey { id: *id }, None, self.op_id)?;
1066 if prev.is_some() {
1067 Ok(())
1068 } else {
1069 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1070 }
1071 }
1072
1073 pub fn remove_databases(
1080 &mut self,
1081 databases: &BTreeSet<DatabaseId>,
1082 ) -> Result<(), CatalogError> {
1083 if databases.is_empty() {
1084 return Ok(());
1085 }
1086
1087 let to_remove = databases
1088 .iter()
1089 .map(|id| (DatabaseKey { id: *id }, None))
1090 .collect();
1091 let mut prev = self.databases.set_many(to_remove, self.op_id)?;
1092 prev.retain(|_k, val| val.is_none());
1093
1094 if !prev.is_empty() {
1095 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1096 return Err(SqlCatalogError::UnknownDatabase(err).into());
1097 }
1098
1099 Ok(())
1100 }
1101
1102 pub fn remove_schema(
1109 &mut self,
1110 database_id: &Option<DatabaseId>,
1111 schema_id: &SchemaId,
1112 ) -> Result<(), CatalogError> {
1113 let prev = self
1114 .schemas
1115 .set(SchemaKey { id: *schema_id }, None, self.op_id)?;
1116 if prev.is_some() {
1117 Ok(())
1118 } else {
1119 let database_name = match database_id {
1120 Some(id) => format!("{id}."),
1121 None => "".to_string(),
1122 };
1123 Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_name, schema_id)).into())
1124 }
1125 }
1126
1127 pub fn remove_schemas(
1134 &mut self,
1135 schemas: &BTreeMap<SchemaId, ResolvedDatabaseSpecifier>,
1136 ) -> Result<(), CatalogError> {
1137 if schemas.is_empty() {
1138 return Ok(());
1139 }
1140
1141 let to_remove = schemas
1142 .iter()
1143 .map(|(schema_id, _)| (SchemaKey { id: *schema_id }, None))
1144 .collect();
1145 let mut prev = self.schemas.set_many(to_remove, self.op_id)?;
1146 prev.retain(|_k, v| v.is_none());
1147
1148 if !prev.is_empty() {
1149 let err = prev
1150 .keys()
1151 .map(|k| {
1152 let db_spec = schemas.get(&k.id).expect("should_exist");
1153 let db_name = match db_spec {
1154 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
1155 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
1156 };
1157 format!("{}.{}", db_name, k.id)
1158 })
1159 .join(", ");
1160
1161 return Err(SqlCatalogError::UnknownSchema(err).into());
1162 }
1163
1164 Ok(())
1165 }
1166
1167 pub fn remove_source_references(
1168 &mut self,
1169 source_id: CatalogItemId,
1170 ) -> Result<(), CatalogError> {
1171 let deleted = self
1172 .source_references
1173 .delete_by_key(SourceReferencesKey { source_id }, self.op_id)
1174 .is_some();
1175 if deleted {
1176 Ok(())
1177 } else {
1178 Err(SqlCatalogError::UnknownItem(source_id.to_string()).into())
1179 }
1180 }
1181
1182 pub fn remove_user_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1189 assert!(
1190 roles.iter().all(|id| id.is_user()),
1191 "cannot delete non-user roles"
1192 );
1193 self.remove_roles(roles)
1194 }
1195
1196 pub fn remove_roles(&mut self, roles: &BTreeSet<RoleId>) -> Result<(), CatalogError> {
1203 if roles.is_empty() {
1204 return Ok(());
1205 }
1206
1207 let to_remove_keys = roles
1208 .iter()
1209 .map(|role_id| RoleKey { id: *role_id })
1210 .collect::<Vec<_>>();
1211
1212 let to_remove_roles = to_remove_keys
1213 .iter()
1214 .map(|role_key| (role_key.clone(), None))
1215 .collect();
1216
1217 let mut prev = self.roles.set_many(to_remove_roles, self.op_id)?;
1218
1219 let to_remove_role_auth = to_remove_keys
1220 .iter()
1221 .map(|role_key| {
1222 (
1223 RoleAuthKey {
1224 role_id: role_key.id,
1225 },
1226 None,
1227 )
1228 })
1229 .collect();
1230
1231 let mut role_auth_prev = self.role_auth.set_many(to_remove_role_auth, self.op_id)?;
1232
1233 prev.retain(|_k, v| v.is_none());
1234 if !prev.is_empty() {
1235 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1236 return Err(SqlCatalogError::UnknownRole(err).into());
1237 }
1238
1239 role_auth_prev.retain(|_k, v| v.is_none());
1240 Ok(())
1244 }
1245
1246 pub fn remove_clusters(&mut self, clusters: &BTreeSet<ClusterId>) -> Result<(), CatalogError> {
1253 if clusters.is_empty() {
1254 return Ok(());
1255 }
1256
1257 let to_remove = clusters
1258 .iter()
1259 .map(|cluster_id| (ClusterKey { id: *cluster_id }, None))
1260 .collect();
1261 let mut prev = self.clusters.set_many(to_remove, self.op_id)?;
1262
1263 prev.retain(|_k, v| v.is_none());
1264 if !prev.is_empty() {
1265 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1266 return Err(SqlCatalogError::UnknownCluster(err).into());
1267 }
1268
1269 self.cluster_replicas
1275 .delete(|_k, v| clusters.contains(&v.cluster_id), self.op_id);
1276 self.introspection_sources
1277 .delete(|k, _v| clusters.contains(&k.cluster_id), self.op_id);
1278
1279 Ok(())
1280 }
1281
1282 pub fn remove_cluster_replica(&mut self, id: ReplicaId) -> Result<(), CatalogError> {
1289 let deleted = self
1290 .cluster_replicas
1291 .delete_by_key(ClusterReplicaKey { id }, self.op_id)
1292 .is_some();
1293 if deleted {
1294 Ok(())
1295 } else {
1296 Err(SqlCatalogError::UnknownClusterReplica(id.to_string()).into())
1297 }
1298 }
1299
1300 pub fn remove_cluster_replicas(
1307 &mut self,
1308 replicas: &BTreeSet<ReplicaId>,
1309 ) -> Result<(), CatalogError> {
1310 if replicas.is_empty() {
1311 return Ok(());
1312 }
1313
1314 let to_remove = replicas
1315 .iter()
1316 .map(|replica_id| (ClusterReplicaKey { id: *replica_id }, None))
1317 .collect();
1318 let mut prev = self.cluster_replicas.set_many(to_remove, self.op_id)?;
1319
1320 prev.retain(|_k, v| v.is_none());
1321 if !prev.is_empty() {
1322 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1323 return Err(SqlCatalogError::UnknownClusterReplica(err).into());
1324 }
1325
1326 Ok(())
1327 }
1328
1329 pub fn remove_item(&mut self, id: CatalogItemId) -> Result<(), CatalogError> {
1336 let prev = self.items.set(ItemKey { id }, None, self.op_id)?;
1337 if prev.is_some() {
1338 Ok(())
1339 } else {
1340 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1341 }
1342 }
1343
1344 pub fn remove_items(&mut self, ids: &BTreeSet<CatalogItemId>) -> Result<(), CatalogError> {
1351 if ids.is_empty() {
1352 return Ok(());
1353 }
1354
1355 let ks: Vec<_> = ids.clone().into_iter().map(|id| ItemKey { id }).collect();
1356 let n = self.items.delete_by_keys(ks, self.op_id).len();
1357 if n == ids.len() {
1358 Ok(())
1359 } else {
1360 let item_ids = self.items.items().keys().map(|k| k.id).collect();
1361 let mut unknown = ids.difference(&item_ids);
1362 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1363 }
1364 }
1365
1366 pub fn remove_system_object_mappings(
1373 &mut self,
1374 descriptions: BTreeSet<SystemObjectDescription>,
1375 ) -> Result<(), CatalogError> {
1376 if descriptions.is_empty() {
1377 return Ok(());
1378 }
1379
1380 let ks: Vec<_> = descriptions
1381 .clone()
1382 .into_iter()
1383 .map(|desc| GidMappingKey {
1384 schema_name: desc.schema_name,
1385 object_type: desc.object_type,
1386 object_name: desc.object_name,
1387 })
1388 .collect();
1389 let n = self.system_gid_mapping.delete_by_keys(ks, self.op_id).len();
1390
1391 if n == descriptions.len() {
1392 Ok(())
1393 } else {
1394 let item_descriptions = self
1395 .system_gid_mapping
1396 .items()
1397 .keys()
1398 .map(|k| SystemObjectDescription {
1399 schema_name: k.schema_name.clone(),
1400 object_type: k.object_type.clone(),
1401 object_name: k.object_name.clone(),
1402 })
1403 .collect();
1404 let mut unknown = descriptions.difference(&item_descriptions).map(|desc| {
1405 format!(
1406 "{} {}.{}",
1407 desc.object_type, desc.schema_name, desc.object_name
1408 )
1409 });
1410 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1411 }
1412 }
1413
1414 pub fn remove_introspection_source_indexes(
1421 &mut self,
1422 introspection_source_indexes: BTreeSet<(ClusterId, String)>,
1423 ) -> Result<(), CatalogError> {
1424 if introspection_source_indexes.is_empty() {
1425 return Ok(());
1426 }
1427
1428 let ks: Vec<_> = introspection_source_indexes
1429 .clone()
1430 .into_iter()
1431 .map(|(cluster_id, name)| ClusterIntrospectionSourceIndexKey { cluster_id, name })
1432 .collect();
1433 let n = self
1434 .introspection_sources
1435 .delete_by_keys(ks, self.op_id)
1436 .len();
1437 if n == introspection_source_indexes.len() {
1438 Ok(())
1439 } else {
1440 let txn_indexes = self
1441 .introspection_sources
1442 .items()
1443 .keys()
1444 .map(|k| (k.cluster_id, k.name.clone()))
1445 .collect();
1446 let mut unknown = introspection_source_indexes
1447 .difference(&txn_indexes)
1448 .map(|(cluster_id, name)| format!("{cluster_id} {name}"));
1449 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1450 }
1451 }
1452
1453 pub fn update_item(&mut self, id: CatalogItemId, item: Item) -> Result<(), CatalogError> {
1460 let updated =
1461 self.items
1462 .update_by_key(ItemKey { id }, item.into_key_value().1, self.op_id)?;
1463 if updated {
1464 Ok(())
1465 } else {
1466 Err(SqlCatalogError::UnknownItem(id.to_string()).into())
1467 }
1468 }
1469
1470 pub fn update_items(
1478 &mut self,
1479 items: BTreeMap<CatalogItemId, Item>,
1480 ) -> Result<(), CatalogError> {
1481 if items.is_empty() {
1482 return Ok(());
1483 }
1484
1485 let update_ids: BTreeSet<_> = items.keys().cloned().collect();
1486 let kvs: Vec<_> = items
1487 .clone()
1488 .into_iter()
1489 .map(|(id, item)| (ItemKey { id }, item.into_key_value().1))
1490 .collect();
1491 let n = self.items.update_by_keys(kvs, self.op_id)?;
1492 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1493 if n == update_ids.len() {
1494 Ok(())
1495 } else {
1496 let item_ids: BTreeSet<_> = self.items.items().keys().map(|k| k.id).collect();
1497 let mut unknown = update_ids.difference(&item_ids);
1498 Err(SqlCatalogError::UnknownItem(unknown.join(", ")).into())
1499 }
1500 }
1501
1502 pub fn update_role(
1510 &mut self,
1511 id: RoleId,
1512 role: Role,
1513 password: PasswordAction,
1514 ) -> Result<(), CatalogError> {
1515 let key = RoleKey { id };
1516 if self.roles.get(&key).is_some() {
1517 let auth_key = RoleAuthKey { role_id: id };
1518
1519 match password {
1520 PasswordAction::Set(new_password) => {
1521 let hash = mz_auth::hash::scram256_hash(
1522 &new_password.password,
1523 &new_password.scram_iterations,
1524 )
1525 .expect("password hash should be valid");
1526 let value = RoleAuthValue {
1527 password_hash: Some(hash),
1528 updated_at: SYSTEM_TIME(),
1529 };
1530
1531 if self.role_auth.get(&auth_key).is_some() {
1532 self.role_auth
1533 .update_by_key(auth_key.clone(), value, self.op_id)?;
1534 } else {
1535 self.role_auth.insert(auth_key.clone(), value, self.op_id)?;
1536 }
1537 }
1538 PasswordAction::Clear => {
1539 let value = RoleAuthValue {
1540 password_hash: None,
1541 updated_at: SYSTEM_TIME(),
1542 };
1543 if self.role_auth.get(&auth_key).is_some() {
1544 self.role_auth
1545 .update_by_key(auth_key.clone(), value, self.op_id)?;
1546 }
1547 }
1548 PasswordAction::NoChange => {}
1549 }
1550
1551 self.roles
1552 .update_by_key(key, role.into_key_value().1, self.op_id)?;
1553
1554 Ok(())
1555 } else {
1556 Err(SqlCatalogError::UnknownRole(id.to_string()).into())
1557 }
1558 }
1559
1560 pub fn update_roles_without_auth(
1571 &mut self,
1572 roles: BTreeMap<RoleId, Role>,
1573 ) -> Result<(), CatalogError> {
1574 if roles.is_empty() {
1575 return Ok(());
1576 }
1577
1578 let update_role_ids: BTreeSet<_> = roles.keys().cloned().collect();
1579 let kvs: Vec<_> = roles
1580 .into_iter()
1581 .map(|(id, role)| (RoleKey { id }, role.into_key_value().1))
1582 .collect();
1583 let n = self.roles.update_by_keys(kvs, self.op_id)?;
1584 let n = usize::try_from(n.into_inner()).expect("Must be positive and fit in usize");
1585
1586 if n == update_role_ids.len() {
1587 Ok(())
1588 } else {
1589 let role_ids: BTreeSet<_> = self.roles.items().keys().map(|k| k.id).collect();
1590 let mut unknown = update_role_ids.difference(&role_ids);
1591 Err(SqlCatalogError::UnknownRole(unknown.join(", ")).into())
1592 }
1593 }
1594
1595 pub fn update_system_object_mappings(
1600 &mut self,
1601 mappings: BTreeMap<CatalogItemId, SystemObjectMapping>,
1602 ) -> Result<(), CatalogError> {
1603 if mappings.is_empty() {
1604 return Ok(());
1605 }
1606
1607 let n = self.system_gid_mapping.update(
1608 |_k, v| {
1609 if let Some(mapping) = mappings.get(&CatalogItemId::from(v.catalog_id)) {
1610 let (_, new_value) = mapping.clone().into_key_value();
1611 Some(new_value)
1612 } else {
1613 None
1614 }
1615 },
1616 self.op_id,
1617 )?;
1618
1619 if usize::try_from(n.into_inner()).expect("update diff should fit into usize")
1620 != mappings.len()
1621 {
1622 let id_str = mappings.keys().map(|id| id.to_string()).join(",");
1623 return Err(SqlCatalogError::FailedBuiltinSchemaMigration(id_str).into());
1624 }
1625
1626 Ok(())
1627 }
1628
1629 pub fn update_cluster(&mut self, id: ClusterId, cluster: Cluster) -> Result<(), CatalogError> {
1636 let updated = self.clusters.update_by_key(
1637 ClusterKey { id },
1638 cluster.into_key_value().1,
1639 self.op_id,
1640 )?;
1641 if updated {
1642 Ok(())
1643 } else {
1644 Err(SqlCatalogError::UnknownCluster(id.to_string()).into())
1645 }
1646 }
1647
1648 pub fn update_cluster_replica(
1655 &mut self,
1656 replica_id: ReplicaId,
1657 replica: ClusterReplica,
1658 ) -> Result<(), CatalogError> {
1659 let updated = self.cluster_replicas.update_by_key(
1660 ClusterReplicaKey { id: replica_id },
1661 replica.into_key_value().1,
1662 self.op_id,
1663 )?;
1664 if updated {
1665 Ok(())
1666 } else {
1667 Err(SqlCatalogError::UnknownClusterReplica(replica_id.to_string()).into())
1668 }
1669 }
1670
1671 pub fn update_database(
1678 &mut self,
1679 id: DatabaseId,
1680 database: Database,
1681 ) -> Result<(), CatalogError> {
1682 let updated = self.databases.update_by_key(
1683 DatabaseKey { id },
1684 database.into_key_value().1,
1685 self.op_id,
1686 )?;
1687 if updated {
1688 Ok(())
1689 } else {
1690 Err(SqlCatalogError::UnknownDatabase(id.to_string()).into())
1691 }
1692 }
1693
1694 pub fn update_schema(
1701 &mut self,
1702 schema_id: SchemaId,
1703 schema: Schema,
1704 ) -> Result<(), CatalogError> {
1705 let updated = self.schemas.update_by_key(
1706 SchemaKey { id: schema_id },
1707 schema.into_key_value().1,
1708 self.op_id,
1709 )?;
1710 if updated {
1711 Ok(())
1712 } else {
1713 Err(SqlCatalogError::UnknownSchema(schema_id.to_string()).into())
1714 }
1715 }
1716
1717 pub fn update_network_policy(
1724 &mut self,
1725 id: NetworkPolicyId,
1726 network_policy: NetworkPolicy,
1727 ) -> Result<(), CatalogError> {
1728 let updated = self.network_policies.update_by_key(
1729 NetworkPolicyKey { id },
1730 network_policy.into_key_value().1,
1731 self.op_id,
1732 )?;
1733 if updated {
1734 Ok(())
1735 } else {
1736 Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into())
1737 }
1738 }
1739 pub fn remove_network_policies(
1746 &mut self,
1747 network_policies: &BTreeSet<NetworkPolicyId>,
1748 ) -> Result<(), CatalogError> {
1749 if network_policies.is_empty() {
1750 return Ok(());
1751 }
1752
1753 let to_remove = network_policies
1754 .iter()
1755 .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None))
1756 .collect();
1757 let mut prev = self.network_policies.set_many(to_remove, self.op_id)?;
1758 assert!(
1759 prev.iter().all(|(k, _)| k.id.is_user()),
1760 "cannot delete non-user network policy"
1761 );
1762
1763 prev.retain(|_k, v| v.is_none());
1764 if !prev.is_empty() {
1765 let err = prev.keys().map(|k| k.id.to_string()).join(", ");
1766 return Err(SqlCatalogError::UnknownNetworkPolicy(err).into());
1767 }
1768
1769 Ok(())
1770 }
1771 pub fn set_default_privilege(
1775 &mut self,
1776 role_id: RoleId,
1777 database_id: Option<DatabaseId>,
1778 schema_id: Option<SchemaId>,
1779 object_type: ObjectType,
1780 grantee: RoleId,
1781 privileges: Option<AclMode>,
1782 ) -> Result<(), CatalogError> {
1783 self.default_privileges.set(
1784 DefaultPrivilegesKey {
1785 role_id,
1786 database_id,
1787 schema_id,
1788 object_type,
1789 grantee,
1790 },
1791 privileges.map(|privileges| DefaultPrivilegesValue { privileges }),
1792 self.op_id,
1793 )?;
1794 Ok(())
1795 }
1796
1797 pub fn set_default_privileges(
1799 &mut self,
1800 default_privileges: Vec<DefaultPrivilege>,
1801 ) -> Result<(), CatalogError> {
1802 if default_privileges.is_empty() {
1803 return Ok(());
1804 }
1805
1806 let default_privileges = default_privileges
1807 .into_iter()
1808 .map(DurableType::into_key_value)
1809 .map(|(k, v)| (k, Some(v)))
1810 .collect();
1811 self.default_privileges
1812 .set_many(default_privileges, self.op_id)?;
1813 Ok(())
1814 }
1815
1816 pub fn set_system_privilege(
1820 &mut self,
1821 grantee: RoleId,
1822 grantor: RoleId,
1823 acl_mode: Option<AclMode>,
1824 ) -> Result<(), CatalogError> {
1825 self.system_privileges.set(
1826 SystemPrivilegesKey { grantee, grantor },
1827 acl_mode.map(|acl_mode| SystemPrivilegesValue { acl_mode }),
1828 self.op_id,
1829 )?;
1830 Ok(())
1831 }
1832
1833 pub fn set_system_privileges(
1835 &mut self,
1836 system_privileges: Vec<MzAclItem>,
1837 ) -> Result<(), CatalogError> {
1838 if system_privileges.is_empty() {
1839 return Ok(());
1840 }
1841
1842 let system_privileges = system_privileges
1843 .into_iter()
1844 .map(DurableType::into_key_value)
1845 .map(|(k, v)| (k, Some(v)))
1846 .collect();
1847 self.system_privileges
1848 .set_many(system_privileges, self.op_id)?;
1849 Ok(())
1850 }
1851
1852 pub fn set_setting(&mut self, name: String, value: Option<String>) -> Result<(), CatalogError> {
1854 self.settings.set(
1855 SettingKey { name },
1856 value.map(|value| SettingValue { value }),
1857 self.op_id,
1858 )?;
1859 Ok(())
1860 }
1861
1862 pub fn set_catalog_content_version(&mut self, version: String) -> Result<(), CatalogError> {
1863 self.set_setting(CATALOG_CONTENT_VERSION_KEY.to_string(), Some(version))
1864 }
1865
1866 pub fn insert_introspection_source_indexes(
1868 &mut self,
1869 introspection_source_indexes: Vec<(ClusterId, String, CatalogItemId, GlobalId)>,
1870 temporary_oids: &HashSet<u32>,
1871 ) -> Result<(), CatalogError> {
1872 if introspection_source_indexes.is_empty() {
1873 return Ok(());
1874 }
1875
1876 let amount = usize_to_u64(introspection_source_indexes.len());
1877 let oids = self.allocate_oids(amount, temporary_oids)?;
1878 let introspection_source_indexes: Vec<_> = introspection_source_indexes
1879 .into_iter()
1880 .zip_eq(oids)
1881 .map(
1882 |((cluster_id, name, item_id, index_id), oid)| IntrospectionSourceIndex {
1883 cluster_id,
1884 name,
1885 item_id,
1886 index_id,
1887 oid,
1888 },
1889 )
1890 .collect();
1891
1892 for introspection_source_index in introspection_source_indexes {
1893 let (key, value) = introspection_source_index.into_key_value();
1894 self.introspection_sources.insert(key, value, self.op_id)?;
1895 }
1896
1897 Ok(())
1898 }
1899
1900 pub fn set_system_object_mappings(
1902 &mut self,
1903 mappings: Vec<SystemObjectMapping>,
1904 ) -> Result<(), CatalogError> {
1905 if mappings.is_empty() {
1906 return Ok(());
1907 }
1908
1909 let mappings = mappings
1910 .into_iter()
1911 .map(DurableType::into_key_value)
1912 .map(|(k, v)| (k, Some(v)))
1913 .collect();
1914 self.system_gid_mapping.set_many(mappings, self.op_id)?;
1915 Ok(())
1916 }
1917
1918 pub fn set_replicas(&mut self, replicas: Vec<ClusterReplica>) -> Result<(), CatalogError> {
1920 if replicas.is_empty() {
1921 return Ok(());
1922 }
1923
1924 let replicas = replicas
1925 .into_iter()
1926 .map(DurableType::into_key_value)
1927 .map(|(k, v)| (k, Some(v)))
1928 .collect();
1929 self.cluster_replicas.set_many(replicas, self.op_id)?;
1930 Ok(())
1931 }
1932
1933 pub fn set_config(&mut self, key: String, value: Option<u64>) -> Result<(), CatalogError> {
1935 match value {
1936 Some(value) => {
1937 let config = Config { key, value };
1938 let (key, value) = config.into_key_value();
1939 self.configs.set(key, Some(value), self.op_id)?;
1940 }
1941 None => {
1942 self.configs.set(ConfigKey { key }, None, self.op_id)?;
1943 }
1944 }
1945 Ok(())
1946 }
1947
1948 pub fn get_config(&self, key: String) -> Option<u64> {
1950 self.configs
1951 .get(&ConfigKey { key })
1952 .map(|entry| entry.value)
1953 }
1954
1955 pub fn get_setting(&self, name: String) -> Option<&str> {
1957 self.settings
1958 .get(&SettingKey { name })
1959 .map(|entry| &*entry.value)
1960 }
1961
1962 pub fn get_builtin_migration_shard(&self) -> Option<ShardId> {
1963 self.get_setting(BUILTIN_MIGRATION_SHARD_KEY.to_string())
1964 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1965 }
1966
1967 pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1968 self.set_setting(
1969 BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1970 Some(shard_id.to_string()),
1971 )
1972 }
1973
1974 pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
1975 self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
1976 .map(|shard_id| shard_id.parse().expect("valid ShardId"))
1977 }
1978
1979 pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1980 self.set_setting(
1981 EXPRESSION_CACHE_SHARD_KEY.to_string(),
1982 Some(shard_id.to_string()),
1983 )
1984 }
1985
1986 pub fn set_0dt_deployment_max_wait(&mut self, value: Duration) -> Result<(), CatalogError> {
1992 self.set_config(
1993 WITH_0DT_DEPLOYMENT_MAX_WAIT.into(),
1994 Some(
1995 value
1996 .as_millis()
1997 .try_into()
1998 .expect("max wait fits into u64"),
1999 ),
2000 )
2001 }
2002
2003 pub fn set_0dt_deployment_ddl_check_interval(
2010 &mut self,
2011 value: Duration,
2012 ) -> Result<(), CatalogError> {
2013 self.set_config(
2014 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(),
2015 Some(
2016 value
2017 .as_millis()
2018 .try_into()
2019 .expect("ddl check interval fits into u64"),
2020 ),
2021 )
2022 }
2023
2024 pub fn set_enable_0dt_deployment_panic_after_timeout(
2030 &mut self,
2031 value: bool,
2032 ) -> Result<(), CatalogError> {
2033 self.set_config(
2034 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(),
2035 Some(u64::from(value)),
2036 )
2037 }
2038
2039 pub fn reset_0dt_deployment_max_wait(&mut self) -> Result<(), CatalogError> {
2045 self.set_config(WITH_0DT_DEPLOYMENT_MAX_WAIT.into(), None)
2046 }
2047
2048 pub fn reset_0dt_deployment_ddl_check_interval(&mut self) -> Result<(), CatalogError> {
2055 self.set_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL.into(), None)
2056 }
2057
2058 pub fn reset_enable_0dt_deployment_panic_after_timeout(&mut self) -> Result<(), CatalogError> {
2065 self.set_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT.into(), None)
2066 }
2067
2068 pub fn set_system_config_synced_once(&mut self) -> Result<(), CatalogError> {
2070 self.set_config(SYSTEM_CONFIG_SYNCED_KEY.into(), Some(1))
2071 }
2072
2073 pub fn update_comment(
2074 &mut self,
2075 object_id: CommentObjectId,
2076 sub_component: Option<usize>,
2077 comment: Option<String>,
2078 ) -> Result<(), CatalogError> {
2079 let key = CommentKey {
2080 object_id,
2081 sub_component,
2082 };
2083 let value = comment.map(|c| CommentValue { comment: c });
2084 self.comments.set(key, value, self.op_id)?;
2085
2086 Ok(())
2087 }
2088
2089 pub fn drop_comments(
2090 &mut self,
2091 object_ids: &BTreeSet<CommentObjectId>,
2092 ) -> Result<(), CatalogError> {
2093 if object_ids.is_empty() {
2094 return Ok(());
2095 }
2096
2097 self.comments
2098 .delete(|k, _v| object_ids.contains(&k.object_id), self.op_id);
2099 Ok(())
2100 }
2101
2102 pub fn update_source_references(
2103 &mut self,
2104 source_id: CatalogItemId,
2105 references: Vec<SourceReference>,
2106 updated_at: u64,
2107 ) -> Result<(), CatalogError> {
2108 let key = SourceReferencesKey { source_id };
2109 let value = SourceReferencesValue {
2110 references,
2111 updated_at,
2112 };
2113 self.source_references.set(key, Some(value), self.op_id)?;
2114 Ok(())
2115 }
2116
2117 pub fn upsert_system_config(&mut self, name: &str, value: String) -> Result<(), CatalogError> {
2119 let key = ServerConfigurationKey {
2120 name: name.to_string(),
2121 };
2122 let value = ServerConfigurationValue { value };
2123 self.system_configurations
2124 .set(key, Some(value), self.op_id)?;
2125 Ok(())
2126 }
2127
2128 pub fn remove_system_config(&mut self, name: &str) {
2130 let key = ServerConfigurationKey {
2131 name: name.to_string(),
2132 };
2133 self.system_configurations
2134 .set(key, None, self.op_id)
2135 .expect("cannot have uniqueness violation");
2136 }
2137
2138 pub fn clear_system_configs(&mut self) {
2140 self.system_configurations.delete(|_k, _v| true, self.op_id);
2141 }
2142
2143 pub fn get_cluster_system_configurations(
2145 &self,
2146 ) -> impl Iterator<Item = ClusterSystemConfiguration> + use<'_> {
2147 self.cluster_system_configurations
2148 .items()
2149 .into_iter()
2150 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2151 }
2152
2153 pub fn upsert_cluster_system_config(
2156 &mut self,
2157 cluster_id: ClusterId,
2158 name: &str,
2159 value: String,
2160 ) -> Result<(), CatalogError> {
2161 let key = ClusterSystemConfigurationKey {
2162 cluster_id,
2163 name: name.to_string(),
2164 };
2165 let value = ClusterSystemConfigurationValue { value };
2166 self.cluster_system_configurations
2167 .set(key, Some(value), self.op_id)?;
2168 Ok(())
2169 }
2170
2171 pub fn remove_cluster_system_config(&mut self, cluster_id: ClusterId, name: &str) {
2174 let key = ClusterSystemConfigurationKey {
2175 cluster_id,
2176 name: name.to_string(),
2177 };
2178 self.cluster_system_configurations
2179 .set(key, None, self.op_id)
2180 .expect("cannot have uniqueness violation");
2181 }
2182
2183 pub fn get_replica_system_configurations(
2185 &self,
2186 ) -> impl Iterator<Item = ReplicaSystemConfiguration> + use<'_> {
2187 self.replica_system_configurations
2188 .items()
2189 .into_iter()
2190 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2191 }
2192
2193 pub fn upsert_replica_system_config(
2196 &mut self,
2197 replica_id: ReplicaId,
2198 name: &str,
2199 value: String,
2200 ) -> Result<(), CatalogError> {
2201 let key = ReplicaSystemConfigurationKey {
2202 replica_id,
2203 name: name.to_string(),
2204 };
2205 let value = ReplicaSystemConfigurationValue { value };
2206 self.replica_system_configurations
2207 .set(key, Some(value), self.op_id)?;
2208 Ok(())
2209 }
2210
2211 pub fn remove_replica_system_config(&mut self, replica_id: ReplicaId, name: &str) {
2214 let key = ReplicaSystemConfigurationKey {
2215 replica_id,
2216 name: name.to_string(),
2217 };
2218 self.replica_system_configurations
2219 .set(key, None, self.op_id)
2220 .expect("cannot have uniqueness violation");
2221 }
2222
2223 pub(crate) fn insert_config(&mut self, key: String, value: u64) -> Result<(), CatalogError> {
2224 match self.configs.insert(
2225 ConfigKey { key: key.clone() },
2226 ConfigValue { value },
2227 self.op_id,
2228 ) {
2229 Ok(_) => Ok(()),
2230 Err(_) => Err(SqlCatalogError::ConfigAlreadyExists(key).into()),
2231 }
2232 }
2233
2234 pub fn get_clusters(&self) -> impl Iterator<Item = Cluster> + use<'_> {
2235 self.clusters
2236 .items()
2237 .into_iter()
2238 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2239 }
2240
2241 pub fn get_cluster_replicas(&self) -> impl Iterator<Item = ClusterReplica> + use<'_> {
2242 self.cluster_replicas
2243 .items()
2244 .into_iter()
2245 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2246 }
2247
2248 pub fn get_databases(&self) -> impl Iterator<Item = Database> + use<'_> {
2249 self.databases
2250 .items()
2251 .into_iter()
2252 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2253 }
2254
2255 pub fn get_roles(&self) -> impl Iterator<Item = Role> + use<'_> {
2256 self.roles
2257 .items()
2258 .into_iter()
2259 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2260 }
2261
2262 pub fn get_network_policies(&self) -> impl Iterator<Item = NetworkPolicy> + use<'_> {
2263 self.network_policies
2264 .items()
2265 .into_iter()
2266 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2267 }
2268
2269 pub fn get_system_object_mappings(
2270 &self,
2271 ) -> impl Iterator<Item = SystemObjectMapping> + use<'_> {
2272 self.system_gid_mapping
2273 .items()
2274 .into_iter()
2275 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2276 }
2277
2278 pub fn get_schemas(&self) -> impl Iterator<Item = Schema> + use<'_> {
2279 self.schemas
2280 .items()
2281 .into_iter()
2282 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2283 }
2284
2285 pub fn get_system_configurations(&self) -> impl Iterator<Item = SystemConfiguration> + use<'_> {
2286 self.system_configurations
2287 .items()
2288 .into_iter()
2289 .map(|(k, v)| DurableType::from_key_value(k.clone(), v.clone()))
2290 }
2291
2292 pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
2293 let key = SchemaKey { id: *id };
2294 self.schemas
2295 .get(&key)
2296 .map(|v| DurableType::from_key_value(key, v.clone()))
2297 }
2298
2299 pub fn get_introspection_source_indexes(
2300 &self,
2301 cluster_id: ClusterId,
2302 ) -> BTreeMap<&str, (GlobalId, u32)> {
2303 self.introspection_sources
2304 .items()
2305 .into_iter()
2306 .filter(|(k, _v)| k.cluster_id == cluster_id)
2307 .map(|(k, v)| (k.name.as_str(), (v.global_id.into(), v.oid)))
2308 .collect()
2309 }
2310
2311 pub fn get_catalog_content_version(&self) -> Option<&str> {
2312 self.settings
2313 .get(&SettingKey {
2314 name: CATALOG_CONTENT_VERSION_KEY.to_string(),
2315 })
2316 .map(|value| &*value.value)
2317 }
2318
2319 pub fn get_authentication_mock_nonce(&self) -> Option<String> {
2320 self.settings
2321 .get(&SettingKey {
2322 name: MOCK_AUTHENTICATION_NONCE_KEY.to_string(),
2323 })
2324 .map(|value| value.value.clone())
2325 }
2326
2327 #[must_use]
2333 pub fn get_and_commit_op_updates(&mut self) -> Vec<StateUpdate> {
2334 let updates = self.get_op_updates();
2335 self.commit_op();
2336 updates
2337 }
2338
2339 fn get_op_updates(&self) -> Vec<StateUpdate> {
2340 fn get_collection_op_updates<'a, T>(
2341 table_txn: &'a TableTransaction<T::Key, T::Value>,
2342 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2343 op: Timestamp,
2344 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2345 where
2346 T::Key: Ord + Eq + Clone + Debug,
2347 T::Value: Ord + Clone + Debug,
2348 T: DurableType,
2349 {
2350 table_txn
2351 .pending
2352 .iter()
2353 .flat_map(|(k, vs)| vs.into_iter().map(move |v| (k, v)))
2354 .filter_map(move |(k, v)| {
2355 if v.ts == op {
2356 let key = k.clone();
2357 let value = v.value.clone();
2358 let diff = v.diff.clone().try_into().expect("invalid diff");
2359 let update = DurableType::from_key_value(key, value);
2360 let kind = kind_fn(update);
2361 Some((kind, diff))
2362 } else {
2363 None
2364 }
2365 })
2366 }
2367
2368 fn get_large_collection_op_updates<'a, T>(
2369 collection: &'a Vec<(T::Key, Diff, Timestamp)>,
2370 kind_fn: impl Fn(T) -> StateUpdateKind + 'a,
2371 op: Timestamp,
2372 ) -> impl Iterator<Item = (StateUpdateKind, StateDiff)> + 'a
2373 where
2374 T::Key: Ord + Eq + Clone + Debug,
2375 T: DurableType<Value = ()>,
2376 {
2377 collection.iter().filter_map(move |(k, diff, ts)| {
2378 if *ts == op {
2379 let key = k.clone();
2380 let diff = diff.clone().try_into().expect("invalid diff");
2381 let update = DurableType::from_key_value(key, ());
2382 let kind = kind_fn(update);
2383 Some((kind, diff))
2384 } else {
2385 None
2386 }
2387 })
2388 }
2389
2390 let Transaction {
2391 durable_catalog: _,
2392 databases,
2393 schemas,
2394 items,
2395 comments,
2396 roles,
2397 role_auth,
2398 clusters,
2399 network_policies,
2400 cluster_replicas,
2401 introspection_sources,
2402 system_gid_mapping,
2403 system_configurations,
2404 cluster_system_configurations,
2405 replica_system_configurations,
2406 default_privileges,
2407 source_references,
2408 system_privileges,
2409 audit_log_updates,
2410 storage_collection_metadata,
2411 unfinalized_shards,
2412 id_allocator: _,
2414 configs: _,
2415 settings: _,
2416 txn_wal_shard: _,
2417 upper,
2418 op_id: _,
2419 } = &self;
2420
2421 let updates = std::iter::empty()
2422 .chain(get_collection_op_updates(
2423 roles,
2424 StateUpdateKind::Role,
2425 self.op_id,
2426 ))
2427 .chain(get_collection_op_updates(
2428 role_auth,
2429 StateUpdateKind::RoleAuth,
2430 self.op_id,
2431 ))
2432 .chain(get_collection_op_updates(
2433 databases,
2434 StateUpdateKind::Database,
2435 self.op_id,
2436 ))
2437 .chain(get_collection_op_updates(
2438 schemas,
2439 StateUpdateKind::Schema,
2440 self.op_id,
2441 ))
2442 .chain(get_collection_op_updates(
2443 default_privileges,
2444 StateUpdateKind::DefaultPrivilege,
2445 self.op_id,
2446 ))
2447 .chain(get_collection_op_updates(
2448 system_privileges,
2449 StateUpdateKind::SystemPrivilege,
2450 self.op_id,
2451 ))
2452 .chain(get_collection_op_updates(
2453 system_configurations,
2454 StateUpdateKind::SystemConfiguration,
2455 self.op_id,
2456 ))
2457 .chain(get_collection_op_updates(
2458 cluster_system_configurations,
2459 StateUpdateKind::ClusterSystemConfiguration,
2460 self.op_id,
2461 ))
2462 .chain(get_collection_op_updates(
2463 replica_system_configurations,
2464 StateUpdateKind::ReplicaSystemConfiguration,
2465 self.op_id,
2466 ))
2467 .chain(get_collection_op_updates(
2468 clusters,
2469 StateUpdateKind::Cluster,
2470 self.op_id,
2471 ))
2472 .chain(get_collection_op_updates(
2473 network_policies,
2474 StateUpdateKind::NetworkPolicy,
2475 self.op_id,
2476 ))
2477 .chain(get_collection_op_updates(
2478 introspection_sources,
2479 StateUpdateKind::IntrospectionSourceIndex,
2480 self.op_id,
2481 ))
2482 .chain(get_collection_op_updates(
2483 cluster_replicas,
2484 StateUpdateKind::ClusterReplica,
2485 self.op_id,
2486 ))
2487 .chain(get_collection_op_updates(
2488 system_gid_mapping,
2489 StateUpdateKind::SystemObjectMapping,
2490 self.op_id,
2491 ))
2492 .chain(get_collection_op_updates(
2493 items,
2494 StateUpdateKind::Item,
2495 self.op_id,
2496 ))
2497 .chain(get_collection_op_updates(
2498 comments,
2499 StateUpdateKind::Comment,
2500 self.op_id,
2501 ))
2502 .chain(get_collection_op_updates(
2503 source_references,
2504 StateUpdateKind::SourceReferences,
2505 self.op_id,
2506 ))
2507 .chain(get_collection_op_updates(
2508 storage_collection_metadata,
2509 StateUpdateKind::StorageCollectionMetadata,
2510 self.op_id,
2511 ))
2512 .chain(get_collection_op_updates(
2513 unfinalized_shards,
2514 StateUpdateKind::UnfinalizedShard,
2515 self.op_id,
2516 ))
2517 .chain(get_large_collection_op_updates(
2518 audit_log_updates,
2519 StateUpdateKind::AuditLog,
2520 self.op_id,
2521 ))
2522 .map(|(kind, diff)| StateUpdate {
2523 kind,
2524 ts: upper.clone(),
2525 diff,
2526 })
2527 .collect();
2528
2529 updates
2530 }
2531
/// Reports whether the durable catalog backing this transaction is a savepoint.
///
/// The answer is delegated unchanged to the underlying catalog handle.
pub fn is_savepoint(&self) -> bool {
    self.durable_catalog.is_savepoint()
}
2535
/// Closes the current operation by advancing `op_id` to the next value.
///
/// Collection writes in this file are tagged with `self.op_id`, so writes made
/// after this call carry a different tag than the ones made before it.
fn commit_op(&mut self) {
    self.op_id += 1;
}
2539
/// Returns the id of the operation currently in progress, i.e. the tag that is
/// attached to writes made through this transaction right now.
pub fn op_id(&self) -> Timestamp {
    self.op_id
}
2543
/// Returns the `upper` timestamp stored in this transaction.
// NOTE(review): presumably the catalog upper captured when the transaction was
// opened — confirm against the constructor, which is outside this view.
pub fn upper(&self) -> mz_repr::Timestamp {
    self.upper
}
2547
2548 pub(crate) fn into_parts(self) -> (TransactionBatch, &'a mut dyn DurableCatalogState) {
2549 let audit_log_updates = self
2550 .audit_log_updates
2551 .into_iter()
2552 .map(|(k, diff, _op)| (k.into_proto(), (), diff))
2553 .collect();
2554
2555 let txn_batch = TransactionBatch {
2556 databases: self.databases.pending(),
2557 schemas: self.schemas.pending(),
2558 items: self.items.pending(),
2559 comments: self.comments.pending(),
2560 roles: self.roles.pending(),
2561 role_auth: self.role_auth.pending(),
2562 clusters: self.clusters.pending(),
2563 cluster_replicas: self.cluster_replicas.pending(),
2564 network_policies: self.network_policies.pending(),
2565 introspection_sources: self.introspection_sources.pending(),
2566 id_allocator: self.id_allocator.pending(),
2567 configs: self.configs.pending(),
2568 source_references: self.source_references.pending(),
2569 settings: self.settings.pending(),
2570 system_gid_mapping: self.system_gid_mapping.pending(),
2571 system_configurations: self.system_configurations.pending(),
2572 cluster_system_configurations: self.cluster_system_configurations.pending(),
2573 replica_system_configurations: self.replica_system_configurations.pending(),
2574 default_privileges: self.default_privileges.pending(),
2575 system_privileges: self.system_privileges.pending(),
2576 storage_collection_metadata: self.storage_collection_metadata.pending(),
2577 unfinalized_shards: self.unfinalized_shards.pending(),
2578 txn_wal_shard: self.txn_wal_shard.pending(),
2579 audit_log_updates,
2580 upper: self.upper,
2581 };
2582 (txn_batch, self.durable_catalog)
2583 }
2584
2585 #[mz_ore::instrument(level = "debug")]
2597 pub(crate) async fn commit_internal(
2598 self,
2599 commit_ts: mz_repr::Timestamp,
2600 ) -> Result<(&'a mut dyn DurableCatalogState, mz_repr::Timestamp), CatalogError> {
2601 let (mut txn_batch, durable_catalog) = self.into_parts();
2602 let TransactionBatch {
2603 databases,
2604 schemas,
2605 items,
2606 comments,
2607 roles,
2608 role_auth,
2609 clusters,
2610 cluster_replicas,
2611 network_policies,
2612 introspection_sources,
2613 id_allocator,
2614 configs,
2615 source_references,
2616 settings,
2617 system_gid_mapping,
2618 system_configurations,
2619 cluster_system_configurations,
2620 replica_system_configurations,
2621 default_privileges,
2622 system_privileges,
2623 storage_collection_metadata,
2624 unfinalized_shards,
2625 txn_wal_shard,
2626 audit_log_updates,
2627 upper: _,
2628 } = &mut txn_batch;
2629 differential_dataflow::consolidation::consolidate_updates(databases);
2632 differential_dataflow::consolidation::consolidate_updates(schemas);
2633 differential_dataflow::consolidation::consolidate_updates(items);
2634 differential_dataflow::consolidation::consolidate_updates(comments);
2635 differential_dataflow::consolidation::consolidate_updates(roles);
2636 differential_dataflow::consolidation::consolidate_updates(role_auth);
2637 differential_dataflow::consolidation::consolidate_updates(clusters);
2638 differential_dataflow::consolidation::consolidate_updates(cluster_replicas);
2639 differential_dataflow::consolidation::consolidate_updates(network_policies);
2640 differential_dataflow::consolidation::consolidate_updates(introspection_sources);
2641 differential_dataflow::consolidation::consolidate_updates(id_allocator);
2642 differential_dataflow::consolidation::consolidate_updates(configs);
2643 differential_dataflow::consolidation::consolidate_updates(settings);
2644 differential_dataflow::consolidation::consolidate_updates(source_references);
2645 differential_dataflow::consolidation::consolidate_updates(system_gid_mapping);
2646 differential_dataflow::consolidation::consolidate_updates(system_configurations);
2647 differential_dataflow::consolidation::consolidate_updates(cluster_system_configurations);
2648 differential_dataflow::consolidation::consolidate_updates(replica_system_configurations);
2649 differential_dataflow::consolidation::consolidate_updates(default_privileges);
2650 differential_dataflow::consolidation::consolidate_updates(system_privileges);
2651 differential_dataflow::consolidation::consolidate_updates(storage_collection_metadata);
2652 differential_dataflow::consolidation::consolidate_updates(unfinalized_shards);
2653 differential_dataflow::consolidation::consolidate_updates(txn_wal_shard);
2654 differential_dataflow::consolidation::consolidate_updates(audit_log_updates);
2655
2656 let upper = durable_catalog
2657 .commit_transaction(txn_batch, commit_ts)
2658 .await?;
2659 Ok((durable_catalog, upper))
2660 }
2661
2662 #[mz_ore::instrument(level = "debug")]
2679 pub async fn commit(self, commit_ts: mz_repr::Timestamp) -> Result<(), CatalogError> {
2680 let op_updates = self.get_op_updates();
2681 assert!(
2682 op_updates.is_empty(),
2683 "unconsumed transaction updates: {op_updates:?}"
2684 );
2685
2686 let (durable_storage, upper) = self.commit_internal(commit_ts).await?;
2687 let updates = durable_storage.sync_updates(upper).await?;
2689 soft_assert_no_log!(
2694 durable_storage.is_read_only() || updates.iter().all(|update| update.ts == commit_ts),
2695 "unconsumed updates existed before transaction commit: commit_ts={commit_ts:?}, updates:{updates:?}"
2696 );
2697 Ok(())
2698 }
2699}
2700
2701use crate::durable::async_trait;
2702
2703use super::objects::{RoleAuthKey, RoleAuthValue};
2704
2705#[async_trait]
2706impl StorageTxn for Transaction<'_> {
2707 fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId> {
2708 self.storage_collection_metadata
2709 .items()
2710 .into_iter()
2711 .map(
2712 |(
2713 StorageCollectionMetadataKey { id },
2714 StorageCollectionMetadataValue { shard },
2715 )| { (*id, shard.clone()) },
2716 )
2717 .collect()
2718 }
2719
2720 fn insert_collection_metadata(
2721 &mut self,
2722 metadata: BTreeMap<GlobalId, ShardId>,
2723 ) -> Result<(), StorageError> {
2724 for (id, shard) in metadata {
2725 self.storage_collection_metadata
2726 .insert(
2727 StorageCollectionMetadataKey { id },
2728 StorageCollectionMetadataValue {
2729 shard: shard.clone(),
2730 },
2731 self.op_id,
2732 )
2733 .map_err(|err| match err {
2734 DurableCatalogError::DuplicateKey => {
2735 StorageError::CollectionMetadataAlreadyExists(id)
2736 }
2737 DurableCatalogError::UniquenessViolation => {
2738 StorageError::PersistShardAlreadyInUse(shard)
2739 }
2740 err => StorageError::Generic(anyhow::anyhow!(err)),
2741 })?;
2742 }
2743 Ok(())
2744 }
2745
2746 fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)> {
2747 let ks: Vec<_> = ids
2748 .into_iter()
2749 .map(|id| StorageCollectionMetadataKey { id })
2750 .collect();
2751 self.storage_collection_metadata
2752 .delete_by_keys(ks, self.op_id)
2753 .into_iter()
2754 .map(
2755 |(
2756 StorageCollectionMetadataKey { id },
2757 StorageCollectionMetadataValue { shard },
2758 )| (id, shard),
2759 )
2760 .collect()
2761 }
2762
2763 fn get_unfinalized_shards(&self) -> BTreeSet<ShardId> {
2764 self.unfinalized_shards
2765 .items()
2766 .into_iter()
2767 .map(|(UnfinalizedShardKey { shard }, ())| *shard)
2768 .collect()
2769 }
2770
2771 fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError> {
2772 for shard in s {
2773 match self
2774 .unfinalized_shards
2775 .insert(UnfinalizedShardKey { shard }, (), self.op_id)
2776 {
2777 Ok(()) | Err(DurableCatalogError::DuplicateKey) => {}
2779 Err(e) => Err(StorageError::Generic(anyhow::anyhow!(e)))?,
2780 };
2781 }
2782 Ok(())
2783 }
2784
2785 fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>) {
2786 let ks: Vec<_> = shards
2787 .into_iter()
2788 .map(|shard| UnfinalizedShardKey { shard })
2789 .collect();
2790 let _ = self.unfinalized_shards.delete_by_keys(ks, self.op_id);
2791 }
2792
2793 fn get_txn_wal_shard(&self) -> Option<ShardId> {
2794 self.txn_wal_shard
2795 .values()
2796 .iter()
2797 .next()
2798 .map(|TxnWalShardValue { shard }| *shard)
2799 }
2800
2801 fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError> {
2802 self.txn_wal_shard
2803 .insert((), TxnWalShardValue { shard }, self.op_id)
2804 .map_err(|err| match err {
2805 DurableCatalogError::DuplicateKey => StorageError::TxnWalShardAlreadyExists,
2806 err => StorageError::Generic(anyhow::anyhow!(err)),
2807 })
2808 }
2809}
2810
/// The set of changes produced by a catalog transaction, one list per collection.
///
/// Each list holds `(key, value, diff)` triples in proto representation. The
/// batch is built by `Transaction::into_parts` and passed to the durable
/// catalog's `commit_transaction`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TransactionBatch {
    pub(crate) databases: Vec<(proto::DatabaseKey, proto::DatabaseValue, Diff)>,
    pub(crate) schemas: Vec<(proto::SchemaKey, proto::SchemaValue, Diff)>,
    pub(crate) items: Vec<(proto::ItemKey, proto::ItemValue, Diff)>,
    pub(crate) comments: Vec<(proto::CommentKey, proto::CommentValue, Diff)>,
    pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>,
    pub(crate) role_auth: Vec<(proto::RoleAuthKey, proto::RoleAuthValue, Diff)>,
    pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>,
    pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>,
    pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>,
    pub(crate) introspection_sources: Vec<(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
        Diff,
    )>,
    pub(crate) id_allocator: Vec<(proto::IdAllocKey, proto::IdAllocValue, Diff)>,
    pub(crate) configs: Vec<(proto::ConfigKey, proto::ConfigValue, Diff)>,
    pub(crate) settings: Vec<(proto::SettingKey, proto::SettingValue, Diff)>,
    pub(crate) system_gid_mapping: Vec<(proto::GidMappingKey, proto::GidMappingValue, Diff)>,
    pub(crate) system_configurations: Vec<(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
        Diff,
    )>,
    pub(crate) cluster_system_configurations: Vec<(
        proto::ClusterSystemConfigurationKey,
        proto::ClusterSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) replica_system_configurations: Vec<(
        proto::ReplicaSystemConfigurationKey,
        proto::ReplicaSystemConfigurationValue,
        Diff,
    )>,
    pub(crate) default_privileges: Vec<(
        proto::DefaultPrivilegesKey,
        proto::DefaultPrivilegesValue,
        Diff,
    )>,
    pub(crate) source_references: Vec<(
        proto::SourceReferencesKey,
        proto::SourceReferencesValue,
        Diff,
    )>,
    pub(crate) system_privileges: Vec<(
        proto::SystemPrivilegesKey,
        proto::SystemPrivilegesValue,
        Diff,
    )>,
    pub(crate) storage_collection_metadata: Vec<(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
        Diff,
    )>,
    // Key-only collection: the value slot is `()`.
    pub(crate) unfinalized_shards: Vec<(proto::UnfinalizedShardKey, (), Diff)>,
    // Value-only collection: the key slot is `()`.
    pub(crate) txn_wal_shard: Vec<((), proto::TxnWalShardValue, Diff)>,
    // Key-only collection: the value slot is `()`.
    pub(crate) audit_log_updates: Vec<(proto::AuditLogKey, (), Diff)>,
    // Copied from the transaction's `upper`; ignored by `is_empty`.
    pub(crate) upper: mz_repr::Timestamp,
}
2873
2874impl TransactionBatch {
2875 pub fn is_empty(&self) -> bool {
2876 let TransactionBatch {
2877 databases,
2878 schemas,
2879 items,
2880 comments,
2881 roles,
2882 role_auth,
2883 clusters,
2884 cluster_replicas,
2885 network_policies,
2886 introspection_sources,
2887 id_allocator,
2888 configs,
2889 settings,
2890 source_references,
2891 system_gid_mapping,
2892 system_configurations,
2893 cluster_system_configurations,
2894 replica_system_configurations,
2895 default_privileges,
2896 system_privileges,
2897 storage_collection_metadata,
2898 unfinalized_shards,
2899 txn_wal_shard,
2900 audit_log_updates,
2901 upper: _,
2902 } = self;
2903 databases.is_empty()
2904 && schemas.is_empty()
2905 && items.is_empty()
2906 && comments.is_empty()
2907 && roles.is_empty()
2908 && role_auth.is_empty()
2909 && clusters.is_empty()
2910 && cluster_replicas.is_empty()
2911 && network_policies.is_empty()
2912 && introspection_sources.is_empty()
2913 && id_allocator.is_empty()
2914 && configs.is_empty()
2915 && settings.is_empty()
2916 && source_references.is_empty()
2917 && system_gid_mapping.is_empty()
2918 && system_configurations.is_empty()
2919 && cluster_system_configurations.is_empty()
2920 && replica_system_configurations.is_empty()
2921 && default_privileges.is_empty()
2922 && system_privileges.is_empty()
2923 && storage_collection_metadata.is_empty()
2924 && unfinalized_shards.is_empty()
2925 && txn_wal_shard.is_empty()
2926 && audit_log_updates.is_empty()
2927 }
2928}
2929
/// A single pending change to one key of a `TableTransaction`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionUpdate<V> {
    // The value being added or retracted.
    value: V,
    // Tag supplied by the caller of the mutating method; `verify` expects the
    // updates of a key to be ordered by it.
    ts: Timestamp,
    // `Diff::ONE` for an addition, `Diff::MINUS_ONE` for a retraction.
    diff: Diff,
}
2936
/// Lets `TableTransaction::verify` index values by name instead of comparing
/// every pair of values.
trait UniqueName {
    /// Whether values of this type expose a name suitable for that index. When
    /// `false`, `verify` falls back to comparing all pairs.
    const HAS_UNIQUE_NAME: bool;
    /// The name to index by. Only consulted when `HAS_UNIQUE_NAME` is `true`.
    fn unique_name(&self) -> &str;
}
2945
/// `UniqueName` implementations for the durable value types.
mod unique_name {
    use crate::durable::objects::*;

    // Implements `UniqueName` for types with a `name` field; the name is
    // returned as-is.
    macro_rules! impl_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = true;
                    fn unique_name(&self) -> &str {
                        &self.name
                    }
                }
            )*
        };
    }

    // Implements `UniqueName` for types without a usable name: the flag is
    // `false` and the returned name is the empty string.
    macro_rules! impl_no_unique_name {
        ($($t:ty),* $(,)?) => {
            $(
                impl crate::durable::transaction::UniqueName for $t {
                    const HAS_UNIQUE_NAME: bool = false;
                    fn unique_name(&self) -> &str {
                       ""
                    }
                }
            )*
        };
    }

    impl_unique_name! {
        ClusterReplicaValue,
        ClusterValue,
        DatabaseValue,
        ItemValue,
        NetworkPolicyValue,
        RoleValue,
        SchemaValue,
    }

    impl_no_unique_name!(
        (),
        ClusterIntrospectionSourceIndexValue,
        ClusterSystemConfigurationValue,
        CommentValue,
        ConfigValue,
        DefaultPrivilegesValue,
        GidMappingValue,
        IdAllocValue,
        ReplicaSystemConfigurationValue,
        ServerConfigurationValue,
        SettingValue,
        SourceReferencesValue,
        StorageCollectionMetadataValue,
        SystemPrivilegesValue,
        TxnWalShardValue,
        RoleAuthValue,
    );

    // The table transaction tests use `String` as the value type.
    #[cfg(test)]
    mod test {
        impl_no_unique_name!(String,);
    }
}
3009
/// An in-memory view of one keyed collection: the state it started with plus the
/// changes made to it so far.
#[derive(Debug)]
struct TableTransaction<K, V> {
    // The contents at construction time; never modified afterwards.
    initial: BTreeMap<K, V>,
    // Additions and retractions recorded per key, in the order they were made.
    // The current value of a key is `initial` combined with these updates.
    pending: BTreeMap<K, Vec<TransactionUpdate<V>>>,
    // Optional predicate that returns `true` when two values may not coexist.
    uniqueness_violation: Option<fn(a: &V, b: &V) -> bool>,
}
3027
3028impl<K, V> TableTransaction<K, V>
3029where
3030 K: Ord + Eq + Clone + Debug,
3031 V: Ord + Clone + Debug + UniqueName,
3032{
3033 fn new<KP, VP>(initial: BTreeMap<KP, VP>) -> Result<Self, TryFromProtoError>
3040 where
3041 K: RustType<KP>,
3042 V: RustType<VP>,
3043 {
3044 let initial = initial
3045 .into_iter()
3046 .map(RustType::from_proto)
3047 .collect::<Result<_, _>>()?;
3048
3049 Ok(Self {
3050 initial,
3051 pending: BTreeMap::new(),
3052 uniqueness_violation: None,
3053 })
3054 }
3055
3056 fn new_with_uniqueness_fn<KP, VP>(
3059 initial: BTreeMap<KP, VP>,
3060 uniqueness_violation: fn(a: &V, b: &V) -> bool,
3061 ) -> Result<Self, TryFromProtoError>
3062 where
3063 K: RustType<KP>,
3064 V: RustType<VP>,
3065 {
3066 let initial = initial
3067 .into_iter()
3068 .map(RustType::from_proto)
3069 .collect::<Result<_, _>>()?;
3070
3071 Ok(Self {
3072 initial,
3073 pending: BTreeMap::new(),
3074 uniqueness_violation: Some(uniqueness_violation),
3075 })
3076 }
3077
3078 fn pending<KP, VP>(self) -> Vec<(KP, VP, Diff)>
3081 where
3082 K: RustType<KP>,
3083 V: RustType<VP>,
3084 {
3085 soft_assert_no_log!(self.verify().is_ok());
3086 self.pending
3089 .into_iter()
3090 .flat_map(|(k, v)| {
3091 let mut v: Vec<_> = v
3092 .into_iter()
3093 .map(|TransactionUpdate { value, ts: _, diff }| (value, diff))
3094 .collect();
3095 differential_dataflow::consolidation::consolidate(&mut v);
3096 v.into_iter().map(move |(v, diff)| (k.clone(), v, diff))
3097 })
3098 .map(|(key, val, diff)| (key.into_proto(), val.into_proto(), diff))
3099 .collect()
3100 }
3101
3102 fn verify(&self) -> Result<(), DurableCatalogError> {
3107 if let Some(uniqueness_violation) = self.uniqueness_violation {
3108 let items = self.values();
3110 if V::HAS_UNIQUE_NAME {
3111 let by_name: BTreeMap<_, _> = items
3112 .iter()
3113 .enumerate()
3114 .map(|(v, vi)| (vi.unique_name(), (v, vi)))
3115 .collect();
3116 for (i, vi) in items.iter().enumerate() {
3117 if let Some((j, vj)) = by_name.get(vi.unique_name()) {
3118 if i != *j && uniqueness_violation(vi, *vj) {
3119 return Err(DurableCatalogError::UniquenessViolation);
3120 }
3121 }
3122 }
3123 } else {
3124 for (i, vi) in items.iter().enumerate() {
3125 for (j, vj) in items.iter().enumerate() {
3126 if i != j && uniqueness_violation(vi, vj) {
3127 return Err(DurableCatalogError::UniquenessViolation);
3128 }
3129 }
3130 }
3131 }
3132 }
3133 soft_assert_no_log!(
3134 self.pending
3135 .values()
3136 .all(|pending| { pending.is_sorted_by(|a, b| a.ts <= b.ts) }),
3137 "pending should be sorted by timestamp: {:?}",
3138 self.pending
3139 );
3140 Ok(())
3141 }
3142
3143 fn verify_keys<'a>(
3148 &self,
3149 keys: impl IntoIterator<Item = &'a K>,
3150 ) -> Result<(), DurableCatalogError>
3151 where
3152 K: 'a,
3153 {
3154 if let Some(uniqueness_violation) = self.uniqueness_violation {
3155 let entries: Vec<_> = keys
3156 .into_iter()
3157 .filter_map(|key| self.get(key).map(|value| (key, value)))
3158 .collect();
3159 for (ki, vi) in self.items() {
3161 for (kj, vj) in &entries {
3162 if ki != *kj && uniqueness_violation(vi, vj) {
3163 return Err(DurableCatalogError::UniquenessViolation);
3164 }
3165 }
3166 }
3167 }
3168 soft_assert_no_log!(self.verify().is_ok());
3169 Ok(())
3170 }
3171
3172 fn for_values<'a, F: FnMut(&'a K, &'a V)>(&'a self, mut f: F) {
3175 let mut seen = BTreeSet::new();
3176 for k in self.pending.keys() {
3177 seen.insert(k);
3178 let v = self.get(k);
3179 if let Some(v) = v {
3182 f(k, v);
3183 }
3184 }
3185 for (k, v) in self.initial.iter() {
3186 if !seen.contains(k) {
3188 f(k, v);
3189 }
3190 }
3191 }
3192
3193 fn get(&self, k: &K) -> Option<&V> {
3195 let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
3196 let mut updates = Vec::with_capacity(pending.len() + 1);
3197 if let Some(initial) = self.initial.get(k) {
3198 updates.push((initial, Diff::ONE));
3199 }
3200 updates.extend(
3201 pending
3202 .into_iter()
3203 .map(|TransactionUpdate { value, ts: _, diff }| (value, *diff)),
3204 );
3205
3206 differential_dataflow::consolidation::consolidate(&mut updates);
3207 assert!(updates.len() <= 1);
3208 updates.into_iter().next().map(|(v, _)| v)
3209 }
3210
/// Returns an owned copy of the table's current contents (test helper).
#[cfg(test)]
fn items_cloned(&self) -> BTreeMap<K, V> {
    let mut snapshot = BTreeMap::new();
    self.for_values(|key, value| {
        snapshot.insert(key.clone(), value.clone());
    });
    snapshot
}
3223
3224 fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3228 where
3229 K: RustType<KP>,
3230 V: RustType<VP>,
3231 KP: Ord,
3232 {
3233 let mut items = BTreeMap::new();
3234 self.for_values(|k, v| {
3235 items.insert(k.into_proto(), v.into_proto());
3236 });
3237 items
3238 }
3239
3240 fn items(&self) -> BTreeMap<&K, &V> {
3243 let mut items = BTreeMap::new();
3244 self.for_values(|k, v| {
3245 items.insert(k, v);
3246 });
3247 items
3248 }
3249
3250 fn values(&self) -> BTreeSet<&V> {
3252 let mut items = BTreeSet::new();
3253 self.for_values(|_, v| {
3254 items.insert(v);
3255 });
3256 items
3257 }
3258
3259 fn len(&self) -> usize {
3261 let mut count = 0;
3262 self.for_values(|_, _| {
3263 count += 1;
3264 });
3265 count
3266 }
3267
3268 fn for_values_mut<F: FnMut(&mut BTreeMap<K, Vec<TransactionUpdate<V>>>, &K, &V)>(
3272 &mut self,
3273 mut f: F,
3274 ) {
3275 let mut pending = BTreeMap::new();
3276 self.for_values(|k, v| f(&mut pending, k, v));
3277 for (k, updates) in pending {
3278 self.pending.entry(k).or_default().extend(updates);
3279 }
3280 }
3281
3282 fn insert(&mut self, k: K, v: V, ts: Timestamp) -> Result<(), DurableCatalogError> {
3286 let mut violation = None;
3287 self.for_values(|for_k, for_v| {
3288 if &k == for_k {
3289 violation = Some(DurableCatalogError::DuplicateKey);
3290 }
3291 if let Some(uniqueness_violation) = self.uniqueness_violation {
3292 if uniqueness_violation(for_v, &v) {
3293 violation = Some(DurableCatalogError::UniquenessViolation);
3294 }
3295 }
3296 });
3297 if let Some(violation) = violation {
3298 return Err(violation);
3299 }
3300 self.pending.entry(k).or_default().push(TransactionUpdate {
3301 value: v,
3302 ts,
3303 diff: Diff::ONE,
3304 });
3305 soft_assert_no_log!(self.verify().is_ok());
3306 Ok(())
3307 }
3308
3309 fn update<F: Fn(&K, &V) -> Option<V>>(
3318 &mut self,
3319 f: F,
3320 ts: Timestamp,
3321 ) -> Result<Diff, DurableCatalogError> {
3322 let mut changed = Diff::ZERO;
3323 let mut keys = BTreeSet::new();
3324 let pending = self.pending.clone();
3326 self.for_values_mut(|p, k, v| {
3327 if let Some(next) = f(k, v) {
3328 changed += Diff::ONE;
3329 keys.insert(k.clone());
3330 let updates = p.entry(k.clone()).or_default();
3331 updates.push(TransactionUpdate {
3332 value: v.clone(),
3333 ts,
3334 diff: Diff::MINUS_ONE,
3335 });
3336 updates.push(TransactionUpdate {
3337 value: next,
3338 ts,
3339 diff: Diff::ONE,
3340 });
3341 }
3342 });
3343 if let Err(err) = self.verify_keys(&keys) {
3345 self.pending = pending;
3346 Err(err)
3347 } else {
3348 Ok(changed)
3349 }
3350 }
3351
3352 fn update_by_key(&mut self, k: K, v: V, ts: Timestamp) -> Result<bool, DurableCatalogError> {
3357 if let Some(cur_v) = self.get(&k) {
3358 if v != *cur_v {
3359 self.set(k, Some(v), ts)?;
3360 }
3361 Ok(true)
3362 } else {
3363 Ok(false)
3364 }
3365 }
3366
3367 fn update_by_keys(
3372 &mut self,
3373 kvs: impl IntoIterator<Item = (K, V)>,
3374 ts: Timestamp,
3375 ) -> Result<Diff, DurableCatalogError> {
3376 let kvs: Vec<_> = kvs
3377 .into_iter()
3378 .filter_map(|(k, v)| match self.get(&k) {
3379 Some(cur_v) => Some((*cur_v == v, k, v)),
3381 None => None,
3382 })
3383 .collect();
3384 let changed = kvs.len();
3385 let changed =
3386 Diff::try_from(changed).map_err(|e| DurableCatalogError::Internal(e.to_string()))?;
3387 let kvs = kvs
3388 .into_iter()
3389 .filter(|(no_op, _, _)| !no_op)
3391 .map(|(_, k, v)| (k, Some(v)))
3392 .collect();
3393 self.set_many(kvs, ts)?;
3394 Ok(changed)
3395 }
3396
3397 fn set(&mut self, k: K, v: Option<V>, ts: Timestamp) -> Result<Option<V>, DurableCatalogError> {
3404 let prev = self.get(&k).cloned();
3405 let entry = self.pending.entry(k.clone()).or_default();
3406 let restore_len = entry.len();
3407
3408 match (v, prev.clone()) {
3409 (Some(v), Some(prev)) => {
3410 entry.push(TransactionUpdate {
3411 value: prev,
3412 ts,
3413 diff: Diff::MINUS_ONE,
3414 });
3415 entry.push(TransactionUpdate {
3416 value: v,
3417 ts,
3418 diff: Diff::ONE,
3419 });
3420 }
3421 (Some(v), None) => {
3422 entry.push(TransactionUpdate {
3423 value: v,
3424 ts,
3425 diff: Diff::ONE,
3426 });
3427 }
3428 (None, Some(prev)) => {
3429 entry.push(TransactionUpdate {
3430 value: prev,
3431 ts,
3432 diff: Diff::MINUS_ONE,
3433 });
3434 }
3435 (None, None) => {}
3436 }
3437
3438 if let Err(err) = self.verify_keys([&k]) {
3440 let pending = self.pending.get_mut(&k).expect("inserted above");
3443 pending.truncate(restore_len);
3444 Err(err)
3445 } else {
3446 Ok(prev)
3447 }
3448 }
3449
3450 fn set_many(
3455 &mut self,
3456 kvs: BTreeMap<K, Option<V>>,
3457 ts: Timestamp,
3458 ) -> Result<BTreeMap<K, Option<V>>, DurableCatalogError> {
3459 if kvs.is_empty() {
3460 return Ok(BTreeMap::new());
3461 }
3462
3463 let mut prevs = BTreeMap::new();
3464 let mut restores = BTreeMap::new();
3465
3466 for (k, v) in kvs {
3467 let prev = self.get(&k).cloned();
3468 let entry = self.pending.entry(k.clone()).or_default();
3469 restores.insert(k.clone(), entry.len());
3470
3471 match (v, prev.clone()) {
3472 (Some(v), Some(prev)) => {
3473 entry.push(TransactionUpdate {
3474 value: prev,
3475 ts,
3476 diff: Diff::MINUS_ONE,
3477 });
3478 entry.push(TransactionUpdate {
3479 value: v,
3480 ts,
3481 diff: Diff::ONE,
3482 });
3483 }
3484 (Some(v), None) => {
3485 entry.push(TransactionUpdate {
3486 value: v,
3487 ts,
3488 diff: Diff::ONE,
3489 });
3490 }
3491 (None, Some(prev)) => {
3492 entry.push(TransactionUpdate {
3493 value: prev,
3494 ts,
3495 diff: Diff::MINUS_ONE,
3496 });
3497 }
3498 (None, None) => {}
3499 }
3500
3501 prevs.insert(k, prev);
3502 }
3503
3504 if let Err(err) = self.verify_keys(prevs.keys()) {
3506 for (k, restore_len) in restores {
3507 let pending = self.pending.get_mut(&k).expect("inserted above");
3510 pending.truncate(restore_len);
3511 }
3512 Err(err)
3513 } else {
3514 Ok(prevs)
3515 }
3516 }
3517
3518 fn delete<F: Fn(&K, &V) -> bool>(&mut self, f: F, ts: Timestamp) -> Vec<(K, V)> {
3524 let mut deleted = Vec::new();
3525 self.for_values_mut(|p, k, v| {
3526 if f(k, v) {
3527 deleted.push((k.clone(), v.clone()));
3528 p.entry(k.clone()).or_default().push(TransactionUpdate {
3529 value: v.clone(),
3530 ts,
3531 diff: Diff::MINUS_ONE,
3532 });
3533 }
3534 });
3535 soft_assert_no_log!(self.verify().is_ok());
3536 deleted
3537 }
3538
3539 fn delete_by_key(&mut self, k: K, ts: Timestamp) -> Option<V> {
3543 self.set(k, None, ts)
3544 .expect("deleting an entry cannot violate uniqueness")
3545 }
3546
3547 fn delete_by_keys(&mut self, ks: impl IntoIterator<Item = K>, ts: Timestamp) -> Vec<(K, V)> {
3551 let kvs = ks.into_iter().map(|k| (k, None)).collect();
3552 let prevs = self
3553 .set_many(kvs, ts)
3554 .expect("deleting entries cannot violate uniqueness");
3555 prevs
3556 .into_iter()
3557 .filter_map(|(k, v)| v.map(|v| (k, v)))
3558 .collect()
3559 }
3560}
3561
3562#[cfg(test)]
3563#[allow(clippy::unwrap_used)]
3564mod tests {
3565 use super::*;
3566
3567 use mz_controller::clusters::ReplicaLogging;
3568 use mz_ore::now::SYSTEM_TIME;
3569 use mz_ore::{assert_none, assert_ok};
3570 use mz_persist_client::cache::PersistClientCache;
3571 use mz_persist_types::PersistLocation;
3572 use semver::Version;
3573
3574 use crate::durable::{
3575 ReplicaConfig, ReplicaLocation, TestCatalogStateBuilder, test_bootstrap_args,
3576 };
3577 use crate::memory;
3578
3579 #[mz_ore::test]
3580 fn test_table_transaction_simple() {
3581 fn uniqueness_violation(a: &String, b: &String) -> bool {
3582 a == b
3583 }
3584 let mut table = TableTransaction::new_with_uniqueness_fn(
3585 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "a".to_string())]),
3586 uniqueness_violation,
3587 )
3588 .unwrap();
3589
3590 assert_ok!(table.insert(2i64.to_le_bytes().to_vec(), "b".to_string(), 0));
3593 assert_ok!(table.insert(3i64.to_le_bytes().to_vec(), "c".to_string(), 0));
3594 assert!(
3595 table
3596 .insert(1i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3597 .is_err()
3598 );
3599 assert!(
3600 table
3601 .insert(4i64.to_le_bytes().to_vec(), "c".to_string(), 0)
3602 .is_err()
3603 );
3604 }
3605
3606 #[mz_ore::test]
3607 fn test_table_transaction() {
3608 fn uniqueness_violation(a: &String, b: &String) -> bool {
3609 a == b
3610 }
3611 let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
3612
3613 fn commit(
3614 table: &mut BTreeMap<Vec<u8>, String>,
3615 mut pending: Vec<(Vec<u8>, String, Diff)>,
3616 ) {
3617 pending.sort_by(|a, b| a.2.cmp(&b.2));
3619 for (k, v, diff) in pending {
3620 if diff == Diff::MINUS_ONE {
3621 let prev = table.remove(&k);
3622 assert_eq!(prev, Some(v));
3623 } else if diff == Diff::ONE {
3624 let prev = table.insert(k, v);
3625 assert_eq!(prev, None);
3626 } else {
3627 panic!("unexpected diff: {diff}");
3628 }
3629 }
3630 }
3631
3632 table.insert(1i64.to_le_bytes().to_vec(), "v1".to_string());
3633 table.insert(2i64.to_le_bytes().to_vec(), "v2".to_string());
3634 let mut table_txn =
3635 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3636 assert_eq!(table_txn.items_cloned(), table);
3637 assert_eq!(table_txn.delete(|_k, _v| false, 0).len(), 0);
3638 assert_eq!(table_txn.delete(|_k, v| v == "v2", 1).len(), 1);
3639 assert_eq!(
3640 table_txn.items_cloned(),
3641 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string())])
3642 );
3643 assert_eq!(
3644 table_txn
3645 .update(|_k, _v| Some("v3".to_string()), 2)
3646 .unwrap(),
3647 Diff::ONE
3648 );
3649
3650 table_txn
3652 .insert(3i64.to_le_bytes().to_vec(), "v3".to_string(), 3)
3653 .unwrap_err();
3654
3655 table_txn
3656 .insert(3i64.to_le_bytes().to_vec(), "v4".to_string(), 4)
3657 .unwrap();
3658 assert_eq!(
3659 table_txn.items_cloned(),
3660 BTreeMap::from([
3661 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3662 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3663 ])
3664 );
3665 let err = table_txn
3666 .update(|_k, _v| Some("v1".to_string()), 5)
3667 .unwrap_err();
3668 assert!(
3669 matches!(err, DurableCatalogError::UniquenessViolation),
3670 "unexpected err: {err:?}"
3671 );
3672 let pending = table_txn.pending();
3673 assert_eq!(
3674 pending,
3675 vec![
3676 (
3677 1i64.to_le_bytes().to_vec(),
3678 "v1".to_string(),
3679 Diff::MINUS_ONE
3680 ),
3681 (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3682 (
3683 2i64.to_le_bytes().to_vec(),
3684 "v2".to_string(),
3685 Diff::MINUS_ONE
3686 ),
3687 (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
3688 ]
3689 );
3690 commit(&mut table, pending);
3691 assert_eq!(
3692 table,
3693 BTreeMap::from([
3694 (1i64.to_le_bytes().to_vec(), "v3".to_string()),
3695 (3i64.to_le_bytes().to_vec(), "v4".to_string())
3696 ])
3697 );
3698
3699 let mut table_txn =
3700 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3701 assert_eq!(
3703 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3704 1
3705 );
3706 table_txn
3707 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3708 .unwrap();
3709 table_txn
3711 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3712 .unwrap_err();
3713 table_txn
3715 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3716 .unwrap_err();
3717 assert_eq!(
3718 table_txn.delete(|k, _v| k == &1i64.to_le_bytes(), 0).len(),
3719 1
3720 );
3721 table_txn
3723 .insert(5i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3724 .unwrap();
3725 table_txn
3726 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 0)
3727 .unwrap();
3728 let pending = table_txn.pending();
3729 assert_eq!(
3730 pending,
3731 vec![
3732 (
3733 1i64.to_le_bytes().to_vec(),
3734 "v3".to_string(),
3735 Diff::MINUS_ONE
3736 ),
3737 (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
3738 (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
3739 ]
3740 );
3741 commit(&mut table, pending);
3742 assert_eq!(
3743 table,
3744 BTreeMap::from([
3745 (1i64.to_le_bytes().to_vec(), "v5".to_string()),
3746 (3i64.to_le_bytes().to_vec(), "v4".to_string()),
3747 (5i64.to_le_bytes().to_vec(), "v3".to_string()),
3748 ])
3749 );
3750
3751 let mut table_txn =
3752 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3753 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 3);
3754 table_txn
3755 .insert(1i64.to_le_bytes().to_vec(), "v1".to_string(), 0)
3756 .unwrap();
3757
3758 commit(&mut table, table_txn.pending());
3759 assert_eq!(
3760 table,
3761 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v1".to_string()),])
3762 );
3763
3764 let mut table_txn =
3765 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3766 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3767 table_txn
3768 .insert(1i64.to_le_bytes().to_vec(), "v2".to_string(), 0)
3769 .unwrap();
3770 commit(&mut table, table_txn.pending());
3771 assert_eq!(
3772 table,
3773 BTreeMap::from([(1i64.to_le_bytes().to_vec(), "v2".to_string()),])
3774 );
3775
3776 let mut table_txn =
3778 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3779 assert_eq!(table_txn.delete(|_k, _v| true, 0).len(), 1);
3780 table_txn
3781 .insert(1i64.to_le_bytes().to_vec(), "v3".to_string(), 0)
3782 .unwrap();
3783 table_txn
3784 .insert(1i64.to_le_bytes().to_vec(), "v4".to_string(), 1)
3785 .unwrap_err();
3786 assert_eq!(table_txn.delete(|_k, _v| true, 1).len(), 1);
3787 table_txn
3788 .insert(1i64.to_le_bytes().to_vec(), "v5".to_string(), 1)
3789 .unwrap();
3790 commit(&mut table, table_txn.pending());
3791 assert_eq!(
3792 table.clone().into_iter().collect::<Vec<_>>(),
3793 vec![(1i64.to_le_bytes().to_vec(), "v5".to_string())]
3794 );
3795
3796 let mut table_txn =
3798 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3799 table_txn
3801 .set(2i64.to_le_bytes().to_vec(), Some("v5".to_string()), 0)
3802 .unwrap_err();
3803 table_txn
3804 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 1)
3805 .unwrap();
3806 table_txn.set(2i64.to_le_bytes().to_vec(), None, 2).unwrap();
3807 table_txn.set(1i64.to_le_bytes().to_vec(), None, 2).unwrap();
3808 let pending = table_txn.pending();
3809 assert_eq!(
3810 pending,
3811 vec![
3812 (
3813 1i64.to_le_bytes().to_vec(),
3814 "v5".to_string(),
3815 Diff::MINUS_ONE
3816 ),
3817 (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3818 ]
3819 );
3820 commit(&mut table, pending);
3821 assert_eq!(
3822 table,
3823 BTreeMap::from([(3i64.to_le_bytes().to_vec(), "v6".to_string())])
3824 );
3825
3826 let mut table_txn =
3828 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3829 table_txn
3830 .set(3i64.to_le_bytes().to_vec(), Some("v6".to_string()), 0)
3831 .unwrap();
3832 let pending = table_txn.pending::<Vec<u8>, String>();
3833 assert!(pending.is_empty());
3834
3835 let mut table_txn =
3837 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3838 table_txn
3840 .set_many(
3841 BTreeMap::from([
3842 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3843 (42i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3844 ]),
3845 0,
3846 )
3847 .unwrap_err();
3848 table_txn
3849 .set_many(
3850 BTreeMap::from([
3851 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3852 (3i64.to_le_bytes().to_vec(), Some("v1".to_string())),
3853 ]),
3854 1,
3855 )
3856 .unwrap();
3857 table_txn
3858 .set_many(
3859 BTreeMap::from([
3860 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3861 (3i64.to_le_bytes().to_vec(), None),
3862 ]),
3863 2,
3864 )
3865 .unwrap();
3866 let pending = table_txn.pending();
3867 assert_eq!(
3868 pending,
3869 vec![
3870 (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
3871 (
3872 3i64.to_le_bytes().to_vec(),
3873 "v6".to_string(),
3874 Diff::MINUS_ONE
3875 ),
3876 (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
3877 ]
3878 );
3879 commit(&mut table, pending);
3880 assert_eq!(
3881 table,
3882 BTreeMap::from([
3883 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3884 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3885 ])
3886 );
3887
3888 let mut table_txn =
3890 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3891 table_txn
3892 .set_many(
3893 BTreeMap::from([
3894 (1i64.to_le_bytes().to_vec(), Some("v6".to_string())),
3895 (42i64.to_le_bytes().to_vec(), Some("v7".to_string())),
3896 ]),
3897 0,
3898 )
3899 .unwrap();
3900 let pending = table_txn.pending::<Vec<u8>, String>();
3901 assert!(pending.is_empty());
3902 commit(&mut table, pending);
3903 assert_eq!(
3904 table,
3905 BTreeMap::from([
3906 (1i64.to_le_bytes().to_vec(), "v6".to_string()),
3907 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3908 ])
3909 );
3910
3911 let mut table_txn =
3913 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3914 table_txn
3916 .update_by_key(1i64.to_le_bytes().to_vec(), "v7".to_string(), 0)
3917 .unwrap_err();
3918 assert!(
3919 table_txn
3920 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 1)
3921 .unwrap()
3922 );
3923 assert!(
3924 !table_txn
3925 .update_by_key(5i64.to_le_bytes().to_vec(), "v8".to_string(), 2)
3926 .unwrap()
3927 );
3928 let pending = table_txn.pending();
3929 assert_eq!(
3930 pending,
3931 vec![
3932 (
3933 1i64.to_le_bytes().to_vec(),
3934 "v6".to_string(),
3935 Diff::MINUS_ONE
3936 ),
3937 (1i64.to_le_bytes().to_vec(), "v8".to_string(), Diff::ONE),
3938 ]
3939 );
3940 commit(&mut table, pending);
3941 assert_eq!(
3942 table,
3943 BTreeMap::from([
3944 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3945 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3946 ])
3947 );
3948
3949 let mut table_txn =
3951 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3952 assert!(
3953 table_txn
3954 .update_by_key(1i64.to_le_bytes().to_vec(), "v8".to_string(), 0)
3955 .unwrap()
3956 );
3957 let pending = table_txn.pending::<Vec<u8>, String>();
3958 assert!(pending.is_empty());
3959 commit(&mut table, pending);
3960 assert_eq!(
3961 table,
3962 BTreeMap::from([
3963 (1i64.to_le_bytes().to_vec(), "v8".to_string()),
3964 (42i64.to_le_bytes().to_vec(), "v7".to_string())
3965 ])
3966 );
3967
3968 let mut table_txn =
3970 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
3971 table_txn
3973 .update_by_keys(
3974 [
3975 (1i64.to_le_bytes().to_vec(), "v7".to_string()),
3976 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3977 ],
3978 0,
3979 )
3980 .unwrap_err();
3981 let n = table_txn
3982 .update_by_keys(
3983 [
3984 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
3985 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3986 ],
3987 1,
3988 )
3989 .unwrap();
3990 assert_eq!(n, Diff::ONE);
3991 let n = table_txn
3992 .update_by_keys(
3993 [
3994 (15i64.to_le_bytes().to_vec(), "v9".to_string()),
3995 (5i64.to_le_bytes().to_vec(), "v7".to_string()),
3996 ],
3997 2,
3998 )
3999 .unwrap();
4000 assert_eq!(n, Diff::ZERO);
4001 let pending = table_txn.pending();
4002 assert_eq!(
4003 pending,
4004 vec![
4005 (
4006 1i64.to_le_bytes().to_vec(),
4007 "v8".to_string(),
4008 Diff::MINUS_ONE
4009 ),
4010 (1i64.to_le_bytes().to_vec(), "v9".to_string(), Diff::ONE),
4011 ]
4012 );
4013 commit(&mut table, pending);
4014 assert_eq!(
4015 table,
4016 BTreeMap::from([
4017 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4018 (42i64.to_le_bytes().to_vec(), "v7".to_string())
4019 ])
4020 );
4021
4022 let mut table_txn =
4024 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4025 let n = table_txn
4026 .update_by_keys(
4027 [
4028 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4029 (42i64.to_le_bytes().to_vec(), "v7".to_string()),
4030 ],
4031 0,
4032 )
4033 .unwrap();
4034 assert_eq!(n, Diff::from(2));
4035 let pending = table_txn.pending::<Vec<u8>, String>();
4036 assert!(pending.is_empty());
4037 commit(&mut table, pending);
4038 assert_eq!(
4039 table,
4040 BTreeMap::from([
4041 (1i64.to_le_bytes().to_vec(), "v9".to_string()),
4042 (42i64.to_le_bytes().to_vec(), "v7".to_string())
4043 ])
4044 );
4045
4046 let mut table_txn =
4048 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4049 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 0);
4050 assert_eq!(prev, Some("v9".to_string()));
4051 let prev = table_txn.delete_by_key(5i64.to_le_bytes().to_vec(), 1);
4052 assert_none!(prev);
4053 let prev = table_txn.delete_by_key(1i64.to_le_bytes().to_vec(), 2);
4054 assert_none!(prev);
4055 let pending = table_txn.pending();
4056 assert_eq!(
4057 pending,
4058 vec![(
4059 1i64.to_le_bytes().to_vec(),
4060 "v9".to_string(),
4061 Diff::MINUS_ONE
4062 ),]
4063 );
4064 commit(&mut table, pending);
4065 assert_eq!(
4066 table,
4067 BTreeMap::from([(42i64.to_le_bytes().to_vec(), "v7".to_string())])
4068 );
4069
4070 let mut table_txn =
4072 TableTransaction::new_with_uniqueness_fn(table.clone(), uniqueness_violation).unwrap();
4073 let prevs = table_txn.delete_by_keys(
4074 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4075 0,
4076 );
4077 assert_eq!(
4078 prevs,
4079 vec![(42i64.to_le_bytes().to_vec(), "v7".to_string())]
4080 );
4081 let prevs = table_txn.delete_by_keys(
4082 [42i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4083 1,
4084 );
4085 assert_eq!(prevs, vec![]);
4086 let prevs = table_txn.delete_by_keys(
4087 [10i64.to_le_bytes().to_vec(), 55i64.to_le_bytes().to_vec()],
4088 2,
4089 );
4090 assert_eq!(prevs, vec![]);
4091 let pending = table_txn.pending();
4092 assert_eq!(
4093 pending,
4094 vec![(
4095 42i64.to_le_bytes().to_vec(),
4096 "v7".to_string(),
4097 Diff::MINUS_ONE
4098 ),]
4099 );
4100 commit(&mut table, pending);
4101 assert_eq!(table, BTreeMap::new());
4102 }
4103
4104 #[mz_ore::test(tokio::test)]
4105 #[cfg_attr(miri, ignore)] async fn test_savepoint() {
4107 const VERSION: Version = Version::new(26, 0, 0);
4108 let mut persist_cache = PersistClientCache::new_no_metrics();
4109 persist_cache.cfg.build_version = VERSION;
4110 let persist_client = persist_cache
4111 .open(PersistLocation::new_in_mem())
4112 .await
4113 .unwrap();
4114 let state_builder = TestCatalogStateBuilder::new(persist_client)
4115 .with_default_deploy_generation()
4116 .with_version(VERSION);
4117
4118 let _ = state_builder
4120 .clone()
4121 .unwrap_build()
4122 .await
4123 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4124 .await
4125 .unwrap()
4126 .0;
4127 let mut savepoint_state = state_builder
4128 .unwrap_build()
4129 .await
4130 .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
4131 .await
4132 .unwrap()
4133 .0;
4134
4135 let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
4136 assert!(!initial_snapshot.is_empty());
4137
4138 let db_name = "db";
4139 let db_owner = RoleId::User(42);
4140 let db_privileges = Vec::new();
4141 let mut txn = savepoint_state.transaction().await.unwrap();
4142 let (db_id, db_oid) = txn
4143 .insert_user_database(db_name, db_owner, db_privileges.clone(), &HashSet::new())
4144 .unwrap();
4145 let commit_ts = txn.upper();
4146 txn.commit_internal(commit_ts).await.unwrap();
4147 let updates = savepoint_state.sync_to_current_updates().await.unwrap();
4148 let update = updates.into_element();
4149
4150 assert_eq!(update.diff, StateDiff::Addition);
4151
4152 let db = match update.kind {
4153 memory::objects::StateUpdateKind::Database(db) => db,
4154 update => panic!("unexpected update: {update:?}"),
4155 };
4156
4157 assert_eq!(db_id, db.id);
4158 assert_eq!(db_oid, db.oid);
4159 assert_eq!(db_name, db.name);
4160 assert_eq!(db_owner, db.owner_id);
4161 assert_eq!(db_privileges, db.privileges);
4162 }
4163
4164 #[mz_ore::test(tokio::test)]
4168 #[cfg_attr(miri, ignore)] async fn test_insert_replica_with_id_does_not_consume_allocator() {
4170 const VERSION: Version = Version::new(26, 0, 0);
4171 let mut persist_cache = PersistClientCache::new_no_metrics();
4172 persist_cache.cfg.build_version = VERSION;
4173 let persist_client = persist_cache
4174 .open(PersistLocation::new_in_mem())
4175 .await
4176 .unwrap();
4177 let state_builder = TestCatalogStateBuilder::new(persist_client)
4178 .with_default_deploy_generation()
4179 .with_version(VERSION);
4180 let mut state = state_builder
4181 .unwrap_build()
4182 .await
4183 .open(SYSTEM_TIME().into(), &test_bootstrap_args())
4184 .await
4185 .unwrap()
4186 .0;
4187
4188 let cluster_id = ClusterId::User(1);
4191 let owner_id = RoleId::User(1);
4192 let config = ReplicaConfig {
4193 location: ReplicaLocation::Managed {
4194 size: "1".to_string(),
4195 availability_zones: Vec::new(),
4196 internal: false,
4197 billed_as: None,
4198 pending: false,
4199 },
4200 logging: ReplicaLogging {
4201 log_logging: false,
4202 interval: Some(Duration::from_secs(1)),
4203 },
4204 };
4205
4206 let commit_ts = state.current_upper().await;
4208 let a = state
4209 .allocate_user_replica_ids(1, commit_ts)
4210 .await
4211 .unwrap()
4212 .into_element();
4213 assert!(a.is_user());
4214
4215 let mut txn = state.transaction().await.unwrap();
4217 txn.insert_cluster_replica_with_id(cluster_id, a, "explicit", config, owner_id)
4218 .unwrap();
4219 let commit_ts = txn.upper();
4220 txn.commit_internal(commit_ts).await.unwrap();
4221
4222 let commit_ts = state.current_upper().await;
4224 let b = state
4225 .allocate_user_replica_ids(1, commit_ts)
4226 .await
4227 .unwrap()
4228 .into_element();
4229
4230 assert_eq!(b.inner_id(), a.inner_id() + 1);
4233
4234 let txn = state.transaction().await.unwrap();
4236 let found = txn
4237 .get_cluster_replicas()
4238 .any(|replica| replica.replica_id == a);
4239 assert!(found, "explicitly inserted replica {a} not found");
4240 }
4241
4242 #[mz_ore::test]
4243 fn test_allocate_introspection_source_index_id() {
4244 let cluster_variant: u8 = 0b0000_0001;
4245 let cluster_id_inner: u64 =
4246 0b0000_0000_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010;
4247 let timely_messages_received_log_variant: u8 = 0b0000_1000;
4248
4249 let cluster_id = ClusterId::System(cluster_id_inner);
4250 let log_variant = LogVariant::Timely(TimelyLog::MessagesReceived);
4251
4252 let introspection_source_index_id: u64 =
4253 0b0000_0001_1100_0101_1100_0011_1010_1101_0000_1011_1111_1001_0110_1010_0000_1000;
4254
4255 {
4257 let mut cluster_variant_mask = 0xFF << 56;
4258 cluster_variant_mask &= introspection_source_index_id;
4259 cluster_variant_mask >>= 56;
4260 assert_eq!(cluster_variant_mask, u64::from(cluster_variant));
4261 }
4262
4263 {
4265 let mut cluster_id_inner_mask = 0xFFFF_FFFF_FFFF << 8;
4266 cluster_id_inner_mask &= introspection_source_index_id;
4267 cluster_id_inner_mask >>= 8;
4268 assert_eq!(cluster_id_inner_mask, cluster_id_inner);
4269 }
4270
4271 {
4273 let mut log_variant_mask = 0xFF;
4274 log_variant_mask &= introspection_source_index_id;
4275 assert_eq!(
4276 log_variant_mask,
4277 u64::from(timely_messages_received_log_variant)
4278 );
4279 }
4280
4281 let (catalog_item_id, global_id) =
4282 Transaction::allocate_introspection_source_index_id(&cluster_id, log_variant);
4283
4284 assert_eq!(
4285 catalog_item_id,
4286 CatalogItemId::IntrospectionSourceIndex(introspection_source_index_id)
4287 );
4288 assert_eq!(
4289 global_id,
4290 GlobalId::IntrospectionSourceIndex(introspection_source_index_id)
4291 );
4292 }
4293}