mz_catalog/durable/
debug.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Functionality for manually modifying and displaying the catalog contents. This is helpful for
11//! fixing a corrupt catalog.
12
13use std::fmt::Debug;
14
15use mz_repr::Diff;
16use serde::{Deserialize, Serialize};
17use serde_plain::{derive_display_from_serialize, derive_fromstr_from_deserialize};
18
19use crate::durable::CatalogError;
20use crate::durable::objects::serialization::proto;
21use crate::durable::objects::state_update::StateUpdateKind;
22use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
23
24/// The contents of the catalog are logically separated into separate [`Collection`]s, which
25/// describe the category of data that the content belongs to.
26pub trait Collection: Debug {
27    /// Type used to stores keys for [`Collection`].
28    type Key;
29    /// Type used to stores values for [`Collection`].
30    type Value;
31
32    /// [`CollectionType`] corresponding to [`Collection`].
33    fn collection_type() -> CollectionType;
34
35    /// Extract the [`CollectionTrace`] from a [`Trace`] that corresponds to [`Collection`].
36    fn collection_trace(trace: Trace) -> CollectionTrace<Self>;
37
38    /// Generate a `StateUpdateKind` with `key` and `value` that corresponds to [`Collection`].
39    fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind;
40
41    /// The human-readable name of this collection.
42    fn name() -> String {
43        Self::collection_type().to_string()
44    }
45}
46
47/// The type of a [`Collection`].
48///
49/// See [`Collection`] for more details.
50///
51/// The names of each variant are used to determine the labels of each [`CollectionTrace`] when
52/// dumping a [`Trace`].
53#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
54#[serde(rename_all = "snake_case")]
55pub enum CollectionType {
56    AuditLog,
57    ComputeInstance,
58    ComputeIntrospectionSourceIndex,
59    ComputeReplicas,
60    Comments,
61    Config,
62    Database,
63    DefaultPrivileges,
64    IdAlloc,
65    Item,
66    NetworkPolicy,
67    Role,
68    RoleAuth,
69    Schema,
70    Setting,
71    SourceReferences,
72    SystemConfiguration,
73    SystemGidMapping,
74    SystemPrivileges,
75    StorageCollectionMetadata,
76    UnfinalizedShard,
77    TxnWalShard,
78}
79
80derive_display_from_serialize!(CollectionType);
81derive_fromstr_from_deserialize!(CollectionType);
82
/// Generates a zero-field marker struct together with its [`Collection`] implementation.
///
/// `collection_impl!` takes a single brace-delimited argument list whose entries must appear in
/// exactly this order, each followed by a comma:
/// - `name`: identifier of the struct to generate.
/// - `key`: the type used to store keys ([`Collection::Key`]).
/// - `value`: the type used to store values ([`Collection::Value`]).
/// - `collection_type`: expression yielding the matching [`CollectionType`].
/// - `trace_field`: name of the corresponding field within a [`Trace`].
/// - `update`: the corresponding [`StateUpdateKind`] constructor, called as `update(key, value)`.
macro_rules! collection_impl {
    ({
        name: $collection:ident,
        key: $key_ty:ty,
        value: $value_ty:ty,
        collection_type: $kind:expr,
        trace_field: $field:ident,
        update: $ctor:expr,
    }) => {
        /// Marker type for one catalog [`Collection`]; generated by `collection_impl!`.
        #[derive(Clone, Debug, Eq, PartialEq)]
        pub struct $collection {}

        impl Collection for $collection {
            type Key = $key_ty;
            type Value = $value_ty;

            fn collection_type() -> CollectionType {
                $kind
            }

            fn collection_trace(trace: Trace) -> CollectionTrace<Self> {
                // Moves the requested field out of the trace; the rest of `trace` is dropped.
                trace.$field
            }

            fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind {
                $ctor(key, value)
            }
        }
    };
}
122
123collection_impl!({
124    name: AuditLogCollection,
125    key: proto::AuditLogKey,
126    value: (),
127    collection_type: CollectionType::AuditLog,
128    trace_field: audit_log,
129    update: StateUpdateKind::AuditLog,
130});
131collection_impl!({
132    name: ClusterCollection,
133    key: proto::ClusterKey,
134    value: proto::ClusterValue,
135    collection_type: CollectionType::ComputeInstance,
136    trace_field: clusters,
137    update: StateUpdateKind::Cluster,
138});
139collection_impl!({
140    name: ClusterIntrospectionSourceIndexCollection,
141    key: proto::ClusterIntrospectionSourceIndexKey,
142    value: proto::ClusterIntrospectionSourceIndexValue,
143    collection_type: CollectionType::ComputeIntrospectionSourceIndex,
144    trace_field: introspection_sources,
145    update: StateUpdateKind::IntrospectionSourceIndex,
146});
147collection_impl!({
148    name: ClusterReplicaCollection,
149    key: proto::ClusterReplicaKey,
150    value: proto::ClusterReplicaValue,
151    collection_type: CollectionType::ComputeReplicas,
152    trace_field: cluster_replicas,
153    update: StateUpdateKind::ClusterReplica,
154});
155collection_impl!({
156    name: CommentCollection,
157    key: proto::CommentKey,
158    value: proto::CommentValue,
159    collection_type: CollectionType::Comments,
160    trace_field: comments,
161    update: StateUpdateKind::Comment,
162});
163collection_impl!({
164    name: ConfigCollection,
165    key: proto::ConfigKey,
166    value: proto::ConfigValue,
167    collection_type: CollectionType::Config,
168    trace_field: configs,
169    update: StateUpdateKind::Config,
170});
171collection_impl!({
172    name: DatabaseCollection,
173    key: proto::DatabaseKey,
174    value: proto::DatabaseValue,
175    collection_type: CollectionType::Database,
176    trace_field: databases,
177    update: StateUpdateKind::Database,
178});
179collection_impl!({
180    name: DefaultPrivilegeCollection,
181    key: proto::DefaultPrivilegesKey,
182    value: proto::DefaultPrivilegesValue,
183    collection_type: CollectionType::DefaultPrivileges,
184    trace_field: default_privileges,
185    update: StateUpdateKind::DefaultPrivilege,
186});
187collection_impl!({
188    name: IdAllocatorCollection,
189    key: proto::IdAllocKey,
190    value: proto::IdAllocValue,
191    collection_type: CollectionType::IdAlloc,
192    trace_field: id_allocator,
193    update: StateUpdateKind::IdAllocator,
194});
195collection_impl!({
196    name: ItemCollection,
197    key: proto::ItemKey,
198    value: proto::ItemValue,
199    collection_type: CollectionType::Item,
200    trace_field: items,
201    update: StateUpdateKind::Item,
202});
203collection_impl!({
204    name: NetworkPolicyCollection,
205    key: proto::NetworkPolicyKey,
206    value: proto::NetworkPolicyValue,
207    collection_type: CollectionType::NetworkPolicy,
208    trace_field: network_policies,
209    update: StateUpdateKind::NetworkPolicy,
210});
211collection_impl!({
212    name: RoleCollection,
213    key: proto::RoleKey,
214    value: proto::RoleValue,
215    collection_type: CollectionType::Role,
216    trace_field: roles,
217    update: StateUpdateKind::Role,
218});
219collection_impl!({
220    name: RoleAuthCollection,
221    key: proto::RoleAuthKey,
222    value: proto::RoleAuthValue,
223    collection_type: CollectionType::RoleAuth,
224    trace_field: role_auth,
225    update: StateUpdateKind::RoleAuth,
226});
227collection_impl!({
228    name: SchemaCollection,
229    key: proto::SchemaKey,
230    value: proto::SchemaValue,
231    collection_type: CollectionType::Schema,
232    trace_field: schemas,
233    update: StateUpdateKind::Schema,
234});
235collection_impl!({
236    name: SettingCollection,
237    key: proto::SettingKey,
238    value: proto::SettingValue,
239    collection_type: CollectionType::Setting,
240    trace_field: settings,
241    update: StateUpdateKind::Setting,
242});
243collection_impl!({
244    name: SourceReferencesCollection,
245    key: proto::SourceReferencesKey,
246    value: proto::SourceReferencesValue,
247    collection_type: CollectionType::SourceReferences,
248    trace_field: source_references,
249    update: StateUpdateKind::SourceReferences,
250});
251collection_impl!({
252    name: SystemConfigurationCollection,
253    key: proto::ServerConfigurationKey,
254    value: proto::ServerConfigurationValue,
255    collection_type: CollectionType::SystemConfiguration,
256    trace_field: system_configurations,
257    update: StateUpdateKind::SystemConfiguration,
258});
259collection_impl!({
260    name: SystemItemMappingCollection,
261    key: proto::GidMappingKey,
262    value: proto::GidMappingValue,
263    collection_type: CollectionType::SystemGidMapping,
264    trace_field: system_object_mappings,
265    update: StateUpdateKind::SystemObjectMapping,
266});
267collection_impl!({
268    name: SystemPrivilegeCollection,
269    key: proto::SystemPrivilegesKey,
270    value: proto::SystemPrivilegesValue,
271    collection_type: CollectionType::SystemPrivileges,
272    trace_field: system_privileges,
273    update: StateUpdateKind::SystemPrivilege,
274});
275
276collection_impl!({
277    name: StorageCollectionMetadataCollection,
278    key: proto::StorageCollectionMetadataKey,
279    value: proto::StorageCollectionMetadataValue,
280    collection_type: CollectionType::StorageCollectionMetadata,
281    trace_field: storage_collection_metadata,
282    update: StateUpdateKind::StorageCollectionMetadata,
283});
284collection_impl!({
285    name: UnfinalizedShardsCollection,
286    key: proto::UnfinalizedShardKey,
287    value: (),
288    collection_type: CollectionType::UnfinalizedShard,
289    trace_field: unfinalized_shards,
290    update: StateUpdateKind::UnfinalizedShard,
291});
292collection_impl!({
293    name: TxnWalShardCollection,
294    key: (),
295    value: proto::TxnWalShardValue,
296    collection_type: CollectionType::TxnWalShard,
297    trace_field: txn_wal_shard,
298    update: StateUpdateKind::TxnWalShard,
299});
300
301/// A trace of timestamped diffs for a particular [`Collection`].
302///
303/// The timestamps are represented as strings since different implementations use non-compatible
304/// timestamp types.
305#[derive(Debug, Clone, PartialEq, Eq)]
306pub struct CollectionTrace<T: Collection + ?Sized> {
307    pub values: Vec<((T::Key, T::Value), Timestamp, Diff)>,
308}
309
310impl<T: Collection> CollectionTrace<T> {
311    fn new() -> CollectionTrace<T> {
312        CollectionTrace { values: Vec::new() }
313    }
314}
315
316impl<T: Collection> CollectionTrace<T>
317where
318    T: Collection,
319    T::Key: Ord,
320    T::Value: Ord,
321{
322    fn sort(&mut self) {
323        self.values
324            .sort_by(|(x1, ts1, d1), (x2, ts2, d2)| ts1.cmp(ts2).then(d1.cmp(d2)).then(x1.cmp(x2)));
325    }
326}
327
328/// Catalog data structured as timestamped diffs.
329#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct Trace {
331    pub audit_log: CollectionTrace<AuditLogCollection>,
332    pub clusters: CollectionTrace<ClusterCollection>,
333    pub introspection_sources: CollectionTrace<ClusterIntrospectionSourceIndexCollection>,
334    pub cluster_replicas: CollectionTrace<ClusterReplicaCollection>,
335    pub comments: CollectionTrace<CommentCollection>,
336    pub configs: CollectionTrace<ConfigCollection>,
337    pub databases: CollectionTrace<DatabaseCollection>,
338    pub default_privileges: CollectionTrace<DefaultPrivilegeCollection>,
339    pub id_allocator: CollectionTrace<IdAllocatorCollection>,
340    pub items: CollectionTrace<ItemCollection>,
341    pub network_policies: CollectionTrace<NetworkPolicyCollection>,
342    pub roles: CollectionTrace<RoleCollection>,
343    pub role_auth: CollectionTrace<RoleAuthCollection>,
344    pub schemas: CollectionTrace<SchemaCollection>,
345    pub settings: CollectionTrace<SettingCollection>,
346    pub source_references: CollectionTrace<SourceReferencesCollection>,
347    pub system_object_mappings: CollectionTrace<SystemItemMappingCollection>,
348    pub system_configurations: CollectionTrace<SystemConfigurationCollection>,
349    pub system_privileges: CollectionTrace<SystemPrivilegeCollection>,
350    pub storage_collection_metadata: CollectionTrace<StorageCollectionMetadataCollection>,
351    pub unfinalized_shards: CollectionTrace<UnfinalizedShardsCollection>,
352    pub txn_wal_shard: CollectionTrace<TxnWalShardCollection>,
353}
354
355impl Trace {
356    pub(crate) fn new() -> Trace {
357        Trace {
358            audit_log: CollectionTrace::new(),
359            clusters: CollectionTrace::new(),
360            introspection_sources: CollectionTrace::new(),
361            cluster_replicas: CollectionTrace::new(),
362            comments: CollectionTrace::new(),
363            configs: CollectionTrace::new(),
364            databases: CollectionTrace::new(),
365            default_privileges: CollectionTrace::new(),
366            id_allocator: CollectionTrace::new(),
367            items: CollectionTrace::new(),
368            network_policies: CollectionTrace::new(),
369            roles: CollectionTrace::new(),
370            role_auth: CollectionTrace::new(),
371            schemas: CollectionTrace::new(),
372            settings: CollectionTrace::new(),
373            source_references: CollectionTrace::new(),
374            system_object_mappings: CollectionTrace::new(),
375            system_configurations: CollectionTrace::new(),
376            system_privileges: CollectionTrace::new(),
377            storage_collection_metadata: CollectionTrace::new(),
378            unfinalized_shards: CollectionTrace::new(),
379            txn_wal_shard: CollectionTrace::new(),
380        }
381    }
382
383    pub fn sort(&mut self) {
384        let Trace {
385            audit_log,
386            clusters,
387            introspection_sources,
388            cluster_replicas,
389            comments,
390            configs,
391            databases,
392            default_privileges,
393            id_allocator,
394            items,
395            network_policies,
396            roles,
397            role_auth,
398            schemas,
399            settings,
400            source_references,
401            system_object_mappings,
402            system_configurations,
403            system_privileges,
404            storage_collection_metadata,
405            unfinalized_shards,
406            txn_wal_shard,
407        } = self;
408        audit_log.sort();
409        clusters.sort();
410        introspection_sources.sort();
411        cluster_replicas.sort();
412        comments.sort();
413        configs.sort();
414        databases.sort();
415        default_privileges.sort();
416        id_allocator.sort();
417        items.sort();
418        network_policies.sort();
419        roles.sort();
420        role_auth.sort();
421        schemas.sort();
422        settings.sort();
423        source_references.sort();
424        system_object_mappings.sort();
425        system_configurations.sort();
426        system_privileges.sort();
427        storage_collection_metadata.sort();
428        unfinalized_shards.sort();
429        txn_wal_shard.sort();
430    }
431}
432
433pub struct DebugCatalogState(pub(crate) UnopenedPersistCatalogState);
434
435impl DebugCatalogState {
436    /// Manually update value of `key` in collection `T` to `value`.
437    pub async fn edit<T: Collection>(
438        &mut self,
439        key: T::Key,
440        value: T::Value,
441    ) -> Result<Option<T::Value>, CatalogError>
442    where
443        T::Key: PartialEq + Eq + Debug + Clone,
444        T::Value: Debug + Clone,
445    {
446        self.0.debug_edit::<T>(key, value).await
447    }
448
449    /// Manually delete `key` from collection `T`.
450    pub async fn delete<T: Collection>(&mut self, key: T::Key) -> Result<(), CatalogError>
451    where
452        T::Key: PartialEq + Eq + Debug + Clone,
453        T::Value: Debug,
454    {
455        self.0.debug_delete::<T>(key).await
456    }
457}