1use std::fmt::Debug;
14
15use mz_repr::Diff;
16use serde::{Deserialize, Serialize};
17use serde_plain::{derive_display_from_serialize, derive_fromstr_from_deserialize};
18
19use crate::durable::CatalogError;
20use crate::durable::objects::serialization::proto;
21use crate::durable::objects::state_update::StateUpdateKind;
22use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
23
24pub trait Collection: Debug {
27 type Key;
29 type Value;
31
32 fn collection_type() -> CollectionType;
34
35 fn collection_trace(trace: Trace) -> CollectionTrace<Self>;
37
38 fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind;
40
41 fn name() -> String {
43 Self::collection_type().to_string()
44 }
45}
46
/// Tag identifying each kind of catalog collection.
///
/// Serde (de)serializes every variant under its `snake_case` name (for example
/// `AuditLog` as `audit_log`), and the `serde_plain` derives that follow the enum
/// reuse that representation for `Display` and `FromStr`.
// NOTE(review): the string form is derived from the variant identifiers, so renaming a
// variant changes it. Confirm nothing persisted or user-facing depends on these names
// before renaming.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CollectionType {
    AuditLog,
    ComputeInstance,
    ComputeIntrospectionSourceIndex,
    ComputeReplicas,
    Comments,
    Config,
    Database,
    DefaultPrivileges,
    IdAlloc,
    Item,
    NetworkPolicy,
    Role,
    RoleAuth,
    Schema,
    Setting,
    SourceReferences,
    SystemConfiguration,
    SystemGidMapping,
    SystemPrivileges,
    StorageCollectionMetadata,
    UnfinalizedShard,
    TxnWalShard,
}

// `Display` and `FromStr` are generated from the serde implementations above, so a
// variant's string form is exactly its serialized `snake_case` name.
derive_display_from_serialize!(CollectionType);
derive_fromstr_from_deserialize!(CollectionType);
82
/// Generates an empty marker struct and its `Collection` implementation.
///
/// All arguments are required, must appear in this order, and each must be followed
/// by a comma:
///
/// * `name`: identifier of the struct to define.
/// * `key` / `value`: types used for `Collection::Key` and `Collection::Value`.
/// * `collection_type`: expression returned from `Collection::collection_type`.
/// * `trace_field`: name of the `Trace` field returned from `Collection::collection_trace`.
/// * `update`: callable expression invoked as `update(key, value)` to produce the
///   `StateUpdateKind` returned from `Collection::update`.
macro_rules! collection_impl {
    ({
        name: $collection:ident,
        key: $key_ty:ty,
        value: $value_ty:ty,
        collection_type: $kind:expr,
        trace_field: $field:ident,
        update: $ctor:expr,
    }) => {
        #[derive(Debug, Clone, PartialEq, Eq)]
        pub struct $collection {}

        impl Collection for $collection {
            type Key = $key_ty;
            type Value = $value_ty;

            fn collection_type() -> CollectionType {
                $kind
            }

            fn collection_trace(trace: Trace) -> CollectionTrace<Self> {
                trace.$field
            }

            fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind {
                $ctor(key, value)
            }
        }
    };
}
122
// The concrete collections. Each `collection_impl!` invocation below defines one
// marker struct and its `Collection` implementation; comments call out the entries
// whose names differ between the struct, the proto types, and the `CollectionType` tag.

