1use std::fmt::Debug;
14
15use mz_repr::Diff;
16use serde::{Deserialize, Serialize};
17use serde_plain::{derive_display_from_serialize, derive_fromstr_from_deserialize};
18
19use crate::durable::CatalogError;
20use crate::durable::objects::serialization::proto;
21use crate::durable::objects::state_update::StateUpdateKind;
22use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
23
24pub trait Collection: Debug {
27 type Key;
29 type Value;
31
32 fn collection_type() -> CollectionType;
34
35 fn collection_trace(trace: Trace) -> CollectionTrace<Self>;
37
38 fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind;
40
41 fn name() -> String {
43 Self::collection_type().to_string()
44 }
45}
46
47#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
54#[serde(rename_all = "snake_case")]
55pub enum CollectionType {
56 AuditLog,
57 ComputeInstance,
58 ComputeIntrospectionSourceIndex,
59 ComputeReplicas,
60 Comments,
61 Config,
62 Database,
63 DefaultPrivileges,
64 IdAlloc,
65 Item,
66 NetworkPolicy,
67 Role,
68 RoleAuth,
69 Schema,
70 Setting,
71 SourceReferences,
72 SystemConfiguration,
73 ClusterSystemConfiguration,
74 ReplicaSystemConfiguration,
75 SystemGidMapping,
76 SystemPrivileges,
77 StorageCollectionMetadata,
78 UnfinalizedShard,
79 TxnWalShard,
80}
81
82derive_display_from_serialize!(CollectionType);
83derive_fromstr_from_deserialize!(CollectionType);
84
85macro_rules! collection_impl {
95 ({
96 name: $name:ident,
97 key: $key:ty,
98 value: $value:ty,
99 collection_type: $collection_type:expr,
100 trace_field: $trace_field:ident,
101 update: $update:expr,
102}) => {
103 #[derive(Debug, Clone, PartialEq, Eq)]
104 pub struct $name {}
105
106 impl Collection for $name {
107 type Key = $key;
108 type Value = $value;
109
110 fn collection_type() -> CollectionType {
111 $collection_type
112 }
113
114 fn collection_trace(trace: Trace) -> CollectionTrace<Self> {
115 trace.$trace_field
116 }
117
118 fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind {
119 $update(key, value)
120 }
121 }
122 };
123}
124
125collection_impl!({
126 name: AuditLogCollection,
127 key: proto::AuditLogKey,
128 value: (),
129 collection_type: CollectionType::AuditLog,
130 trace_field: audit_log,
131 update: StateUpdateKind::AuditLog,
132});
133collection_impl!({
134 name: ClusterCollection,
135 key: proto::ClusterKey,
136 value: proto::ClusterValue,
137 collection_type: CollectionType::ComputeInstance,
138 trace_field: clusters,
139 update: StateUpdateKind::Cluster,
140});
141collection_impl!({
142 name: ClusterIntrospectionSourceIndexCollection,
143 key: proto::ClusterIntrospectionSourceIndexKey,
144 value: proto::ClusterIntrospectionSourceIndexValue,
145 collection_type: CollectionType::ComputeIntrospectionSourceIndex,
146 trace_field: introspection_sources,
147 update: StateUpdateKind::IntrospectionSourceIndex,
148});
149collection_impl!({
150 name: ClusterReplicaCollection,
151 key: proto::ClusterReplicaKey,
152 value: proto::ClusterReplicaValue,
153 collection_type: CollectionType::ComputeReplicas,
154 trace_field: cluster_replicas,
155 update: StateUpdateKind::ClusterReplica,
156});
157collection_impl!({
158 name: CommentCollection,
159 key: proto::CommentKey,
160 value: proto::CommentValue,
161 collection_type: CollectionType::Comments,
162 trace_field: comments,
163 update: StateUpdateKind::Comment,
164});
165collection_impl!({
166 name: ConfigCollection,
167 key: proto::ConfigKey,
168 value: proto::ConfigValue,
169 collection_type: CollectionType::Config,
170 trace_field: configs,
171 update: StateUpdateKind::Config,
172});
173collection_impl!({
174 name: DatabaseCollection,
175 key: proto::DatabaseKey,
176 value: proto::DatabaseValue,
177 collection_type: CollectionType::Database,
178 trace_field: databases,
179 update: StateUpdateKind::Database,
180});
181collection_impl!({
182 name: DefaultPrivilegeCollection,
183 key: proto::DefaultPrivilegesKey,
184 value: proto::DefaultPrivilegesValue,
185 collection_type: CollectionType::DefaultPrivileges,
186 trace_field: default_privileges,
187 update: StateUpdateKind::DefaultPrivilege,
188});
189collection_impl!({
190 name: IdAllocatorCollection,
191 key: proto::IdAllocKey,
192 value: proto::IdAllocValue,
193 collection_type: CollectionType::IdAlloc,
194 trace_field: id_allocator,
195 update: StateUpdateKind::IdAllocator,
196});
197collection_impl!({
198 name: ItemCollection,
199 key: proto::ItemKey,
200 value: proto::ItemValue,
201 collection_type: CollectionType::Item,
202 trace_field: items,
203 update: StateUpdateKind::Item,
204});
205collection_impl!({
206 name: NetworkPolicyCollection,
207 key: proto::NetworkPolicyKey,
208 value: proto::NetworkPolicyValue,
209 collection_type: CollectionType::NetworkPolicy,
210 trace_field: network_policies,
211 update: StateUpdateKind::NetworkPolicy,
212});
213collection_impl!({
214 name: RoleCollection,
215 key: proto::RoleKey,
216 value: proto::RoleValue,
217 collection_type: CollectionType::Role,
218 trace_field: roles,
219 update: StateUpdateKind::Role,
220});
221collection_impl!({
222 name: RoleAuthCollection,
223 key: proto::RoleAuthKey,
224 value: proto::RoleAuthValue,
225 collection_type: CollectionType::RoleAuth,
226 trace_field: role_auth,
227 update: StateUpdateKind::RoleAuth,
228});
229collection_impl!({
230 name: SchemaCollection,
231 key: proto::SchemaKey,
232 value: proto::SchemaValue,
233 collection_type: CollectionType::Schema,
234 trace_field: schemas,
235 update: StateUpdateKind::Schema,
236});
237collection_impl!({
238 name: SettingCollection,
239 key: proto::SettingKey,
240 value: proto::SettingValue,
241 collection_type: CollectionType::Setting,
242 trace_field: settings,
243 update: StateUpdateKind::Setting,
244});
245collection_impl!({
246 name: SourceReferencesCollection,
247 key: proto::SourceReferencesKey,
248 value: proto::SourceReferencesValue,
249 collection_type: CollectionType::SourceReferences,
250 trace_field: source_references,
251 update: StateUpdateKind::SourceReferences,
252});
253collection_impl!({
254 name: SystemConfigurationCollection,
255 key: proto::ServerConfigurationKey,
256 value: proto::ServerConfigurationValue,
257 collection_type: CollectionType::SystemConfiguration,
258 trace_field: system_configurations,
259 update: StateUpdateKind::SystemConfiguration,
260});
261collection_impl!({
262 name: ClusterSystemConfigurationCollection,
263 key: proto::ClusterSystemConfigurationKey,
264 value: proto::ClusterSystemConfigurationValue,
265 collection_type: CollectionType::ClusterSystemConfiguration,
266 trace_field: cluster_system_configurations,
267 update: StateUpdateKind::ClusterSystemConfiguration,
268});
269collection_impl!({
270 name: ReplicaSystemConfigurationCollection,
271 key: proto::ReplicaSystemConfigurationKey,
272 value: proto::ReplicaSystemConfigurationValue,
273 collection_type: CollectionType::ReplicaSystemConfiguration,
274 trace_field: replica_system_configurations,
275 update: StateUpdateKind::ReplicaSystemConfiguration,
276});
277collection_impl!({
278 name: SystemItemMappingCollection,
279 key: proto::GidMappingKey,
280 value: proto::GidMappingValue,
281 collection_type: CollectionType::SystemGidMapping,
282 trace_field: system_object_mappings,
283 update: StateUpdateKind::SystemObjectMapping,
284});
285collection_impl!({
286 name: SystemPrivilegeCollection,
287 key: proto::SystemPrivilegesKey,
288 value: proto::SystemPrivilegesValue,
289 collection_type: CollectionType::SystemPrivileges,
290 trace_field: system_privileges,
291 update: StateUpdateKind::SystemPrivilege,
292});
293
294collection_impl!({
295 name: StorageCollectionMetadataCollection,
296 key: proto::StorageCollectionMetadataKey,
297 value: proto::StorageCollectionMetadataValue,
298 collection_type: CollectionType::StorageCollectionMetadata,
299 trace_field: storage_collection_metadata,
300 update: StateUpdateKind::StorageCollectionMetadata,
301});
302collection_impl!({
303 name: UnfinalizedShardsCollection,
304 key: proto::UnfinalizedShardKey,
305 value: (),
306 collection_type: CollectionType::UnfinalizedShard,
307 trace_field: unfinalized_shards,
308 update: StateUpdateKind::UnfinalizedShard,
309});
310collection_impl!({
311 name: TxnWalShardCollection,
312 key: (),
313 value: proto::TxnWalShardValue,
314 collection_type: CollectionType::TxnWalShard,
315 trace_field: txn_wal_shard,
316 update: StateUpdateKind::TxnWalShard,
317});
318
319#[derive(Debug, Clone, PartialEq, Eq)]
324pub struct CollectionTrace<T: Collection + ?Sized> {
325 pub values: Vec<((T::Key, T::Value), Timestamp, Diff)>,
326}
327
328impl<T: Collection> CollectionTrace<T> {
329 fn new() -> CollectionTrace<T> {
330 CollectionTrace { values: Vec::new() }
331 }
332}
333
334impl<T: Collection> CollectionTrace<T>
335where
336 T: Collection,
337 T::Key: Ord,
338 T::Value: Ord,
339{
340 fn sort(&mut self) {
341 self.values
342 .sort_by(|(x1, ts1, d1), (x2, ts2, d2)| ts1.cmp(ts2).then(d1.cmp(d2)).then(x1.cmp(x2)));
343 }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq)]
348pub struct Trace {
349 pub audit_log: CollectionTrace<AuditLogCollection>,
350 pub clusters: CollectionTrace<ClusterCollection>,
351 pub introspection_sources: CollectionTrace<ClusterIntrospectionSourceIndexCollection>,
352 pub cluster_replicas: CollectionTrace<ClusterReplicaCollection>,
353 pub comments: CollectionTrace<CommentCollection>,
354 pub configs: CollectionTrace<ConfigCollection>,
355 pub databases: CollectionTrace<DatabaseCollection>,
356 pub default_privileges: CollectionTrace<DefaultPrivilegeCollection>,
357 pub id_allocator: CollectionTrace<IdAllocatorCollection>,
358 pub items: CollectionTrace<ItemCollection>,
359 pub network_policies: CollectionTrace<NetworkPolicyCollection>,
360 pub roles: CollectionTrace<RoleCollection>,
361 pub role_auth: CollectionTrace<RoleAuthCollection>,
362 pub schemas: CollectionTrace<SchemaCollection>,
363 pub settings: CollectionTrace<SettingCollection>,
364 pub source_references: CollectionTrace<SourceReferencesCollection>,
365 pub system_object_mappings: CollectionTrace<SystemItemMappingCollection>,
366 pub system_configurations: CollectionTrace<SystemConfigurationCollection>,
367 pub cluster_system_configurations: CollectionTrace<ClusterSystemConfigurationCollection>,
368 pub replica_system_configurations: CollectionTrace<ReplicaSystemConfigurationCollection>,
369 pub system_privileges: CollectionTrace<SystemPrivilegeCollection>,
370 pub storage_collection_metadata: CollectionTrace<StorageCollectionMetadataCollection>,
371 pub unfinalized_shards: CollectionTrace<UnfinalizedShardsCollection>,
372 pub txn_wal_shard: CollectionTrace<TxnWalShardCollection>,
373}
374
375impl Trace {
376 pub(crate) fn new() -> Trace {
377 Trace {
378 audit_log: CollectionTrace::new(),
379 clusters: CollectionTrace::new(),
380 introspection_sources: CollectionTrace::new(),
381 cluster_replicas: CollectionTrace::new(),
382 comments: CollectionTrace::new(),
383 configs: CollectionTrace::new(),
384 databases: CollectionTrace::new(),
385 default_privileges: CollectionTrace::new(),
386 id_allocator: CollectionTrace::new(),
387 items: CollectionTrace::new(),
388 network_policies: CollectionTrace::new(),
389 roles: CollectionTrace::new(),
390 role_auth: CollectionTrace::new(),
391 schemas: CollectionTrace::new(),
392 settings: CollectionTrace::new(),
393 source_references: CollectionTrace::new(),
394 system_object_mappings: CollectionTrace::new(),
395 system_configurations: CollectionTrace::new(),
396 cluster_system_configurations: CollectionTrace::new(),
397 replica_system_configurations: CollectionTrace::new(),
398 system_privileges: CollectionTrace::new(),
399 storage_collection_metadata: CollectionTrace::new(),
400 unfinalized_shards: CollectionTrace::new(),
401 txn_wal_shard: CollectionTrace::new(),
402 }
403 }
404
405 pub fn sort(&mut self) {
406 let Trace {
407 audit_log,
408 clusters,
409 introspection_sources,
410 cluster_replicas,
411 comments,
412 configs,
413 databases,
414 default_privileges,
415 id_allocator,
416 items,
417 network_policies,
418 roles,
419 role_auth,
420 schemas,
421 settings,
422 source_references,
423 system_object_mappings,
424 system_configurations,
425 cluster_system_configurations,
426 replica_system_configurations,
427 system_privileges,
428 storage_collection_metadata,
429 unfinalized_shards,
430 txn_wal_shard,
431 } = self;
432 audit_log.sort();
433 clusters.sort();
434 introspection_sources.sort();
435 cluster_replicas.sort();
436 comments.sort();
437 configs.sort();
438 databases.sort();
439 default_privileges.sort();
440 id_allocator.sort();
441 items.sort();
442 network_policies.sort();
443 roles.sort();
444 role_auth.sort();
445 schemas.sort();
446 settings.sort();
447 source_references.sort();
448 system_object_mappings.sort();
449 system_configurations.sort();
450 cluster_system_configurations.sort();
451 replica_system_configurations.sort();
452 system_privileges.sort();
453 storage_collection_metadata.sort();
454 unfinalized_shards.sort();
455 txn_wal_shard.sort();
456 }
457}
458
459pub struct DebugCatalogState(pub(crate) UnopenedPersistCatalogState);
460
461impl DebugCatalogState {
462 pub async fn edit<T: Collection>(
464 &mut self,
465 key: T::Key,
466 value: T::Value,
467 ) -> Result<Option<T::Value>, CatalogError>
468 where
469 T::Key: PartialEq + Eq + Debug + Clone,
470 T::Value: Debug + Clone,
471 {
472 self.0.debug_edit::<T>(key, value).await
473 }
474
475 pub async fn delete<T: Collection>(&mut self, key: T::Key) -> Result<(), CatalogError>
477 where
478 T::Key: PartialEq + Eq + Debug + Clone,
479 T::Value: Debug,
480 {
481 self.0.debug_delete::<T>(key).await
482 }
483}