mz_catalog/durable/objects/
state_update.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains various representations of a single catalog update and the logic necessary
11//! for converting between representations.
12//!
13//! The general lifecycle of a single update when read from persist is as follows:
14//!
15//!   1. The update is stored in persist as a [`PersistStateUpdate`].
16//!   2. After being read from persist the update is immediately converted into a
17//!      [`StateUpdate<StateUpdateKindJson>`], which models the update as a JSON.
18//!   3. The [`StateUpdateKindJson`] is converted into a protobuf message,
19//!      [`proto::StateUpdateKind`].
20//!   4. The update is then converted into a [`StateUpdate<StateUpdateKind>`], which is a strongly
21//!      typed Rust object.
22//!   5. Finally, the update is converted into an [`Option<memory::objects::StateUpdate>`], and
23//!      `Some` variants are given to the in-memory catalog. The in-memory catalog is only
24//!      interested in a subset of catalog updates which is why the [`Option`] is necessary.
25//!
26//! TLDR: [`PersistStateUpdate`] -> [`StateUpdate<StateUpdateKindJson>`] ->
27//!       [`proto::StateUpdateKind`] -> [`StateUpdate<StateUpdateKind>`] ->
28//!       [`Option<memory::objects::StateUpdate>`]
29//!
30//! The process of writing a catalog update to persist is the exact opposite.
31//!
32//! When running catalog protobuf upgrades/migrations we may need to take a detour and convert the
33//! [`StateUpdateKindJson`] to some `proto::object_v{x}::StateUpdateKind` before applying specific
34//! upgrades to get us to a valid [`proto::StateUpdateKind`].
35
36use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46use proptest_derive::Arbitrary;
47use tracing::error;
48
49use crate::durable::debug::CollectionType;
50use crate::durable::objects::serialization::proto;
51use crate::durable::objects::{DurableType, FenceToken};
52use crate::durable::persist::Timestamp;
53use crate::durable::transaction::TransactionBatch;
54use crate::durable::{DurableCatalogError, Epoch};
55use crate::memory;
56
57/// Trait for objects that can be converted to/from a [`StateUpdateKindJson`].
58pub trait IntoStateUpdateKindJson:
59    Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
60{
61    type Error: Debug;
62
63    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
64}
65impl<
66    T: Into<StateUpdateKindJson>
67        + TryFrom<StateUpdateKindJson>
68        + PartialEq
69        + Eq
70        + PartialOrd
71        + Ord
72        + Debug
73        + Clone,
74> IntoStateUpdateKindJson for T
75where
76    T::Error: Debug,
77{
78    type Error = T::Error;
79
80    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
81        <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
82    }
83}
84
85/// Trait for objects that can be converted to/from a [`StateUpdateKind`].
86pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
87    type Error: Debug;
88
89    fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
90}
91impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
92where
93    <T as TryInto<StateUpdateKind>>::Error: Debug,
94{
95    type Error = <T as TryInto<StateUpdateKind>>::Error;
96
97    fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
98        <T as TryInto<StateUpdateKind>>::try_into(self)
99    }
100}
101
102/// A single update to the catalog state.
103#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
104pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
105    /// They kind and contents of the state update.
106    pub kind: T,
107    /// The timestamp at which the update occurred.
108    pub ts: Timestamp,
109    /// Record count difference for the update.
110    pub diff: Diff,
111}
112
113impl StateUpdate {
114    /// Convert a [`TransactionBatch`] to a list of [`StateUpdate`]s at timestamp `ts`.
115    pub(crate) fn from_txn_batch_ts(
116        txn_batch: TransactionBatch,
117        ts: Timestamp,
118    ) -> impl Iterator<Item = StateUpdate> {
119        Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
120    }
121
122    /// Convert a [`TransactionBatch`] to a list of [`StateUpdate`]s and [`Diff`]s.
123    pub(crate) fn from_txn_batch(
124        txn_batch: TransactionBatch,
125    ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
126        fn from_batch<K, V>(
127            batch: Vec<(K, V, Diff)>,
128            kind: fn(K, V) -> StateUpdateKind,
129        ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
130            batch
131                .into_iter()
132                .map(move |(k, v, diff)| (kind(k, v), diff))
133        }
134        let TransactionBatch {
135            databases,
136            schemas,
137            items,
138            comments,
139            roles,
140            role_auth,
141            clusters,
142            cluster_replicas,
143            network_policies,
144            introspection_sources,
145            id_allocator,
146            configs,
147            settings,
148            source_references,
149            system_gid_mapping,
150            system_configurations,
151            default_privileges,
152            system_privileges,
153            storage_collection_metadata,
154            unfinalized_shards,
155            txn_wal_shard,
156            audit_log_updates,
157            upper: _,
158        } = txn_batch;
159        let databases = from_batch(databases, StateUpdateKind::Database);
160        let schemas = from_batch(schemas, StateUpdateKind::Schema);
161        let items = from_batch(items, StateUpdateKind::Item);
162        let comments = from_batch(comments, StateUpdateKind::Comment);
163        let roles = from_batch(roles, StateUpdateKind::Role);
164        let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
165        let clusters = from_batch(clusters, StateUpdateKind::Cluster);
166        let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
167        let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
168        let introspection_sources = from_batch(
169            introspection_sources,
170            StateUpdateKind::IntrospectionSourceIndex,
171        );
172        let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
173        let configs = from_batch(configs, StateUpdateKind::Config);
174        let settings = from_batch(settings, StateUpdateKind::Setting);
175        let system_object_mappings =
176            from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
177        let system_configurations =
178            from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
179        let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
180        let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
181        let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
182        let storage_collection_metadata = from_batch(
183            storage_collection_metadata,
184            StateUpdateKind::StorageCollectionMetadata,
185        );
186        let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
187        let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
188        let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
189
190        databases
191            .chain(schemas)
192            .chain(items)
193            .chain(comments)
194            .chain(roles)
195            .chain(role_auth)
196            .chain(clusters)
197            .chain(cluster_replicas)
198            .chain(network_policies)
199            .chain(introspection_sources)
200            .chain(id_allocators)
201            .chain(configs)
202            .chain(settings)
203            .chain(source_references)
204            .chain(system_object_mappings)
205            .chain(system_configurations)
206            .chain(default_privileges)
207            .chain(system_privileges)
208            .chain(storage_collection_metadata)
209            .chain(unfinalized_shards)
210            .chain(txn_wal_shard)
211            .chain(audit_logs)
212    }
213}
214
215/// The contents of a single state update.
216///
217/// The entire catalog is serialized as bytes and saved in a single persist shard. We use this
218/// enum to determine what collection something in the catalog belongs to.
219#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Arbitrary)]
220pub enum StateUpdateKind {
221    AuditLog(proto::AuditLogKey, ()),
222    Cluster(proto::ClusterKey, proto::ClusterValue),
223    ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
224    Comment(proto::CommentKey, proto::CommentValue),
225    Config(proto::ConfigKey, proto::ConfigValue),
226    Database(proto::DatabaseKey, proto::DatabaseValue),
227    DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
228    FenceToken(FenceToken),
229    IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
230    IntrospectionSourceIndex(
231        proto::ClusterIntrospectionSourceIndexKey,
232        proto::ClusterIntrospectionSourceIndexValue,
233    ),
234    Item(proto::ItemKey, proto::ItemValue),
235    NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
236    Role(proto::RoleKey, proto::RoleValue),
237    RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
238    Schema(proto::SchemaKey, proto::SchemaValue),
239    Setting(proto::SettingKey, proto::SettingValue),
240    SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
241    SystemConfiguration(
242        proto::ServerConfigurationKey,
243        proto::ServerConfigurationValue,
244    ),
245    SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
246    SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
247    StorageCollectionMetadata(
248        proto::StorageCollectionMetadataKey,
249        proto::StorageCollectionMetadataValue,
250    ),
251    UnfinalizedShard(proto::UnfinalizedShardKey, ()),
252    TxnWalShard((), proto::TxnWalShardValue),
253}
254
255impl StateUpdateKind {
256    pub(crate) fn collection_type(&self) -> Option<CollectionType> {
257        match self {
258            StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
259            StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
260            StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
261            StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
262            StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
263            StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
264            StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
265            StateUpdateKind::FenceToken(_) => None,
266            StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
267            StateUpdateKind::IntrospectionSourceIndex(_, _) => {
268                Some(CollectionType::ComputeIntrospectionSourceIndex)
269            }
270            StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
271            StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
272            StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
273            StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
274            StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
275            StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
276            StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
277            StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
278            StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
279            StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
280            StateUpdateKind::StorageCollectionMetadata(_, _) => {
281                Some(CollectionType::StorageCollectionMetadata)
282            }
283            StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
284            StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
285        }
286    }
287}
288
289/// Version of [`StateUpdateKind`] to allow reading/writing raw json from/to persist.
290#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
291pub struct StateUpdateKindJson(Jsonb);
292
293impl StateUpdateKindJson {
294    pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
295        let serde_value = serde_json::to_value(s).expect("valid json");
296        let row = Jsonb::from_serde_json(serde_value).expect("valid json");
297        StateUpdateKindJson(row)
298    }
299
300    pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
301        self.try_to_serde().expect("jsonb should roundtrip")
302    }
303
304    pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
305        &self,
306    ) -> Result<D, serde_json::error::Error> {
307        let serde_value = self.0.as_ref().to_serde_json();
308        serde_json::from_value::<D>(serde_value)
309    }
310
311    fn kind(&self) -> &str {
312        let row = self.0.row();
313        let mut iter = row.unpack_first().unwrap_map().iter();
314        let datum = iter
315            .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
316            .expect("kind field must exist");
317        datum.unwrap_str()
318    }
319
320    pub(crate) fn audit_log_id(&self) -> u64 {
321        assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
322        let row = self.0.row();
323        let mut iter = row.unpack_first().unwrap_map().iter();
324        let key = iter
325            .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
326            .expect("key field must exist")
327            .unwrap_map();
328        let event = key
329            .iter()
330            .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
331            .expect("event field must exist")
332            .unwrap_map();
333        let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
334        match event_version {
335            "V1" => {
336                let versioned_map = versioned_datum.unwrap_map();
337                let id = versioned_map
338                    .iter()
339                    .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
340                    .expect("event field must exist")
341                    .unwrap_numeric();
342                let mut cx = Numeric::context();
343                cx.try_into_u64(id.into_inner()).expect("invalid id")
344            }
345            version => unimplemented!("unsupported event version: {version}"),
346        }
347    }
348
349    /// Returns true if this is an update kind that is always deserializable, even before migrations. Otherwise, returns false.
350    pub(crate) fn is_always_deserializable(&self) -> bool {
351        // Construct some fake update kinds so we can extract exactly what the kind field will
352        // serialize as.
353        static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
354            [
355                StateUpdateKind::FenceToken(FenceToken {
356                    deploy_generation: 1,
357                    epoch: Epoch::new(1).expect("non-zero"),
358                }),
359                StateUpdateKind::Config(
360                    proto::ConfigKey { key: String::new() },
361                    proto::ConfigValue { value: 1 },
362                ),
363                StateUpdateKind::Setting(
364                    proto::SettingKey {
365                        name: String::new(),
366                    },
367                    proto::SettingValue {
368                        value: String::new(),
369                    },
370                ),
371                StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()),
372            ]
373            .into_iter()
374            .map(|kind| {
375                let json_kind: StateUpdateKindJson = kind.into();
376                json_kind.kind().to_string()
377            })
378            .collect()
379        });
380        DESERIALIZABLE_KINDS.contains(self.kind())
381    }
382
383    /// Returns true if this is an audit log update. Otherwise, returns false.
384    pub(crate) fn is_audit_log(&self) -> bool {
385        // Construct a fake audit log so we can extract exactly what the kind field will serialize
386        // as.
387        static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
388            let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ());
389            let json_kind: StateUpdateKindJson = audit_log.into();
390            json_kind.kind().to_string()
391        });
392        &*AUDIT_LOG_KIND == self.kind()
393    }
394}
395
396/// Version of [`StateUpdateKind`] that is stored directly in persist.
397type PersistStateUpdate = (
398    (Result<SourceData, String>, Result<(), String>),
399    Timestamp,
400    StorageDiff,
401);
402
403impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
404    type Error = DurableCatalogError;
405
406    fn try_from(
407        StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
408    ) -> Result<Self, Self::Error> {
409        let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
410        let update = kind.map(|kind| memory::objects::StateUpdate {
411            kind,
412            ts: ts.clone(),
413            diff: diff.clone().try_into().expect("invalid diff"),
414        });
415        Ok(update)
416    }
417}
418
419impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
420    type Error = DurableCatalogError;
421
422    fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
423        fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
424        where
425            PK: ProtoType<T::Key> + Clone,
426            PV: ProtoType<T::Value> + Clone,
427            T: DurableType,
428        {
429            let key = key.clone().into_rust()?;
430            let value = value.clone().into_rust()?;
431            Ok(T::from_key_value(key, value))
432        }
433
434        Ok(match kind {
435            StateUpdateKind::AuditLog(key, value) => {
436                let audit_log = into_durable(key, value)?;
437                Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
438            }
439            StateUpdateKind::Cluster(key, value) => {
440                let cluster = into_durable(key, value)?;
441                Some(memory::objects::StateUpdateKind::Cluster(cluster))
442            }
443            StateUpdateKind::ClusterReplica(key, value) => {
444                let cluster_replica = into_durable(key, value)?;
445                Some(memory::objects::StateUpdateKind::ClusterReplica(
446                    cluster_replica,
447                ))
448            }
449            StateUpdateKind::Comment(key, value) => {
450                let comment = into_durable(key, value)?;
451                Some(memory::objects::StateUpdateKind::Comment(comment))
452            }
453            StateUpdateKind::Database(key, value) => {
454                let database = into_durable(key, value)?;
455                Some(memory::objects::StateUpdateKind::Database(database))
456            }
457            StateUpdateKind::DefaultPrivilege(key, value) => {
458                let default_privilege = into_durable(key, value)?;
459                Some(memory::objects::StateUpdateKind::DefaultPrivilege(
460                    default_privilege,
461                ))
462            }
463            StateUpdateKind::Item(key, value) => {
464                let item = into_durable(key, value)?;
465                Some(memory::objects::StateUpdateKind::Item(item))
466            }
467            StateUpdateKind::IntrospectionSourceIndex(key, value) => {
468                let introspection_source_index = into_durable(key, value)?;
469                Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
470                    introspection_source_index,
471                ))
472            }
473            StateUpdateKind::NetworkPolicy(key, value) => {
474                let policy = into_durable(key, value)?;
475                Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
476            }
477            StateUpdateKind::Role(key, value) => {
478                let role = into_durable(key, value)?;
479                Some(memory::objects::StateUpdateKind::Role(role))
480            }
481            StateUpdateKind::RoleAuth(key, value) => {
482                let role_auth = into_durable(key, value)?;
483                Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
484            }
485            StateUpdateKind::Schema(key, value) => {
486                let schema = into_durable(key, value)?;
487                Some(memory::objects::StateUpdateKind::Schema(schema))
488            }
489            StateUpdateKind::SourceReferences(key, value) => {
490                let source_references = into_durable(key, value)?;
491                Some(memory::objects::StateUpdateKind::SourceReferences(
492                    source_references,
493                ))
494            }
495            StateUpdateKind::StorageCollectionMetadata(key, value) => {
496                let storage_collection_metadata = into_durable(key, value)?;
497                Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
498                    storage_collection_metadata,
499                ))
500            }
501            StateUpdateKind::SystemConfiguration(key, value) => {
502                let system_configuration = into_durable(key, value)?;
503                Some(memory::objects::StateUpdateKind::SystemConfiguration(
504                    system_configuration,
505                ))
506            }
507            StateUpdateKind::SystemObjectMapping(key, value) => {
508                let system_object_mapping = into_durable(key, value)?;
509                Some(memory::objects::StateUpdateKind::SystemObjectMapping(
510                    system_object_mapping,
511                ))
512            }
513            StateUpdateKind::SystemPrivilege(key, value) => {
514                let system_privilege = into_durable(key, value)?;
515                Some(memory::objects::StateUpdateKind::SystemPrivilege(
516                    system_privilege,
517                ))
518            }
519            StateUpdateKind::UnfinalizedShard(key, value) => {
520                let unfinalized_shard = into_durable(key, value)?;
521                Some(memory::objects::StateUpdateKind::UnfinalizedShard(
522                    unfinalized_shard,
523                ))
524            }
525            // Not exposed to higher layers.
526            StateUpdateKind::Config(_, _)
527            | StateUpdateKind::FenceToken(_)
528            | StateUpdateKind::IdAllocator(_, _)
529            | StateUpdateKind::Setting(_, _)
530            | StateUpdateKind::TxnWalShard(_, _) => None,
531        })
532    }
533}
534
535impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
536    type Error = String;
537
538    fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
539        Ok(StateUpdate {
540            kind: TryInto::try_into(update.kind)?,
541            ts: update.ts,
542            diff: update.diff,
543        })
544    }
545}
546
547impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
548    type Error = String;
549
550    fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
551        let kind: proto::state_update_kind::Kind =
552            value.try_to_serde().map_err(|err| err.to_string())?;
553        let kind = proto::StateUpdateKind { kind: Some(kind) };
554        StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
555    }
556}
557
558impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
559    type Error = String;
560
561    fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
562        let kind: proto::state_update_kind::Kind =
563            value.try_to_serde().map_err(|err| err.to_string())?;
564        let kind = proto::StateUpdateKind { kind: Some(kind) };
565        StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
566    }
567}
568
569impl From<StateUpdateKind> for StateUpdateKindJson {
570    fn from(value: StateUpdateKind) -> Self {
571        let kind = value.into_proto_owned();
572        let kind = kind.kind.expect("kind should be set");
573        StateUpdateKindJson::from_serde(kind)
574    }
575}
576
577// Be very careful about changing these implementations. The default impl of `into_proto_owned`
578// calls `into_proto`, and this impl of `into_proto` calls `into_proto_owned`. It would be very
579// easy to accidentally cause infinite recursion.
580impl RustType<proto::StateUpdateKind> for StateUpdateKind {
581    fn into_proto(&self) -> proto::StateUpdateKind {
582        error!("unexpected clone of catalog data");
583        self.clone().into_proto_owned()
584    }
585
586    fn into_proto_owned(self) -> proto::StateUpdateKind {
587        proto::StateUpdateKind {
588            kind: Some(match self {
589                StateUpdateKind::AuditLog(key, _value) => {
590                    proto::state_update_kind::Kind::AuditLog(proto::state_update_kind::AuditLog {
591                        key: Some(key),
592                    })
593                }
594                StateUpdateKind::Cluster(key, value) => {
595                    proto::state_update_kind::Kind::Cluster(proto::state_update_kind::Cluster {
596                        key: Some(key),
597                        value: Some(value),
598                    })
599                }
600                StateUpdateKind::ClusterReplica(key, value) => {
601                    proto::state_update_kind::Kind::ClusterReplica(
602                        proto::state_update_kind::ClusterReplica {
603                            key: Some(key),
604                            value: Some(value),
605                        },
606                    )
607                }
608                StateUpdateKind::Comment(key, value) => {
609                    proto::state_update_kind::Kind::Comment(proto::state_update_kind::Comment {
610                        key: Some(key),
611                        value: Some(value),
612                    })
613                }
614                StateUpdateKind::Config(key, value) => {
615                    proto::state_update_kind::Kind::Config(proto::state_update_kind::Config {
616                        key: Some(key),
617                        value: Some(value),
618                    })
619                }
620                StateUpdateKind::Database(key, value) => {
621                    proto::state_update_kind::Kind::Database(proto::state_update_kind::Database {
622                        key: Some(key),
623                        value: Some(value),
624                    })
625                }
626                StateUpdateKind::DefaultPrivilege(key, value) => {
627                    proto::state_update_kind::Kind::DefaultPrivileges(
628                        proto::state_update_kind::DefaultPrivileges {
629                            key: Some(key),
630                            value: Some(value),
631                        },
632                    )
633                }
634                StateUpdateKind::FenceToken(fence_token) => {
635                    proto::state_update_kind::Kind::FenceToken(
636                        proto::state_update_kind::FenceToken {
637                            deploy_generation: fence_token.deploy_generation,
638                            epoch: fence_token.epoch.get(),
639                        },
640                    )
641                }
642                StateUpdateKind::IdAllocator(key, value) => {
643                    proto::state_update_kind::Kind::IdAlloc(proto::state_update_kind::IdAlloc {
644                        key: Some(key),
645                        value: Some(value),
646                    })
647                }
648                StateUpdateKind::IntrospectionSourceIndex(key, value) => {
649                    proto::state_update_kind::Kind::ClusterIntrospectionSourceIndex(
650                        proto::state_update_kind::ClusterIntrospectionSourceIndex {
651                            key: Some(key),
652                            value: Some(value),
653                        },
654                    )
655                }
656                StateUpdateKind::Item(key, value) => {
657                    proto::state_update_kind::Kind::Item(proto::state_update_kind::Item {
658                        key: Some(key),
659                        value: Some(value),
660                    })
661                }
662                StateUpdateKind::NetworkPolicy(key, value) => {
663                    proto::state_update_kind::Kind::NetworkPolicy(
664                        proto::state_update_kind::NetworkPolicy {
665                            key: Some(key),
666                            value: Some(value),
667                        },
668                    )
669                }
670                StateUpdateKind::Role(key, value) => {
671                    proto::state_update_kind::Kind::Role(proto::state_update_kind::Role {
672                        key: Some(key),
673                        value: Some(value),
674                    })
675                }
676                StateUpdateKind::RoleAuth(key, value) => {
677                    proto::state_update_kind::Kind::RoleAuth(proto::state_update_kind::RoleAuth {
678                        key: Some(key),
679                        value: Some(value),
680                    })
681                }
682                StateUpdateKind::Schema(key, value) => {
683                    proto::state_update_kind::Kind::Schema(proto::state_update_kind::Schema {
684                        key: Some(key),
685                        value: Some(value),
686                    })
687                }
688                StateUpdateKind::Setting(key, value) => {
689                    proto::state_update_kind::Kind::Setting(proto::state_update_kind::Setting {
690                        key: Some(key),
691                        value: Some(value),
692                    })
693                }
694                StateUpdateKind::SourceReferences(key, value) => {
695                    proto::state_update_kind::Kind::SourceReferences(
696                        proto::state_update_kind::SourceReferences {
697                            key: Some(key),
698                            value: Some(value),
699                        },
700                    )
701                }
702                StateUpdateKind::SystemConfiguration(key, value) => {
703                    proto::state_update_kind::Kind::ServerConfiguration(
704                        proto::state_update_kind::ServerConfiguration {
705                            key: Some(key),
706                            value: Some(value),
707                        },
708                    )
709                }
710                StateUpdateKind::SystemObjectMapping(key, value) => {
711                    proto::state_update_kind::Kind::GidMapping(
712                        proto::state_update_kind::GidMapping {
713                            key: Some(key),
714                            value: Some(value),
715                        },
716                    )
717                }
718                StateUpdateKind::SystemPrivilege(key, value) => {
719                    proto::state_update_kind::Kind::SystemPrivileges(
720                        proto::state_update_kind::SystemPrivileges {
721                            key: Some(key),
722                            value: Some(value),
723                        },
724                    )
725                }
726                StateUpdateKind::StorageCollectionMetadata(key, value) => {
727                    proto::state_update_kind::Kind::StorageCollectionMetadata(
728                        proto::state_update_kind::StorageCollectionMetadata {
729                            key: Some(key),
730                            value: Some(value),
731                        },
732                    )
733                }
734                StateUpdateKind::UnfinalizedShard(key, ()) => {
735                    proto::state_update_kind::Kind::UnfinalizedShard(
736                        proto::state_update_kind::UnfinalizedShard { key: Some(key) },
737                    )
738                }
739                StateUpdateKind::TxnWalShard((), value) => {
740                    proto::state_update_kind::Kind::TxnWalShard(
741                        proto::state_update_kind::TxnWalShard { value: Some(value) },
742                    )
743                }
744            }),
745        }
746    }
747
748    fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
749        Ok(
750            match proto
751                .kind
752                .ok_or_else(|| TryFromProtoError::missing_field("StateUpdateKind::kind"))?
753            {
754                proto::state_update_kind::Kind::AuditLog(proto::state_update_kind::AuditLog {
755                    key,
756                }) => StateUpdateKind::AuditLog(
757                    key.ok_or_else(|| {
758                        TryFromProtoError::missing_field("state_update_kind::AuditLog::key")
759                    })?,
760                    (),
761                ),
762                proto::state_update_kind::Kind::Cluster(proto::state_update_kind::Cluster {
763                    key,
764                    value,
765                }) => StateUpdateKind::Cluster(
766                    key.ok_or_else(|| {
767                        TryFromProtoError::missing_field("state_update_kind::Cluster::key")
768                    })?,
769                    value.ok_or_else(|| {
770                        TryFromProtoError::missing_field("state_update_kind::Cluster::value")
771                    })?,
772                ),
773                proto::state_update_kind::Kind::ClusterReplica(
774                    proto::state_update_kind::ClusterReplica { key, value },
775                ) => StateUpdateKind::ClusterReplica(
776                    key.ok_or_else(|| {
777                        TryFromProtoError::missing_field("state_update_kind::ClusterReplica::key")
778                    })?,
779                    value.ok_or_else(|| {
780                        TryFromProtoError::missing_field("state_update_kind::ClusterReplica::value")
781                    })?,
782                ),
783                proto::state_update_kind::Kind::Comment(proto::state_update_kind::Comment {
784                    key,
785                    value,
786                }) => StateUpdateKind::Comment(
787                    key.ok_or_else(|| {
788                        TryFromProtoError::missing_field("state_update_kind::Comment::key")
789                    })?,
790                    value.ok_or_else(|| {
791                        TryFromProtoError::missing_field("state_update_kind::Comment::value")
792                    })?,
793                ),
794                proto::state_update_kind::Kind::Config(proto::state_update_kind::Config {
795                    key,
796                    value,
797                }) => StateUpdateKind::Config(
798                    key.ok_or_else(|| {
799                        TryFromProtoError::missing_field("state_update_kind::Config::key")
800                    })?,
801                    value.ok_or_else(|| {
802                        TryFromProtoError::missing_field("state_update_kind::Config::value")
803                    })?,
804                ),
805                proto::state_update_kind::Kind::Database(proto::state_update_kind::Database {
806                    key,
807                    value,
808                }) => StateUpdateKind::Database(
809                    key.ok_or_else(|| {
810                        TryFromProtoError::missing_field("state_update_kind::Database::key")
811                    })?,
812                    value.ok_or_else(|| {
813                        TryFromProtoError::missing_field("state_update_kind::Database::value")
814                    })?,
815                ),
816                proto::state_update_kind::Kind::DefaultPrivileges(
817                    proto::state_update_kind::DefaultPrivileges { key, value },
818                ) => StateUpdateKind::DefaultPrivilege(
819                    key.ok_or_else(|| {
820                        TryFromProtoError::missing_field(
821                            "state_update_kind::DefaultPrivileges::key",
822                        )
823                    })?,
824                    value.ok_or_else(|| {
825                        TryFromProtoError::missing_field(
826                            "state_update_kind::DefaultPrivileges::value",
827                        )
828                    })?,
829                ),
830                proto::state_update_kind::Kind::FenceToken(
831                    proto::state_update_kind::FenceToken {
832                        deploy_generation,
833                        epoch,
834                    },
835                ) => StateUpdateKind::FenceToken(FenceToken {
836                    deploy_generation,
837                    epoch: Epoch::new(epoch).ok_or_else(|| {
838                        TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
839                    })?,
840                }),
841                proto::state_update_kind::Kind::IdAlloc(proto::state_update_kind::IdAlloc {
842                    key,
843                    value,
844                }) => StateUpdateKind::IdAllocator(
845                    key.ok_or_else(|| {
846                        TryFromProtoError::missing_field("state_update_kind::IdAlloc::key")
847                    })?,
848                    value.ok_or_else(|| {
849                        TryFromProtoError::missing_field("state_update_kind::IdAlloc::value")
850                    })?,
851                ),
852                proto::state_update_kind::Kind::ClusterIntrospectionSourceIndex(
853                    proto::state_update_kind::ClusterIntrospectionSourceIndex { key, value },
854                ) => StateUpdateKind::IntrospectionSourceIndex(
855                    key.ok_or_else(|| {
856                        TryFromProtoError::missing_field(
857                            "state_update_kind::ClusterIntrospectionSourceIndex::key",
858                        )
859                    })?,
860                    value.ok_or_else(|| {
861                        TryFromProtoError::missing_field(
862                            "state_update_kind::ClusterIntrospectionSourceIndex::value",
863                        )
864                    })?,
865                ),
866                proto::state_update_kind::Kind::Item(proto::state_update_kind::Item {
867                    key,
868                    value,
869                }) => StateUpdateKind::Item(
870                    key.ok_or_else(|| {
871                        TryFromProtoError::missing_field("state_update_kind::Item::key")
872                    })?,
873                    value.ok_or_else(|| {
874                        TryFromProtoError::missing_field("state_update_kind::Item::value")
875                    })?,
876                ),
877                proto::state_update_kind::Kind::Role(proto::state_update_kind::Role {
878                    key,
879                    value,
880                }) => StateUpdateKind::Role(
881                    key.ok_or_else(|| {
882                        TryFromProtoError::missing_field("state_update_kind::Role::key")
883                    })?,
884                    value.ok_or_else(|| {
885                        TryFromProtoError::missing_field("state_update_kind::Role::value")
886                    })?,
887                ),
888                proto::state_update_kind::Kind::RoleAuth(proto::state_update_kind::RoleAuth {
889                    key,
890                    value,
891                }) => StateUpdateKind::RoleAuth(
892                    key.ok_or_else(|| {
893                        TryFromProtoError::missing_field("state_update_kind::RoleAuth::key")
894                    })?,
895                    value.ok_or_else(|| {
896                        TryFromProtoError::missing_field("state_update_kind::RoleAuth::value")
897                    })?,
898                ),
899                proto::state_update_kind::Kind::Schema(proto::state_update_kind::Schema {
900                    key,
901                    value,
902                }) => StateUpdateKind::Schema(
903                    key.ok_or_else(|| {
904                        TryFromProtoError::missing_field("state_update_kind::Schema::key")
905                    })?,
906                    value.ok_or_else(|| {
907                        TryFromProtoError::missing_field("state_update_kind::Schema::value")
908                    })?,
909                ),
910                proto::state_update_kind::Kind::Setting(proto::state_update_kind::Setting {
911                    key,
912                    value,
913                }) => StateUpdateKind::Setting(
914                    key.ok_or_else(|| {
915                        TryFromProtoError::missing_field("state_update_kind::Setting::key")
916                    })?,
917                    value.ok_or_else(|| {
918                        TryFromProtoError::missing_field("state_update_kind::Setting::value")
919                    })?,
920                ),
921                proto::state_update_kind::Kind::ServerConfiguration(
922                    proto::state_update_kind::ServerConfiguration { key, value },
923                ) => StateUpdateKind::SystemConfiguration(
924                    key.ok_or_else(|| {
925                        TryFromProtoError::missing_field(
926                            "state_update_kind::ServerConfiguration::key",
927                        )
928                    })?,
929                    value.ok_or_else(|| {
930                        TryFromProtoError::missing_field(
931                            "state_update_kind::ServerConfiguration::value",
932                        )
933                    })?,
934                ),
935                proto::state_update_kind::Kind::GidMapping(
936                    proto::state_update_kind::GidMapping { key, value },
937                ) => StateUpdateKind::SystemObjectMapping(
938                    key.ok_or_else(|| {
939                        TryFromProtoError::missing_field("state_update_kind::GidMapping::key")
940                    })?,
941                    value.ok_or_else(|| {
942                        TryFromProtoError::missing_field("state_update_kind::GidMapping::value")
943                    })?,
944                ),
945                proto::state_update_kind::Kind::SystemPrivileges(
946                    proto::state_update_kind::SystemPrivileges { key, value },
947                ) => StateUpdateKind::SystemPrivilege(
948                    key.ok_or_else(|| {
949                        TryFromProtoError::missing_field("state_update_kind::SystemPrivileges::key")
950                    })?,
951                    value.ok_or_else(|| {
952                        TryFromProtoError::missing_field(
953                            "state_update_kind::SystemPrivileges::value",
954                        )
955                    })?,
956                ),
957                proto::state_update_kind::Kind::StorageCollectionMetadata(
958                    proto::state_update_kind::StorageCollectionMetadata { key, value },
959                ) => StateUpdateKind::StorageCollectionMetadata(
960                    key.ok_or_else(|| {
961                        TryFromProtoError::missing_field(
962                            "state_update_kind::StorageCollectionMetadata::key",
963                        )
964                    })?,
965                    value.ok_or_else(|| {
966                        TryFromProtoError::missing_field(
967                            "state_update_kind::StorageCollectionMetadata::value",
968                        )
969                    })?,
970                ),
971                proto::state_update_kind::Kind::UnfinalizedShard(
972                    proto::state_update_kind::UnfinalizedShard { key },
973                ) => StateUpdateKind::UnfinalizedShard(
974                    key.ok_or_else(|| {
975                        TryFromProtoError::missing_field(
976                            "state_update_kind::StorageCollectionMetadata::key",
977                        )
978                    })?,
979                    (),
980                ),
981                proto::state_update_kind::Kind::TxnWalShard(
982                    proto::state_update_kind::TxnWalShard { value },
983                ) => StateUpdateKind::TxnWalShard(
984                    (),
985                    value.ok_or_else(|| {
986                        TryFromProtoError::missing_field("state_update_kind::TxnWalShard::value")
987                    })?,
988                ),
989                proto::state_update_kind::Kind::SourceReferences(
990                    proto::state_update_kind::SourceReferences { key, value },
991                ) => StateUpdateKind::SourceReferences(
992                    key.ok_or_else(|| {
993                        TryFromProtoError::missing_field("state_update_kind::SourceReferences::key")
994                    })?,
995                    value.ok_or_else(|| {
996                        TryFromProtoError::missing_field(
997                            "state_update_kind::SourceReferences::value",
998                        )
999                    })?,
1000                ),
1001                proto::state_update_kind::Kind::NetworkPolicy(
1002                    proto::state_update_kind::NetworkPolicy { key, value },
1003                ) => StateUpdateKind::NetworkPolicy(
1004                    key.ok_or_else(|| {
1005                        TryFromProtoError::missing_field("state_update_kind::NetworkPolicy::key")
1006                    })?,
1007                    value.ok_or_else(|| {
1008                        TryFromProtoError::missing_field("state_update_kind::NetworkPolicy::value")
1009                    })?,
1010                ),
1011            },
1012        )
1013    }
1014}
1015
1016/// Decodes a [`StateUpdate<StateUpdateKindJson>`] from the `(key, value, ts,
1017/// diff)` tuple/update we store in persist.
1018impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
1019    fn from(kvtd: PersistStateUpdate) -> Self {
1020        let ((key, val), ts, diff) = kvtd;
1021        let (key, ()) = (
1022            key.expect("persist decoding error"),
1023            val.expect("persist decoding error"),
1024        );
1025        StateUpdate {
1026            kind: StateUpdateKindJson::from(key),
1027            ts,
1028            diff: diff.into(),
1029        }
1030    }
1031}
1032
1033impl From<StateUpdateKindJson> for SourceData {
1034    fn from(value: StateUpdateKindJson) -> SourceData {
1035        let row = value.0.into_row();
1036        SourceData(Ok(row))
1037    }
1038}
1039
1040impl From<SourceData> for StateUpdateKindJson {
1041    fn from(value: SourceData) -> Self {
1042        let row = value.0.expect("only Ok values stored in catalog shard");
1043        StateUpdateKindJson(Jsonb::from_row(row))
1044    }
1045}
1046
1047#[cfg(test)]
1048mod tests {
1049    use mz_persist_types::Codec;
1050    use mz_repr::{RelationDesc, ScalarType};
1051    use mz_storage_types::sources::SourceData;
1052    use proptest::prelude::*;
1053
1054    use crate::durable::Epoch;
1055    use crate::durable::objects::FenceToken;
1056    use crate::durable::objects::serialization::proto;
1057    use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};
1058
1059    #[mz_ore::test]
1060    #[cfg_attr(miri, ignore)]
1061    fn kind_test() {
1062        let test_cases = [
1063            (
1064                StateUpdateKind::FenceToken(FenceToken {
1065                    deploy_generation: 1,
1066                    epoch: Epoch::new(1).expect("non-zero"),
1067                }),
1068                "FenceToken",
1069            ),
1070            (
1071                StateUpdateKind::Config(
1072                    proto::ConfigKey { key: String::new() },
1073                    proto::ConfigValue { value: 1 },
1074                ),
1075                "Config",
1076            ),
1077            (
1078                StateUpdateKind::Setting(
1079                    proto::SettingKey {
1080                        name: String::new(),
1081                    },
1082                    proto::SettingValue {
1083                        value: String::new(),
1084                    },
1085                ),
1086                "Setting",
1087            ),
1088            (
1089                StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()),
1090                "AuditLog",
1091            ),
1092        ];
1093
1094        for (kind, expected) in test_cases {
1095            let json_kind: StateUpdateKindJson = kind.into();
1096            let kind = json_kind.kind().to_string();
1097            assert_eq!(expected, kind);
1098        }
1099    }
1100
1101    #[mz_ore::test]
1102    #[cfg_attr(miri, ignore)]
1103    fn audit_log_id_test() {
1104        let test_cases = [
1105            (
1106                StateUpdateKind::AuditLog(
1107                    proto::AuditLogKey {
1108                        event: Some(proto::audit_log_key::Event::V1(proto::AuditLogEventV1 {
1109                            id: 1,
1110                            event_type: 2,
1111                            object_type: 3,
1112                            user: None,
1113                            occurred_at: None,
1114                            details: None,
1115                        })),
1116                    },
1117                    (),
1118                ),
1119                1,
1120            ),
1121            (
1122                StateUpdateKind::AuditLog(
1123                    proto::AuditLogKey {
1124                        event: Some(proto::audit_log_key::Event::V1(proto::AuditLogEventV1 {
1125                            id: 4,
1126                            event_type: 5,
1127                            object_type: 6,
1128                            user: None,
1129                            occurred_at: None,
1130                            details: None,
1131                        })),
1132                    },
1133                    (),
1134                ),
1135                4,
1136            ),
1137        ];
1138
1139        for (kind, expected) in test_cases {
1140            let json_kind: StateUpdateKindJson = kind.into();
1141            let id = json_kind.audit_log_id();
1142            assert_eq!(expected, id);
1143        }
1144    }
1145
1146    proptest! {
1147        #[mz_ore::test]
1148        #[cfg_attr(miri, ignore)] // slow
1149        fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
1150            // Verify that we can map encode into the "raw" json format. This
1151            // validates things like contained integers fitting in f64.
1152            let raw = StateUpdateKindJson::from(kind.clone());
1153            let desc = RelationDesc::builder().with_column("a", ScalarType::Jsonb.nullable(false)).finish();
1154
1155            // Verify that the raw roundtrips through the SourceData Codec impl.
1156            let source_data = SourceData::from(raw.clone());
1157            let mut encoded = Vec::new();
1158            source_data.encode(&mut encoded);
1159            let decoded = SourceData::decode(&encoded, &desc).expect("should be valid SourceData");
1160            prop_assert_eq!(&source_data, &decoded);
1161            let decoded = StateUpdateKindJson::from(decoded);
1162            prop_assert_eq!(&raw, &decoded);
1163
1164            // Verify that the enum roundtrips.
1165            let decoded = StateUpdateKind::try_from(decoded).expect("should be valid StateUpdateKind");
1166            prop_assert_eq!(&kind, &decoded);
1167        }
1168    }
1169}