// The value is `()`, so all data of an audit log entry lives in the key.
collection_impl!({
    name: AuditLogCollection,
    key: proto::AuditLogKey,
    value: (),
    collection_type: CollectionType::AuditLog,
    trace_field: audit_log,
    update: StateUpdateKind::AuditLog,
});
// Called "cluster" in code but tagged `CollectionType::ComputeInstance`.
collection_impl!({
    name: ClusterCollection,
    key: proto::ClusterKey,
    value: proto::ClusterValue,
    collection_type: CollectionType::ComputeInstance,
    trace_field: clusters,
    update: StateUpdateKind::Cluster,
});
// Tagged `CollectionType::ComputeIntrospectionSourceIndex` ("compute", not "cluster").
collection_impl!({
    name: ClusterIntrospectionSourceIndexCollection,
    key: proto::ClusterIntrospectionSourceIndexKey,
    value: proto::ClusterIntrospectionSourceIndexValue,
    collection_type: CollectionType::ComputeIntrospectionSourceIndex,
    trace_field: introspection_sources,
    update: StateUpdateKind::IntrospectionSourceIndex,
});
// Tagged `CollectionType::ComputeReplicas` ("compute", not "cluster").
collection_impl!({
    name: ClusterReplicaCollection,
    key: proto::ClusterReplicaKey,
    value: proto::ClusterReplicaValue,
    collection_type: CollectionType::ComputeReplicas,
    trace_field: cluster_replicas,
    update: StateUpdateKind::ClusterReplica,
});
collection_impl!({
    name: CommentCollection,
    key: proto::CommentKey,
    value: proto::CommentValue,
    collection_type: CollectionType::Comments,
    trace_field: comments,
    update: StateUpdateKind::Comment,
});
collection_impl!({
    name: ConfigCollection,
    key: proto::ConfigKey,
    value: proto::ConfigValue,
    collection_type: CollectionType::Config,
    trace_field: configs,
    update: StateUpdateKind::Config,
});
collection_impl!({
    name: DatabaseCollection,
    key: proto::DatabaseKey,
    value: proto::DatabaseValue,
    collection_type: CollectionType::Database,
    trace_field: databases,
    update: StateUpdateKind::Database,
});
collection_impl!({
    name: DefaultPrivilegeCollection,
    key: proto::DefaultPrivilegesKey,
    value: proto::DefaultPrivilegesValue,
    collection_type: CollectionType::DefaultPrivileges,
    trace_field: default_privileges,
    update: StateUpdateKind::DefaultPrivilege,
});
collection_impl!({
    name: IdAllocatorCollection,
    key: proto::IdAllocKey,
    value: proto::IdAllocValue,
    collection_type: CollectionType::IdAlloc,
    trace_field: id_allocator,
    update: StateUpdateKind::IdAllocator,
});
collection_impl!({
    name: ItemCollection,
    key: proto::ItemKey,
    value: proto::ItemValue,
    collection_type: CollectionType::Item,
    trace_field: items,
    update: StateUpdateKind::Item,
});
collection_impl!({
    name: NetworkPolicyCollection,
    key: proto::NetworkPolicyKey,
    value: proto::NetworkPolicyValue,
    collection_type: CollectionType::NetworkPolicy,
    trace_field: network_policies,
    update: StateUpdateKind::NetworkPolicy,
});
collection_impl!({
    name: RoleCollection,
    key: proto::RoleKey,
    value: proto::RoleValue,
    collection_type: CollectionType::Role,
    trace_field: roles,
    update: StateUpdateKind::Role,
});
collection_impl!({
    name: RoleAuthCollection,
    key: proto::RoleAuthKey,
    value: proto::RoleAuthValue,
    collection_type: CollectionType::RoleAuth,
    trace_field: role_auth,
    update: StateUpdateKind::RoleAuth,
});
collection_impl!({
    name: SchemaCollection,
    key: proto::SchemaKey,
    value: proto::SchemaValue,
    collection_type: CollectionType::Schema,
    trace_field: schemas,
    update: StateUpdateKind::Schema,
});
collection_impl!({
    name: SettingCollection,
    key: proto::SettingKey,
    value: proto::SettingValue,
    collection_type: CollectionType::Setting,
    trace_field: settings,
    update: StateUpdateKind::Setting,
});
collection_impl!({
    name: SourceReferencesCollection,
    key: proto::SourceReferencesKey,
    value: proto::SourceReferencesValue,
    collection_type: CollectionType::SourceReferences,
    trace_field: source_references,
    update: StateUpdateKind::SourceReferences,
});
// The proto types are named `ServerConfiguration*` while everything else here says
// "system configuration".
collection_impl!({
    name: SystemConfigurationCollection,
    key: proto::ServerConfigurationKey,
    value: proto::ServerConfigurationValue,
    collection_type: CollectionType::SystemConfiguration,
    trace_field: system_configurations,
    update: StateUpdateKind::SystemConfiguration,
});
// Three different names meet here: the proto types and tag say "GID mapping", the
// struct says "system item mapping", and the trace field / update constructor say
// "system object mapping".
collection_impl!({
    name: SystemItemMappingCollection,
    key: proto::GidMappingKey,
    value: proto::GidMappingValue,
    collection_type: CollectionType::SystemGidMapping,
    trace_field: system_object_mappings,
    update: StateUpdateKind::SystemObjectMapping,
});
collection_impl!({
    name: SystemPrivilegeCollection,
    key: proto::SystemPrivilegesKey,
    value: proto::SystemPrivilegesValue,
    collection_type: CollectionType::SystemPrivileges,
    trace_field: system_privileges,
    update: StateUpdateKind::SystemPrivilege,
});

