Skip to main content

mz_catalog/durable/
debug.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Functionality for manually modifying and displaying the catalog contents. This is helpful for
11//! fixing a corrupt catalog.
12
13use std::fmt::Debug;
14
15use mz_repr::Diff;
16use serde::{Deserialize, Serialize};
17use serde_plain::{derive_display_from_serialize, derive_fromstr_from_deserialize};
18
19use crate::durable::CatalogError;
20use crate::durable::objects::serialization::proto;
21use crate::durable::objects::state_update::StateUpdateKind;
22use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
23
24/// The contents of the catalog are logically separated into separate [`Collection`]s, which
25/// describe the category of data that the content belongs to.
26pub trait Collection: Debug {
27    /// Type used to stores keys for [`Collection`].
28    type Key;
29    /// Type used to stores values for [`Collection`].
30    type Value;
31
32    /// [`CollectionType`] corresponding to [`Collection`].
33    fn collection_type() -> CollectionType;
34
35    /// Extract the [`CollectionTrace`] from a [`Trace`] that corresponds to [`Collection`].
36    fn collection_trace(trace: Trace) -> CollectionTrace<Self>;
37
38    /// Generate a `StateUpdateKind` with `key` and `value` that corresponds to [`Collection`].
39    fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind;
40
41    /// The human-readable name of this collection.
42    fn name() -> String {
43        Self::collection_type().to_string()
44    }
45}
46
47/// The type of a [`Collection`].
48///
49/// See [`Collection`] for more details.
50///
51/// The names of each variant are used to determine the labels of each [`CollectionTrace`] when
52/// dumping a [`Trace`].
53#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
54#[serde(rename_all = "snake_case")]
55pub enum CollectionType {
56    AuditLog,
57    ComputeInstance,
58    ComputeIntrospectionSourceIndex,
59    ComputeReplicas,
60    Comments,
61    Config,
62    Database,
63    DefaultPrivileges,
64    IdAlloc,
65    Item,
66    NetworkPolicy,
67    Role,
68    RoleAuth,
69    Schema,
70    Setting,
71    SourceReferences,
72    SystemConfiguration,
73    ClusterSystemConfiguration,
74    ReplicaSystemConfiguration,
75    SystemGidMapping,
76    SystemPrivileges,
77    StorageCollectionMetadata,
78    UnfinalizedShard,
79    TxnWalShard,
80}
81
82derive_display_from_serialize!(CollectionType);
83derive_fromstr_from_deserialize!(CollectionType);
84
85/// Macro to simplify implementing [`Collection`].
86///
87/// The arguments to `collection_impl!` are:
88/// - `$name`, which will be the name of the implementing struct.
89/// - `$key`, the type used to store keys.
90/// - `$value`, the type used to store values.
91/// - `$collection_type`, the [`CollectionType`].
92/// - `$trace_field`, the corresponding field name within a [`Trace`].
93/// - `$update`, the corresponding [`StateUpdateKind`] constructor.
94macro_rules! collection_impl {
95    ({
96    name: $name:ident,
97    key: $key:ty,
98    value: $value:ty,
99    collection_type: $collection_type:expr,
100    trace_field: $trace_field:ident,
101    update: $update:expr,
102}) => {
103        #[derive(Debug, Clone, PartialEq, Eq)]
104        pub struct $name {}
105
106        impl Collection for $name {
107            type Key = $key;
108            type Value = $value;
109
110            fn collection_type() -> CollectionType {
111                $collection_type
112            }
113
114            fn collection_trace(trace: Trace) -> CollectionTrace<Self> {
115                trace.$trace_field
116            }
117
118            fn update(key: Self::Key, value: Self::Value) -> StateUpdateKind {
119                $update(key, value)
120            }
121        }
122    };
123}
124
125collection_impl!({
126    name: AuditLogCollection,
127    key: proto::AuditLogKey,
128    value: (),
129    collection_type: CollectionType::AuditLog,
130    trace_field: audit_log,
131    update: StateUpdateKind::AuditLog,
132});
133collection_impl!({
134    name: ClusterCollection,
135    key: proto::ClusterKey,
136    value: proto::ClusterValue,
137    collection_type: CollectionType::ComputeInstance,
138    trace_field: clusters,
139    update: StateUpdateKind::Cluster,
140});
141collection_impl!({
142    name: ClusterIntrospectionSourceIndexCollection,
143    key: proto::ClusterIntrospectionSourceIndexKey,
144    value: proto::ClusterIntrospectionSourceIndexValue,
145    collection_type: CollectionType::ComputeIntrospectionSourceIndex,
146    trace_field: introspection_sources,
147    update: StateUpdateKind::IntrospectionSourceIndex,
148});
149collection_impl!({
150    name: ClusterReplicaCollection,
151    key: proto::ClusterReplicaKey,
152    value: proto::ClusterReplicaValue,
153    collection_type: CollectionType::ComputeReplicas,
154    trace_field: cluster_replicas,
155    update: StateUpdateKind::ClusterReplica,
156});
157collection_impl!({
158    name: CommentCollection,
159    key: proto::CommentKey,
160    value: proto::CommentValue,
161    collection_type: CollectionType::Comments,
162    trace_field: comments,
163    update: StateUpdateKind::Comment,
164});
165collection_impl!({
166    name: ConfigCollection,
167    key: proto::ConfigKey,
168    value: proto::ConfigValue,
169    collection_type: CollectionType::Config,
170    trace_field: configs,
171    update: StateUpdateKind::Config,
172});
173collection_impl!({
174    name: DatabaseCollection,
175    key: proto::DatabaseKey,
176    value: proto::DatabaseValue,
177    collection_type: CollectionType::Database,
178    trace_field: databases,
179    update: StateUpdateKind::Database,
180});
181collection_impl!({
182    name: DefaultPrivilegeCollection,
183    key: proto::DefaultPrivilegesKey,
184    value: proto::DefaultPrivilegesValue,
185    collection_type: CollectionType::DefaultPrivileges,
186    trace_field: default_privileges,
187    update: StateUpdateKind::DefaultPrivilege,
188});
189collection_impl!({
190    name: IdAllocatorCollection,
191    key: proto::IdAllocKey,
192    value: proto::IdAllocValue,
193    collection_type: CollectionType::IdAlloc,
194    trace_field: id_allocator,
195    update: StateUpdateKind::IdAllocator,
196});
197collection_impl!({
198    name: ItemCollection,
199    key: proto::ItemKey,
200    value: proto::ItemValue,
201    collection_type: CollectionType::Item,
202    trace_field: items,
203    update: StateUpdateKind::Item,
204});
205collection_impl!({
206    name: NetworkPolicyCollection,
207    key: proto::NetworkPolicyKey,
208    value: proto::NetworkPolicyValue,
209    collection_type: CollectionType::NetworkPolicy,
210    trace_field: network_policies,
211    update: StateUpdateKind::NetworkPolicy,
212});
213collection_impl!({
214    name: RoleCollection,
215    key: proto::RoleKey,
216    value: proto::RoleValue,
217    collection_type: CollectionType::Role,
218    trace_field: roles,
219    update: StateUpdateKind::Role,
220});
221collection_impl!({
222    name: RoleAuthCollection,
223    key: proto::RoleAuthKey,
224    value: proto::RoleAuthValue,
225    collection_type: CollectionType::RoleAuth,
226    trace_field: role_auth,
227    update: StateUpdateKind::RoleAuth,
228});
229collection_impl!({
230    name: SchemaCollection,
231    key: proto::SchemaKey,
232    value: proto::SchemaValue,
233    collection_type: CollectionType::Schema,
234    trace_field: schemas,
235    update: StateUpdateKind::Schema,
236});
237collection_impl!({
238    name: SettingCollection,
239    key: proto::SettingKey,
240    value: proto::SettingValue,
241    collection_type: CollectionType::Setting,
242    trace_field: settings,
243    update: StateUpdateKind::Setting,
244});
245collection_impl!({
246    name: SourceReferencesCollection,
247    key: proto::SourceReferencesKey,
248    value: proto::SourceReferencesValue,
249    collection_type: CollectionType::SourceReferences,
250    trace_field: source_references,
251    update: StateUpdateKind::SourceReferences,
252});
253collection_impl!({
254    name: SystemConfigurationCollection,
255    key: proto::ServerConfigurationKey,
256    value: proto::ServerConfigurationValue,
257    collection_type: CollectionType::SystemConfiguration,
258    trace_field: system_configurations,
259    update: StateUpdateKind::SystemConfiguration,
260});
261collection_impl!({
262    name: ClusterSystemConfigurationCollection,
263    key: proto::ClusterSystemConfigurationKey,
264    value: proto::ClusterSystemConfigurationValue,
265    collection_type: CollectionType::ClusterSystemConfiguration,
266    trace_field: cluster_system_configurations,
267    update: StateUpdateKind::ClusterSystemConfiguration,
268});
269collection_impl!({
270    name: ReplicaSystemConfigurationCollection,
271    key: proto::ReplicaSystemConfigurationKey,
272    value: proto::ReplicaSystemConfigurationValue,
273    collection_type: CollectionType::ReplicaSystemConfiguration,
274    trace_field: replica_system_configurations,
275    update: StateUpdateKind::ReplicaSystemConfiguration,
276});
277collection_impl!({
278    name: SystemItemMappingCollection,
279    key: proto::GidMappingKey,
280    value: proto::GidMappingValue,
281    collection_type: CollectionType::SystemGidMapping,
282    trace_field: system_object_mappings,
283    update: StateUpdateKind::SystemObjectMapping,
284});
285collection_impl!({
286    name: SystemPrivilegeCollection,
287    key: proto::SystemPrivilegesKey,
288    value: proto::SystemPrivilegesValue,
289    collection_type: CollectionType::SystemPrivileges,
290    trace_field: system_privileges,
291    update: StateUpdateKind::SystemPrivilege,
292});
293
294collection_impl!({
295    name: StorageCollectionMetadataCollection,
296    key: proto::StorageCollectionMetadataKey,
297    value: proto::StorageCollectionMetadataValue,
298    collection_type: CollectionType::StorageCollectionMetadata,
299    trace_field: storage_collection_metadata,
300    update: StateUpdateKind::StorageCollectionMetadata,
301});
302collection_impl!({
303    name: UnfinalizedShardsCollection,
304    key: proto::UnfinalizedShardKey,
305    value: (),
306    collection_type: CollectionType::UnfinalizedShard,
307    trace_field: unfinalized_shards,
308    update: StateUpdateKind::UnfinalizedShard,
309});
310collection_impl!({
311    name: TxnWalShardCollection,
312    key: (),
313    value: proto::TxnWalShardValue,
314    collection_type: CollectionType::TxnWalShard,
315    trace_field: txn_wal_shard,
316    update: StateUpdateKind::TxnWalShard,
317});
318
319/// A trace of timestamped diffs for a particular [`Collection`].
320///
321/// The timestamps are represented as strings since different implementations use non-compatible
322/// timestamp types.
323#[derive(Debug, Clone, PartialEq, Eq)]
324pub struct CollectionTrace<T: Collection + ?Sized> {
325    pub values: Vec<((T::Key, T::Value), Timestamp, Diff)>,
326}
327
328impl<T: Collection> CollectionTrace<T> {
329    fn new() -> CollectionTrace<T> {
330        CollectionTrace { values: Vec::new() }
331    }
332}
333
334impl<T: Collection> CollectionTrace<T>
335where
336    T: Collection,
337    T::Key: Ord,
338    T::Value: Ord,
339{
340    fn sort(&mut self) {
341        self.values
342            .sort_by(|(x1, ts1, d1), (x2, ts2, d2)| ts1.cmp(ts2).then(d1.cmp(d2)).then(x1.cmp(x2)));
343    }
344}
345
346/// Catalog data structured as timestamped diffs.
347#[derive(Debug, Clone, PartialEq, Eq)]
348pub struct Trace {
349    pub audit_log: CollectionTrace<AuditLogCollection>,
350    pub clusters: CollectionTrace<ClusterCollection>,
351    pub introspection_sources: CollectionTrace<ClusterIntrospectionSourceIndexCollection>,
352    pub cluster_replicas: CollectionTrace<ClusterReplicaCollection>,
353    pub comments: CollectionTrace<CommentCollection>,
354    pub configs: CollectionTrace<ConfigCollection>,
355    pub databases: CollectionTrace<DatabaseCollection>,
356    pub default_privileges: CollectionTrace<DefaultPrivilegeCollection>,
357    pub id_allocator: CollectionTrace<IdAllocatorCollection>,
358    pub items: CollectionTrace<ItemCollection>,
359    pub network_policies: CollectionTrace<NetworkPolicyCollection>,
360    pub roles: CollectionTrace<RoleCollection>,
361    pub role_auth: CollectionTrace<RoleAuthCollection>,
362    pub schemas: CollectionTrace<SchemaCollection>,
363    pub settings: CollectionTrace<SettingCollection>,
364    pub source_references: CollectionTrace<SourceReferencesCollection>,
365    pub system_object_mappings: CollectionTrace<SystemItemMappingCollection>,
366    pub system_configurations: CollectionTrace<SystemConfigurationCollection>,
367    pub cluster_system_configurations: CollectionTrace<ClusterSystemConfigurationCollection>,
368    pub replica_system_configurations: CollectionTrace<ReplicaSystemConfigurationCollection>,
369    pub system_privileges: CollectionTrace<SystemPrivilegeCollection>,
370    pub storage_collection_metadata: CollectionTrace<StorageCollectionMetadataCollection>,
371    pub unfinalized_shards: CollectionTrace<UnfinalizedShardsCollection>,
372    pub txn_wal_shard: CollectionTrace<TxnWalShardCollection>,
373}
374
375impl Trace {
376    pub(crate) fn new() -> Trace {
377        Trace {
378            audit_log: CollectionTrace::new(),
379            clusters: CollectionTrace::new(),
380            introspection_sources: CollectionTrace::new(),
381            cluster_replicas: CollectionTrace::new(),
382            comments: CollectionTrace::new(),
383            configs: CollectionTrace::new(),
384            databases: CollectionTrace::new(),
385            default_privileges: CollectionTrace::new(),
386            id_allocator: CollectionTrace::new(),
387            items: CollectionTrace::new(),
388            network_policies: CollectionTrace::new(),
389            roles: CollectionTrace::new(),
390            role_auth: CollectionTrace::new(),
391            schemas: CollectionTrace::new(),
392            settings: CollectionTrace::new(),
393            source_references: CollectionTrace::new(),
394            system_object_mappings: CollectionTrace::new(),
395            system_configurations: CollectionTrace::new(),
396            cluster_system_configurations: CollectionTrace::new(),
397            replica_system_configurations: CollectionTrace::new(),
398            system_privileges: CollectionTrace::new(),
399            storage_collection_metadata: CollectionTrace::new(),
400            unfinalized_shards: CollectionTrace::new(),
401            txn_wal_shard: CollectionTrace::new(),
402        }
403    }
404
405    pub fn sort(&mut self) {
406        let Trace {
407            audit_log,
408            clusters,
409            introspection_sources,
410            cluster_replicas,
411            comments,
412            configs,
413            databases,
414            default_privileges,
415            id_allocator,
416            items,
417            network_policies,
418            roles,
419            role_auth,
420            schemas,
421            settings,
422            source_references,
423            system_object_mappings,
424            system_configurations,
425            cluster_system_configurations,
426            replica_system_configurations,
427            system_privileges,
428            storage_collection_metadata,
429            unfinalized_shards,
430            txn_wal_shard,
431        } = self;
432        audit_log.sort();
433        clusters.sort();
434        introspection_sources.sort();
435        cluster_replicas.sort();
436        comments.sort();
437        configs.sort();
438        databases.sort();
439        default_privileges.sort();
440        id_allocator.sort();
441        items.sort();
442        network_policies.sort();
443        roles.sort();
444        role_auth.sort();
445        schemas.sort();
446        settings.sort();
447        source_references.sort();
448        system_object_mappings.sort();
449        system_configurations.sort();
450        cluster_system_configurations.sort();
451        replica_system_configurations.sort();
452        system_privileges.sort();
453        storage_collection_metadata.sort();
454        unfinalized_shards.sort();
455        txn_wal_shard.sort();
456    }
457}
458
459pub struct DebugCatalogState(pub(crate) UnopenedPersistCatalogState);
460
461impl DebugCatalogState {
462    /// Manually update value of `key` in collection `T` to `value`.
463    pub async fn edit<T: Collection>(
464        &mut self,
465        key: T::Key,
466        value: T::Value,
467    ) -> Result<Option<T::Value>, CatalogError>
468    where
469        T::Key: PartialEq + Eq + Debug + Clone,
470        T::Value: Debug + Clone,
471    {
472        self.0.debug_edit::<T>(key, value).await
473    }
474
475    /// Manually delete `key` from collection `T`.
476    pub async fn delete<T: Collection>(&mut self, key: T::Key) -> Result<(), CatalogError>
477    where
478        T::Key: PartialEq + Eq + Debug + Clone,
479        T::Value: Debug,
480    {
481        self.0.debug_delete::<T>(key).await
482    }
483}