Skip to main content

mz_catalog/durable/objects/
state_update.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains various representations of a single catalog update and the logic necessary
11//! for converting between representations.
12//!
13//! The general lifecycle of a single update when read from persist is as follows:
14//!
15//!   1. The update is stored in persist as a [`PersistStateUpdate`].
16//!   2. After being read from persist the update is immediately converted into a
17//!      [`StateUpdate<StateUpdateKindJson>`], which models the update as a JSON.
18//!   3. The [`StateUpdateKindJson`] is converted into a protobuf message,
19//!      [`proto::StateUpdateKind`].
20//!   4. The update is then converted into a [`StateUpdate<StateUpdateKind>`], which is a strongly
21//!      typed Rust object.
22//!   5. Finally, the update is converted into an [`Option<memory::objects::StateUpdate>`], and
23//!      `Some` variants are given to the in-memory catalog. The in-memory catalog is only
24//!      interested in a subset of catalog updates which is why the [`Option`] is necessary.
25//!
26//! TLDR: [`PersistStateUpdate`] -> [`StateUpdate<StateUpdateKindJson>`] ->
27//!       [`proto::StateUpdateKind`] -> [`StateUpdate<StateUpdateKind>`] ->
28//!       [`Option<memory::objects::StateUpdate>`]
29//!
30//! The process of writing a catalog update to persist is the exact opposite.
31//!
32//! When running catalog protobuf upgrades/migrations we may need to take a detour and convert the
33//! [`StateUpdateKindJson`] to some `proto::object_v{x}::StateUpdateKind` before applying specific
34//! upgrades to get us to a valid [`proto::StateUpdateKind`].
35
36use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46#[cfg(test)]
47use proptest_derive::Arbitrary;
48use tracing::error;
49
50use crate::durable::debug::CollectionType;
51use crate::durable::objects::serialization::proto;
52use crate::durable::objects::{DurableType, FenceToken};
53use crate::durable::persist::Timestamp;
54use crate::durable::transaction::TransactionBatch;
55use crate::durable::{DurableCatalogError, Epoch};
56use crate::memory;
57
/// Trait for objects that can be converted to/from a [`StateUpdateKindJson`].
///
/// Conversion *into* a [`StateUpdateKindJson`] is infallible (it goes through [`Into`]), while
/// conversion *from* one is fallible and goes through [`IntoStateUpdateKindJson::try_from`].
pub trait IntoStateUpdateKindJson:
    Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
{
    /// The error produced when a [`StateUpdateKindJson`] cannot be converted into `Self`.
    type Error: Debug;

    /// Attempts to convert a raw [`StateUpdateKindJson`] into `Self`.
    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
}
66impl<
67    T: Into<StateUpdateKindJson>
68        + TryFrom<StateUpdateKindJson>
69        + PartialEq
70        + Eq
71        + PartialOrd
72        + Ord
73        + Debug
74        + Clone,
75> IntoStateUpdateKindJson for T
76where
77    T::Error: Debug,
78{
79    type Error = T::Error;
80
81    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
82        <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
83    }
84}
85
/// Trait for objects that can be converted to/from a [`StateUpdateKind`].
///
/// Implementors must also implement [`IntoStateUpdateKindJson`].
pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
    /// The error produced when `Self` cannot be converted into a [`StateUpdateKind`].
    type Error: Debug;

    /// Attempts to convert `self` into a strongly typed [`StateUpdateKind`], consuming `self`.
    fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
}
92impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
93where
94    <T as TryInto<StateUpdateKind>>::Error: Debug,
95{
96    type Error = <T as TryInto<StateUpdateKind>>::Error;
97
98    fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
99        <T as TryInto<StateUpdateKind>>::try_into(self)
100    }
101}
102
/// A single update to the catalog state.
///
/// The type parameter `T` selects the representation of the update's contents; it defaults to
/// the strongly typed [`StateUpdateKind`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
    /// The kind and contents of the state update.
    pub kind: T,
    /// The timestamp at which the update occurred.
    pub ts: Timestamp,
    /// Record count difference for the update.
    pub diff: Diff,
}
113
114impl StateUpdate {
115    /// Convert a [`TransactionBatch`] to a list of [`StateUpdate`]s at timestamp `ts`.
116    pub(crate) fn from_txn_batch_ts(
117        txn_batch: TransactionBatch,
118        ts: Timestamp,
119    ) -> impl Iterator<Item = StateUpdate> {
120        Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
121    }
122
123    /// Convert a [`TransactionBatch`] to a list of [`StateUpdate`]s and [`Diff`]s.
124    pub(crate) fn from_txn_batch(
125        txn_batch: TransactionBatch,
126    ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
127        fn from_batch<K, V>(
128            batch: Vec<(K, V, Diff)>,
129            kind: fn(K, V) -> StateUpdateKind,
130        ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
131            batch
132                .into_iter()
133                .map(move |(k, v, diff)| (kind(k, v), diff))
134        }
135        let TransactionBatch {
136            databases,
137            schemas,
138            items,
139            comments,
140            roles,
141            role_auth,
142            clusters,
143            cluster_replicas,
144            network_policies,
145            introspection_sources,
146            id_allocator,
147            configs,
148            settings,
149            source_references,
150            system_gid_mapping,
151            system_configurations,
152            cluster_system_configurations,
153            replica_system_configurations,
154            default_privileges,
155            system_privileges,
156            storage_collection_metadata,
157            unfinalized_shards,
158            txn_wal_shard,
159            audit_log_updates,
160            upper: _,
161        } = txn_batch;
162        let databases = from_batch(databases, StateUpdateKind::Database);
163        let schemas = from_batch(schemas, StateUpdateKind::Schema);
164        let items = from_batch(items, StateUpdateKind::Item);
165        let comments = from_batch(comments, StateUpdateKind::Comment);
166        let roles = from_batch(roles, StateUpdateKind::Role);
167        let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
168        let clusters = from_batch(clusters, StateUpdateKind::Cluster);
169        let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
170        let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
171        let introspection_sources = from_batch(
172            introspection_sources,
173            StateUpdateKind::IntrospectionSourceIndex,
174        );
175        let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
176        let configs = from_batch(configs, StateUpdateKind::Config);
177        let settings = from_batch(settings, StateUpdateKind::Setting);
178        let system_object_mappings =
179            from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
180        let system_configurations =
181            from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
182        let cluster_system_configurations = from_batch(
183            cluster_system_configurations,
184            StateUpdateKind::ClusterSystemConfiguration,
185        );
186        let replica_system_configurations = from_batch(
187            replica_system_configurations,
188            StateUpdateKind::ReplicaSystemConfiguration,
189        );
190        let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
191        let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
192        let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
193        let storage_collection_metadata = from_batch(
194            storage_collection_metadata,
195            StateUpdateKind::StorageCollectionMetadata,
196        );
197        let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
198        let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
199        let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
200
201        databases
202            .chain(schemas)
203            .chain(items)
204            .chain(comments)
205            .chain(roles)
206            .chain(role_auth)
207            .chain(clusters)
208            .chain(cluster_replicas)
209            .chain(network_policies)
210            .chain(introspection_sources)
211            .chain(id_allocators)
212            .chain(configs)
213            .chain(settings)
214            .chain(source_references)
215            .chain(system_object_mappings)
216            .chain(system_configurations)
217            .chain(cluster_system_configurations)
218            .chain(replica_system_configurations)
219            .chain(default_privileges)
220            .chain(system_privileges)
221            .chain(storage_collection_metadata)
222            .chain(unfinalized_shards)
223            .chain(txn_wal_shard)
224            .chain(audit_logs)
225    }
226}
227
/// The contents of a single state update.
///
/// The entire catalog is serialized as bytes and saved in a single persist shard. We use this
/// enum to determine what collection something in the catalog belongs to.
///
/// Most variants carry a `(key, value)` pair of protobuf types. The exceptions are called out on
/// the individual variants below.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(test, derive(Arbitrary))]
pub enum StateUpdateKind {
    /// Key-only: the value is the unit type.
    AuditLog(proto::AuditLogKey, ()),
    Cluster(proto::ClusterKey, proto::ClusterValue),
    ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
    Comment(proto::CommentKey, proto::CommentValue),
    /// Not exposed to the in-memory catalog.
    Config(proto::ConfigKey, proto::ConfigValue),
    Database(proto::DatabaseKey, proto::DatabaseValue),
    DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
    /// Carries a [`FenceToken`] directly instead of a key/value pair. Has no
    /// [`CollectionType`] and is not exposed to the in-memory catalog.
    FenceToken(FenceToken),
    /// Not exposed to the in-memory catalog.
    IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
    IntrospectionSourceIndex(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
    ),
    Item(proto::ItemKey, proto::ItemValue),
    NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
    Role(proto::RoleKey, proto::RoleValue),
    RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
    Schema(proto::SchemaKey, proto::SchemaValue),
    /// Not exposed to the in-memory catalog.
    Setting(proto::SettingKey, proto::SettingValue),
    SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
    SystemConfiguration(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
    ),
    ClusterSystemConfiguration(
        proto::ClusterSystemConfigurationKey,
        proto::ClusterSystemConfigurationValue,
    ),
    ReplicaSystemConfiguration(
        proto::ReplicaSystemConfigurationKey,
        proto::ReplicaSystemConfigurationValue,
    ),
    SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
    SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
    StorageCollectionMetadata(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
    ),
    /// Key-only: the value is the unit type.
    UnfinalizedShard(proto::UnfinalizedShardKey, ()),
    /// Value-only: the key is the unit type. Not exposed to the in-memory catalog.
    TxnWalShard((), proto::TxnWalShardValue),
}
276
277impl StateUpdateKind {
278    pub(crate) fn collection_type(&self) -> Option<CollectionType> {
279        match self {
280            StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
281            StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
282            StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
283            StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
284            StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
285            StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
286            StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
287            StateUpdateKind::FenceToken(_) => None,
288            StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
289            StateUpdateKind::IntrospectionSourceIndex(_, _) => {
290                Some(CollectionType::ComputeIntrospectionSourceIndex)
291            }
292            StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
293            StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
294            StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
295            StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
296            StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
297            StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
298            StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
299            StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
300            StateUpdateKind::ClusterSystemConfiguration(_, _) => {
301                Some(CollectionType::ClusterSystemConfiguration)
302            }
303            StateUpdateKind::ReplicaSystemConfiguration(_, _) => {
304                Some(CollectionType::ReplicaSystemConfiguration)
305            }
306            StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
307            StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
308            StateUpdateKind::StorageCollectionMetadata(_, _) => {
309                Some(CollectionType::StorageCollectionMetadata)
310            }
311            StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
312            StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
313        }
314    }
315}
316
/// Version of [`StateUpdateKind`] to allow reading/writing raw json from/to persist.
///
/// This is a thin wrapper around a [`Jsonb`] value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdateKindJson(Jsonb);
320
321impl StateUpdateKindJson {
322    pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
323        let serde_value = serde_json::to_value(s).expect("valid json");
324        let row = Jsonb::from_serde_json(serde_value).expect("valid json");
325        StateUpdateKindJson(row)
326    }
327
328    pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
329        self.try_to_serde().expect("jsonb should roundtrip")
330    }
331
332    pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
333        &self,
334    ) -> Result<D, serde_json::error::Error> {
335        let serde_value = self.0.as_ref().to_serde_json();
336        serde_json::from_value::<D>(serde_value)
337    }
338
339    fn kind(&self) -> &str {
340        let row = self.0.row();
341        let mut iter = row.unpack_first().unwrap_map().iter();
342        let datum = iter
343            .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
344            .expect("kind field must exist");
345        datum.unwrap_str()
346    }
347
348    pub(crate) fn audit_log_id(&self) -> u64 {
349        assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
350        let row = self.0.row();
351        let mut iter = row.unpack_first().unwrap_map().iter();
352        let key = iter
353            .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
354            .expect("key field must exist")
355            .unwrap_map();
356        let event = key
357            .iter()
358            .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
359            .expect("event field must exist")
360            .unwrap_map();
361        let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
362        match event_version {
363            "V1" => {
364                let versioned_map = versioned_datum.unwrap_map();
365                let id = versioned_map
366                    .iter()
367                    .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
368                    .expect("event field must exist")
369                    .unwrap_numeric();
370                let mut cx = Numeric::context();
371                cx.try_into_u64(id.into_inner()).expect("invalid id")
372            }
373            version => unimplemented!("unsupported event version: {version}"),
374        }
375    }
376
377    /// Returns true if this is an update kind that is always deserializable, even before migrations. Otherwise, returns false.
378    pub(crate) fn is_always_deserializable(&self) -> bool {
379        // Construct some fake update kinds so we can extract exactly what the kind field will
380        // serialize as.
381        static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
382            [
383                StateUpdateKind::FenceToken(FenceToken {
384                    deploy_generation: 1,
385                    epoch: Epoch::new(1).expect("non-zero"),
386                }),
387                StateUpdateKind::Config(
388                    proto::ConfigKey { key: String::new() },
389                    proto::ConfigValue { value: 1 },
390                ),
391                StateUpdateKind::Setting(
392                    proto::SettingKey {
393                        name: String::new(),
394                    },
395                    proto::SettingValue {
396                        value: String::new(),
397                    },
398                ),
399                StateUpdateKind::AuditLog(
400                    proto::AuditLogKey {
401                        event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
402                            id: 1,
403                            event_type: proto::audit_log_event_v1::EventType::Create,
404                            object_type: proto::audit_log_event_v1::ObjectType::Cluster,
405                            user: None,
406                            occurred_at: proto::EpochMillis { millis: 1 },
407                            details: proto::audit_log_event_v1::Details::ResetAllV1(
408                                proto::Empty {},
409                            ),
410                        }),
411                    },
412                    (),
413                ),
414            ]
415            .into_iter()
416            .map(|kind| {
417                let json_kind: StateUpdateKindJson = kind.into();
418                json_kind.kind().to_string()
419            })
420            .collect()
421        });
422        DESERIALIZABLE_KINDS.contains(self.kind())
423    }
424
425    /// Returns true if this is an audit log update. Otherwise, returns false.
426    pub(crate) fn is_audit_log(&self) -> bool {
427        // Construct a fake audit log so we can extract exactly what the kind field will serialize
428        // as.
429        static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
430            let audit_log = StateUpdateKind::AuditLog(
431                proto::AuditLogKey {
432                    event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
433                        id: 1,
434                        event_type: proto::audit_log_event_v1::EventType::Create,
435                        object_type: proto::audit_log_event_v1::ObjectType::Cluster,
436                        user: None,
437                        occurred_at: proto::EpochMillis { millis: 1 },
438                        details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
439                    }),
440                },
441                (),
442            );
443            let json_kind: StateUpdateKindJson = audit_log.into();
444            json_kind.kind().to_string()
445        });
446        &*AUDIT_LOG_KIND == self.kind()
447    }
448}
449
/// Version of [`StateUpdateKind`] that is stored directly in persist.
///
/// The tuple is laid out as `((data, ()), timestamp, diff)`: a [`SourceData`] paired with a unit
/// value, followed by a [`Timestamp`] and a [`StorageDiff`].
type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);
452
453impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
454    type Error = DurableCatalogError;
455
456    fn try_from(
457        StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
458    ) -> Result<Self, Self::Error> {
459        let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
460        let update = kind.map(|kind| memory::objects::StateUpdate {
461            kind,
462            ts: ts.clone(),
463            diff: diff.clone().try_into().expect("invalid diff"),
464        });
465        Ok(update)
466    }
467}
468
469impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
470    type Error = DurableCatalogError;
471
472    fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
473        fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
474        where
475            PK: ProtoType<T::Key> + Clone,
476            PV: ProtoType<T::Value> + Clone,
477            T: DurableType,
478        {
479            let key = key.clone().into_rust()?;
480            let value = value.clone().into_rust()?;
481            Ok(T::from_key_value(key, value))
482        }
483
484        Ok(match kind {
485            StateUpdateKind::AuditLog(key, value) => {
486                let audit_log = into_durable(key, value)?;
487                Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
488            }
489            StateUpdateKind::Cluster(key, value) => {
490                let cluster = into_durable(key, value)?;
491                Some(memory::objects::StateUpdateKind::Cluster(cluster))
492            }
493            StateUpdateKind::ClusterReplica(key, value) => {
494                let cluster_replica = into_durable(key, value)?;
495                Some(memory::objects::StateUpdateKind::ClusterReplica(
496                    cluster_replica,
497                ))
498            }
499            StateUpdateKind::Comment(key, value) => {
500                let comment = into_durable(key, value)?;
501                Some(memory::objects::StateUpdateKind::Comment(comment))
502            }
503            StateUpdateKind::Database(key, value) => {
504                let database = into_durable(key, value)?;
505                Some(memory::objects::StateUpdateKind::Database(database))
506            }
507            StateUpdateKind::DefaultPrivilege(key, value) => {
508                let default_privilege = into_durable(key, value)?;
509                Some(memory::objects::StateUpdateKind::DefaultPrivilege(
510                    default_privilege,
511                ))
512            }
513            StateUpdateKind::Item(key, value) => {
514                let item = into_durable(key, value)?;
515                Some(memory::objects::StateUpdateKind::Item(item))
516            }
517            StateUpdateKind::IntrospectionSourceIndex(key, value) => {
518                let introspection_source_index = into_durable(key, value)?;
519                Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
520                    introspection_source_index,
521                ))
522            }
523            StateUpdateKind::NetworkPolicy(key, value) => {
524                let policy = into_durable(key, value)?;
525                Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
526            }
527            StateUpdateKind::Role(key, value) => {
528                let role = into_durable(key, value)?;
529                Some(memory::objects::StateUpdateKind::Role(role))
530            }
531            StateUpdateKind::RoleAuth(key, value) => {
532                let role_auth = into_durable(key, value)?;
533                Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
534            }
535            StateUpdateKind::Schema(key, value) => {
536                let schema = into_durable(key, value)?;
537                Some(memory::objects::StateUpdateKind::Schema(schema))
538            }
539            StateUpdateKind::SourceReferences(key, value) => {
540                let source_references = into_durable(key, value)?;
541                Some(memory::objects::StateUpdateKind::SourceReferences(
542                    source_references,
543                ))
544            }
545            StateUpdateKind::StorageCollectionMetadata(key, value) => {
546                let storage_collection_metadata = into_durable(key, value)?;
547                Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
548                    storage_collection_metadata,
549                ))
550            }
551            StateUpdateKind::SystemConfiguration(key, value) => {
552                let system_configuration = into_durable(key, value)?;
553                Some(memory::objects::StateUpdateKind::SystemConfiguration(
554                    system_configuration,
555                ))
556            }
557            StateUpdateKind::ClusterSystemConfiguration(key, value) => {
558                let cluster_system_configuration = into_durable(key, value)?;
559                Some(
560                    memory::objects::StateUpdateKind::ClusterSystemConfiguration(
561                        cluster_system_configuration,
562                    ),
563                )
564            }
565            StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
566                let replica_system_configuration = into_durable(key, value)?;
567                Some(
568                    memory::objects::StateUpdateKind::ReplicaSystemConfiguration(
569                        replica_system_configuration,
570                    ),
571                )
572            }
573            StateUpdateKind::SystemObjectMapping(key, value) => {
574                let system_object_mapping = into_durable(key, value)?;
575                Some(memory::objects::StateUpdateKind::SystemObjectMapping(
576                    system_object_mapping,
577                ))
578            }
579            StateUpdateKind::SystemPrivilege(key, value) => {
580                let system_privilege = into_durable(key, value)?;
581                Some(memory::objects::StateUpdateKind::SystemPrivilege(
582                    system_privilege,
583                ))
584            }
585            StateUpdateKind::UnfinalizedShard(key, value) => {
586                let unfinalized_shard = into_durable(key, value)?;
587                Some(memory::objects::StateUpdateKind::UnfinalizedShard(
588                    unfinalized_shard,
589                ))
590            }
591            // Not exposed to higher layers.
592            StateUpdateKind::Config(_, _)
593            | StateUpdateKind::FenceToken(_)
594            | StateUpdateKind::IdAllocator(_, _)
595            | StateUpdateKind::Setting(_, _)
596            | StateUpdateKind::TxnWalShard(_, _) => None,
597        })
598    }
599}
600
601impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
602    type Error = String;
603
604    fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
605        Ok(StateUpdate {
606            kind: TryInto::try_into(update.kind)?,
607            ts: update.ts,
608            diff: update.diff,
609        })
610    }
611}
612
613impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
614    type Error = String;
615
616    fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
617        let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
618        StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
619    }
620}
621
622impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
623    type Error = String;
624
625    fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
626        let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
627        StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
628    }
629}
630
631impl From<StateUpdateKind> for StateUpdateKindJson {
632    fn from(value: StateUpdateKind) -> Self {
633        let kind = value.into_proto_owned();
634        StateUpdateKindJson::from_serde(kind)
635    }
636}
637
638// Be very careful about changing these implementations. The default impl of `into_proto_owned`
639// calls `into_proto`, and this impl of `into_proto` calls `into_proto_owned`. It would be very
640// easy to accidentally cause infinite recursion.
641impl RustType<proto::StateUpdateKind> for StateUpdateKind {
642    fn into_proto(&self) -> proto::StateUpdateKind {
643        error!("unexpected clone of catalog data");
644        self.clone().into_proto_owned()
645    }
646
647    fn into_proto_owned(self) -> proto::StateUpdateKind {
648        match self {
649            StateUpdateKind::AuditLog(key, ()) => {
650                proto::StateUpdateKind::AuditLog(proto::AuditLog { key })
651            }
652            StateUpdateKind::Cluster(key, value) => {
653                proto::StateUpdateKind::Cluster(proto::Cluster { key, value })
654            }
655            StateUpdateKind::ClusterReplica(key, value) => {
656                proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value })
657            }
658            StateUpdateKind::Comment(key, value) => {
659                proto::StateUpdateKind::Comment(proto::Comment { key, value })
660            }
661            StateUpdateKind::Config(key, value) => {
662                proto::StateUpdateKind::Config(proto::Config { key, value })
663            }
664            StateUpdateKind::Database(key, value) => {
665                proto::StateUpdateKind::Database(proto::Database { key, value })
666            }
667            StateUpdateKind::DefaultPrivilege(key, value) => {
668                proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value })
669            }
670            StateUpdateKind::FenceToken(fence_token) => {
671                proto::StateUpdateKind::FenceToken(proto::FenceToken {
672                    deploy_generation: fence_token.deploy_generation,
673                    epoch: fence_token.epoch.get(),
674                })
675            }
676            StateUpdateKind::IdAllocator(key, value) => {
677                proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value })
678            }
679            StateUpdateKind::IntrospectionSourceIndex(key, value) => {
680                proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
681                    proto::ClusterIntrospectionSourceIndex { key, value },
682                )
683            }
684            StateUpdateKind::Item(key, value) => {
685                proto::StateUpdateKind::Item(proto::Item { key, value })
686            }
687            StateUpdateKind::NetworkPolicy(key, value) => {
688                proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value })
689            }
690            StateUpdateKind::Role(key, value) => {
691                proto::StateUpdateKind::Role(proto::Role { key, value })
692            }
693            StateUpdateKind::RoleAuth(key, value) => {
694                proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value })
695            }
696            StateUpdateKind::Schema(key, value) => {
697                proto::StateUpdateKind::Schema(proto::Schema { key, value })
698            }
699            StateUpdateKind::Setting(key, value) => {
700                proto::StateUpdateKind::Setting(proto::Setting { key, value })
701            }
702            StateUpdateKind::SourceReferences(key, value) => {
703                proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value })
704            }
705            StateUpdateKind::SystemConfiguration(key, value) => {
706                proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
707                    key,
708                    value,
709                })
710            }
711            StateUpdateKind::ClusterSystemConfiguration(key, value) => {
712                proto::StateUpdateKind::ClusterSystemConfiguration(
713                    proto::ClusterSystemConfiguration { key, value },
714                )
715            }
716            StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
717                proto::StateUpdateKind::ReplicaSystemConfiguration(
718                    proto::ReplicaSystemConfiguration { key, value },
719                )
720            }
721            StateUpdateKind::SystemObjectMapping(key, value) => {
722                proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value })
723            }
724            StateUpdateKind::SystemPrivilege(key, value) => {
725                proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value })
726            }
727            StateUpdateKind::StorageCollectionMetadata(key, value) => {
728                proto::StateUpdateKind::StorageCollectionMetadata(
729                    proto::StorageCollectionMetadata { key, value },
730                )
731            }
732            StateUpdateKind::UnfinalizedShard(key, ()) => {
733                proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key })
734            }
735            StateUpdateKind::TxnWalShard((), value) => {
736                proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value })
737            }
738        }
739    }
740
741    fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
742        Ok(match proto {
743            proto::StateUpdateKind::AuditLog(proto::AuditLog { key }) => {
744                StateUpdateKind::AuditLog(key, ())
745            }
746            proto::StateUpdateKind::Cluster(proto::Cluster { key, value }) => {
747                StateUpdateKind::Cluster(key, value)
748            }
749            proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value }) => {
750                StateUpdateKind::ClusterReplica(key, value)
751            }
752            proto::StateUpdateKind::Comment(proto::Comment { key, value }) => {
753                StateUpdateKind::Comment(key, value)
754            }
755            proto::StateUpdateKind::Config(proto::Config { key, value }) => {
756                StateUpdateKind::Config(key, value)
757            }
758            proto::StateUpdateKind::Database(proto::Database { key, value }) => {
759                StateUpdateKind::Database(key, value)
760            }
761            proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value }) => {
762                StateUpdateKind::DefaultPrivilege(key, value)
763            }
764            proto::StateUpdateKind::FenceToken(proto::FenceToken {
765                deploy_generation,
766                epoch,
767            }) => StateUpdateKind::FenceToken(FenceToken {
768                deploy_generation,
769                epoch: Epoch::new(epoch).ok_or_else(|| {
770                    TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
771                })?,
772            }),
773            proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value }) => {
774                StateUpdateKind::IdAllocator(key, value)
775            }
776            proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
777                proto::ClusterIntrospectionSourceIndex { key, value },
778            ) => StateUpdateKind::IntrospectionSourceIndex(key, value),
779            proto::StateUpdateKind::Item(proto::Item { key, value }) => {
780                StateUpdateKind::Item(key, value)
781            }
782            proto::StateUpdateKind::Role(proto::Role { key, value }) => {
783                StateUpdateKind::Role(key, value)
784            }
785            proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value }) => {
786                StateUpdateKind::RoleAuth(key, value)
787            }
788            proto::StateUpdateKind::Schema(proto::Schema { key, value }) => {
789                StateUpdateKind::Schema(key, value)
790            }
791            proto::StateUpdateKind::Setting(proto::Setting { key, value }) => {
792                StateUpdateKind::Setting(key, value)
793            }
794            proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
795                key,
796                value,
797            }) => StateUpdateKind::SystemConfiguration(key, value),
798            proto::StateUpdateKind::ClusterSystemConfiguration(
799                proto::ClusterSystemConfiguration { key, value },
800            ) => StateUpdateKind::ClusterSystemConfiguration(key, value),
801            proto::StateUpdateKind::ReplicaSystemConfiguration(
802                proto::ReplicaSystemConfiguration { key, value },
803            ) => StateUpdateKind::ReplicaSystemConfiguration(key, value),
804            proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value }) => {
805                StateUpdateKind::SystemObjectMapping(key, value)
806            }
807            proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value }) => {
808                StateUpdateKind::SystemPrivilege(key, value)
809            }
810            proto::StateUpdateKind::StorageCollectionMetadata(
811                proto::StorageCollectionMetadata { key, value },
812            ) => StateUpdateKind::StorageCollectionMetadata(key, value),
813            proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key }) => {
814                StateUpdateKind::UnfinalizedShard(key, ())
815            }
816            proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value }) => {
817                StateUpdateKind::TxnWalShard((), value)
818            }
819            proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value }) => {
820                StateUpdateKind::SourceReferences(key, value)
821            }
822            proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value }) => {
823                StateUpdateKind::NetworkPolicy(key, value)
824            }
825        })
826    }
827}
828
829/// Decodes a [`StateUpdate<StateUpdateKindJson>`] from the `(key, value, ts,
830/// diff)` tuple/update we store in persist.
831impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
832    fn from(kvtd: PersistStateUpdate) -> Self {
833        let ((key, ()), ts, diff) = kvtd;
834        StateUpdate {
835            kind: StateUpdateKindJson::from(key),
836            ts,
837            diff: diff.into(),
838        }
839    }
840}
841
842impl From<StateUpdateKindJson> for SourceData {
843    fn from(value: StateUpdateKindJson) -> SourceData {
844        let row = value.0.into_row();
845        SourceData(Ok(row))
846    }
847}
848
849impl From<SourceData> for StateUpdateKindJson {
850    fn from(value: SourceData) -> Self {
851        let row = value.0.expect("only Ok values stored in catalog shard");
852        StateUpdateKindJson(Jsonb::from_row(row))
853    }
854}
855
856#[cfg(test)]
857mod tests {
858    use mz_persist_types::Codec;
859    use mz_repr::{RelationDesc, SqlScalarType};
860    use mz_storage_types::sources::SourceData;
861    use proptest::prelude::*;
862
863    use crate::durable::Epoch;
864    use crate::durable::objects::FenceToken;
865    use crate::durable::objects::serialization::proto;
866    use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};
867
868    #[mz_ore::test]
869    #[cfg_attr(miri, ignore)]
870    fn kind_test() {
871        let test_cases = [
872            (
873                StateUpdateKind::FenceToken(FenceToken {
874                    deploy_generation: 1,
875                    epoch: Epoch::new(1).expect("non-zero"),
876                }),
877                "FenceToken",
878            ),
879            (
880                StateUpdateKind::Config(
881                    proto::ConfigKey { key: String::new() },
882                    proto::ConfigValue { value: 1 },
883                ),
884                "Config",
885            ),
886            (
887                StateUpdateKind::Setting(
888                    proto::SettingKey {
889                        name: String::new(),
890                    },
891                    proto::SettingValue {
892                        value: String::new(),
893                    },
894                ),
895                "Setting",
896            ),
897            (
898                StateUpdateKind::AuditLog(
899                    proto::AuditLogKey {
900                        event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
901                            id: 1,
902                            event_type: proto::audit_log_event_v1::EventType::Create,
903                            object_type: proto::audit_log_event_v1::ObjectType::Cluster,
904                            user: None,
905                            occurred_at: proto::EpochMillis { millis: 4 },
906                            details: proto::audit_log_event_v1::Details::ResetAllV1(
907                                proto::Empty {},
908                            ),
909                        }),
910                    },
911                    (),
912                ),
913                "AuditLog",
914            ),
915        ];
916
917        for (kind, expected) in test_cases {
918            let json_kind: StateUpdateKindJson = kind.into();
919            let kind = json_kind.kind().to_string();
920            assert_eq!(expected, kind);
921        }
922    }
923
924    #[mz_ore::test]
925    #[cfg_attr(miri, ignore)]
926    fn audit_log_id_test() {
927        let test_cases = [
928            (
929                StateUpdateKind::AuditLog(
930                    proto::AuditLogKey {
931                        event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
932                            id: 1,
933                            event_type: proto::audit_log_event_v1::EventType::Create,
934                            object_type: proto::audit_log_event_v1::ObjectType::Cluster,
935                            user: None,
936                            occurred_at: proto::EpochMillis { millis: 4 },
937                            details: proto::audit_log_event_v1::Details::ResetAllV1(
938                                proto::Empty {},
939                            ),
940                        }),
941                    },
942                    (),
943                ),
944                1,
945            ),
946            (
947                StateUpdateKind::AuditLog(
948                    proto::AuditLogKey {
949                        event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
950                            id: 4,
951                            event_type: proto::audit_log_event_v1::EventType::Drop,
952                            object_type: proto::audit_log_event_v1::ObjectType::Database,
953                            user: None,
954                            occurred_at: proto::EpochMillis { millis: 7 },
955                            details: proto::audit_log_event_v1::Details::ResetAllV1(
956                                proto::Empty {},
957                            ),
958                        }),
959                    },
960                    (),
961                ),
962                4,
963            ),
964        ];
965
966        for (kind, expected) in test_cases {
967            let json_kind: StateUpdateKindJson = kind.into();
968            let id = json_kind.audit_log_id();
969            assert_eq!(expected, id);
970        }
971    }
972
973    proptest! {
974        #[mz_ore::test]
975        #[cfg_attr(miri, ignore)] // slow
976        fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
977            // Verify that we can map encode into the "raw" json format. This
978            // validates things like contained integers fitting in f64.
979            let raw = StateUpdateKindJson::from(kind.clone());
980            let desc = RelationDesc::builder().with_column("a", SqlScalarType::Jsonb.nullable(false)).finish();
981
982            // Verify that the raw roundtrips through the SourceData Codec impl.
983            let source_data = SourceData::from(raw.clone());
984            let mut encoded = Vec::new();
985            source_data.encode(&mut encoded);
986            let decoded = SourceData::decode(&encoded, &desc).expect("should be valid SourceData");
987            prop_assert_eq!(&source_data, &decoded);
988            let decoded = StateUpdateKindJson::from(decoded);
989            prop_assert_eq!(&raw, &decoded);
990
991            // Verify that the enum roundtrips.
992            let decoded = StateUpdateKind::try_from(decoded).expect("should be valid StateUpdateKind");
993            prop_assert_eq!(&kind, &decoded);
994        }
995    }
996}