collection_impl!({
    name: StorageCollectionMetadataCollection,
    key: proto::StorageCollectionMetadataKey,
    value: proto::StorageCollectionMetadataValue,
    collection_type: CollectionType::StorageCollectionMetadata,
    trace_field: storage_collection_metadata,
    update: StateUpdateKind::StorageCollectionMetadata,
});
// The value is `()`, so all data of an unfinalized shard entry lives in the key.
collection_impl!({
    name: UnfinalizedShardsCollection,
    key: proto::UnfinalizedShardKey,
    value: (),
    collection_type: CollectionType::UnfinalizedShard,
    trace_field: unfinalized_shards,
    update: StateUpdateKind::UnfinalizedShard,
});
// The key is `()`, so this collection has only one possible key.
collection_impl!({
    name: TxnWalShardCollection,
    key: (),
    value: proto::TxnWalShardValue,
    collection_type: CollectionType::TxnWalShard,
    trace_field: txn_wal_shard,
    update: StateUpdateKind::TxnWalShard,
});
300
301#[derive(Debug, Clone, PartialEq, Eq)]
306pub struct CollectionTrace<T: Collection + ?Sized> {
307 pub values: Vec<((T::Key, T::Value), Timestamp, Diff)>,
308}
309
310impl<T: Collection> CollectionTrace<T> {
311 fn new() -> CollectionTrace<T> {
312 CollectionTrace { values: Vec::new() }
313 }
314}
315
316impl<T: Collection> CollectionTrace<T>
317where
318 T: Collection,
319 T::Key: Ord,
320 T::Value: Ord,
321{
322 fn sort(&mut self) {
323 self.values
324 .sort_by(|(x1, ts1, d1), (x2, ts2, d2)| ts1.cmp(ts2).then(d1.cmp(d2)).then(x1.cmp(x2)));
325 }
326}
327
328#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct Trace {
331 pub audit_log: CollectionTrace<AuditLogCollection>,
332 pub clusters: CollectionTrace<ClusterCollection>,
333 pub introspection_sources: CollectionTrace<ClusterIntrospectionSourceIndexCollection>,
334 pub cluster_replicas: CollectionTrace<ClusterReplicaCollection>,
335 pub comments: CollectionTrace<CommentCollection>,
336 pub configs: CollectionTrace<ConfigCollection>,
337 pub databases: CollectionTrace<DatabaseCollection>,
338 pub default_privileges: CollectionTrace<DefaultPrivilegeCollection>,
339 pub id_allocator: CollectionTrace<IdAllocatorCollection>,
340 pub items: CollectionTrace<ItemCollection>,
341 pub network_policies: CollectionTrace<NetworkPolicyCollection>,
342 pub roles: CollectionTrace<RoleCollection>,
343 pub role_auth: CollectionTrace<RoleAuthCollection>,
344 pub schemas: CollectionTrace<SchemaCollection>,
345 pub settings: CollectionTrace<SettingCollection>,
346 pub source_references: CollectionTrace<SourceReferencesCollection>,
347 pub system_object_mappings: CollectionTrace<SystemItemMappingCollection>,
348 pub system_configurations: CollectionTrace<SystemConfigurationCollection>,
349 pub system_privileges: CollectionTrace<SystemPrivilegeCollection>,
350 pub storage_collection_metadata: CollectionTrace<StorageCollectionMetadataCollection>,
351 pub unfinalized_shards: CollectionTrace<UnfinalizedShardsCollection>,
352 pub txn_wal_shard: CollectionTrace<TxnWalShardCollection>,
353}
354
355impl Trace {
356 pub(crate) fn new() -> Trace {
357 Trace {
358 audit_log: CollectionTrace::new(),
359 clusters: CollectionTrace::new(),
360 introspection_sources: CollectionTrace::new(),
361 cluster_replicas: CollectionTrace::new(),
362 comments: CollectionTrace::new(),
363 configs: CollectionTrace::new(),
364 databases: CollectionTrace::new(),
365 default_privileges: CollectionTrace::new(),
366 id_allocator: CollectionTrace::new(),
367 items: CollectionTrace::new(),
368 network_policies: CollectionTrace::new(),
369 roles: CollectionTrace::new(),
370 role_auth: CollectionTrace::new(),
371 schemas: CollectionTrace::new(),
372 settings: CollectionTrace::new(),
373 source_references: CollectionTrace::new(),
374 system_object_mappings: CollectionTrace::new(),
375 system_configurations: CollectionTrace::new(),
376 system_privileges: CollectionTrace::new(),
377 storage_collection_metadata: CollectionTrace::new(),
378 unfinalized_shards: CollectionTrace::new(),
379 txn_wal_shard: CollectionTrace::new(),
380 }
381 }
382
383 pub fn sort(&mut self) {
384 let Trace {
385 audit_log,
386 clusters,
387 introspection_sources,
388 cluster_replicas,
389 comments,
390 configs,
391 databases,
392 default_privileges,
393 id_allocator,
394 items,
395 network_policies,
396 roles,
397 role_auth,
398 schemas,
399 settings,
400 source_references,
401 system_object_mappings,
402 system_configurations,
403 system_privileges,
404 storage_collection_metadata,
405 unfinalized_shards,
406 txn_wal_shard,
407 } = self;
408 audit_log.sort();
409 clusters.sort();
410 introspection_sources.sort();
411 cluster_replicas.sort();
412 comments.sort();
413 configs.sort();
414 databases.sort();
415 default_privileges.sort();
416 id_allocator.sort();
417 items.sort();
418 network_policies.sort();
419 roles.sort();
420 role_auth.sort();
421 schemas.sort();
422 settings.sort();
423 source_references.sort();
424 system_object_mappings.sort();
425 system_configurations.sort();
426 system_privileges.sort();
427 storage_collection_metadata.sort();
428 unfinalized_shards.sort();
429 txn_wal_shard.sort();
430 }
431}
432
433pub struct DebugCatalogState(pub(crate) UnopenedPersistCatalogState);
434
435impl DebugCatalogState {
436 pub async fn edit<T: Collection>(
438 &mut self,
439 key: T::Key,
440 value: T::Value,
441 ) -> Result<Option<T::Value>, CatalogError>
442 where
443 T::Key: PartialEq + Eq + Debug + Clone,
444 T::Value: Debug + Clone,
445 {
446 self.0.debug_edit::<T>(key, value).await
447 }
448
449 pub async fn delete<T: Collection>(&mut self, key: T::Key) -> Result<(), CatalogError>
451 where
452 T::Key: PartialEq + Eq + Debug + Clone,
453 T::Value: Debug,
454 {
455 self.0.debug_delete::<T>(key).await
456 }
457}