Skip to main content

mz_catalog/durable/objects/
state_update.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This module contains various representations of a single catalog update and the logic necessary
11//! for converting between representations.
12//!
13//! The general lifecycle of a single update when read from persist is as follows:
14//!
15//!   1. The update is stored in persist as a [`PersistStateUpdate`].
16//!   2. After being read from persist the update is immediately converted into a
17//!      [`StateUpdate<StateUpdateKindJson>`], which models the update as a JSON.
18//!   3. The [`StateUpdateKindJson`] is converted into a protobuf message,
19//!      [`proto::StateUpdateKind`].
20//!   4. The update is then converted into a [`StateUpdate<StateUpdateKind>`], which is a strongly
21//!      typed Rust object.
22//!   5. Finally, the update is converted into an [`Option<memory::objects::StateUpdate>`], and
23//!      `Some` variants are given to the in-memory catalog. The in-memory catalog is only
24//!      interested in a subset of catalog updates which is why the [`Option`] is necessary.
25//!
26//! TLDR: [`PersistStateUpdate`] -> [`StateUpdate<StateUpdateKindJson>`] ->
27//!       [`proto::StateUpdateKind`] -> [`StateUpdate<StateUpdateKind>`] ->
28//!       [`Option<memory::objects::StateUpdate>`]
29//!
30//! The process of writing a catalog update to persist is the exact opposite.
31//!
32//! When running catalog protobuf upgrades/migrations we may need to take a detour and convert the
33//! [`StateUpdateKindJson`] to some `proto::object_v{x}::StateUpdateKind` before applying specific
34//! upgrades to get us to a valid [`proto::StateUpdateKind`].
35
36use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46#[cfg(test)]
47use proptest_derive::Arbitrary;
48use tracing::error;
49
50use crate::durable::debug::CollectionType;
51use crate::durable::objects::serialization::proto;
52use crate::durable::objects::{DurableType, FenceToken};
53use crate::durable::persist::Timestamp;
54use crate::durable::transaction::TransactionBatch;
55use crate::durable::{DurableCatalogError, Epoch};
56use crate::memory;
57
/// Trait for objects that can be converted to/from a [`StateUpdateKindJson`].
///
/// Converting *into* a [`StateUpdateKindJson`] is infallible (via the [`Into`] supertrait), while
/// converting *from* one may fail with [`IntoStateUpdateKindJson::Error`].
pub trait IntoStateUpdateKindJson:
    Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
{
    /// The error produced when a [`StateUpdateKindJson`] cannot be converted into `Self`.
    type Error: Debug;

    /// Attempts to convert the raw JSON representation `raw` into `Self`.
    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
}
66impl<
67    T: Into<StateUpdateKindJson>
68        + TryFrom<StateUpdateKindJson>
69        + PartialEq
70        + Eq
71        + PartialOrd
72        + Ord
73        + Debug
74        + Clone,
75> IntoStateUpdateKindJson for T
76where
77    T::Error: Debug,
78{
79    type Error = T::Error;
80
81    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
82        <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
83    }
84}
85
/// Trait for objects that can be converted to/from a [`StateUpdateKind`].
///
/// Every implementor is also an [`IntoStateUpdateKindJson`]; this trait adds a fallible
/// conversion into the strongly typed [`StateUpdateKind`].
pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
    /// The error produced when `Self` cannot be converted into a [`StateUpdateKind`].
    type Error: Debug;

    /// Attempts to convert `self` into a [`StateUpdateKind`].
    fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
}
92impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
93where
94    <T as TryInto<StateUpdateKind>>::Error: Debug,
95{
96    type Error = <T as TryInto<StateUpdateKind>>::Error;
97
98    fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
99        <T as TryInto<StateUpdateKind>>::try_into(self)
100    }
101}
102
/// A single update to the catalog state.
///
/// The type parameter `T` selects the representation of the update's contents; it defaults to
/// the strongly typed [`StateUpdateKind`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
    /// The kind and contents of the state update.
    pub kind: T,
    /// The timestamp at which the update occurred.
    pub ts: Timestamp,
    /// Record count difference for the update.
    pub diff: Diff,
}
113
114impl StateUpdate {
115    /// Convert a [`TransactionBatch`] to a list of [`StateUpdate`]s at timestamp `ts`.
116    pub(crate) fn from_txn_batch_ts(
117        txn_batch: TransactionBatch,
118        ts: Timestamp,
119    ) -> impl Iterator<Item = StateUpdate> {
120        Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
121    }
122
123    /// Convert a [`TransactionBatch`] to a list of [`StateUpdate`]s and [`Diff`]s.
124    pub(crate) fn from_txn_batch(
125        txn_batch: TransactionBatch,
126    ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
127        fn from_batch<K, V>(
128            batch: Vec<(K, V, Diff)>,
129            kind: fn(K, V) -> StateUpdateKind,
130        ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
131            batch
132                .into_iter()
133                .map(move |(k, v, diff)| (kind(k, v), diff))
134        }
135        let TransactionBatch {
136            databases,
137            schemas,
138            items,
139            comments,
140            roles,
141            role_auth,
142            clusters,
143            cluster_replicas,
144            network_policies,
145            introspection_sources,
146            id_allocator,
147            configs,
148            settings,
149            source_references,
150            system_gid_mapping,
151            system_configurations,
152            default_privileges,
153            system_privileges,
154            storage_collection_metadata,
155            unfinalized_shards,
156            txn_wal_shard,
157            audit_log_updates,
158            upper: _,
159        } = txn_batch;
160        let databases = from_batch(databases, StateUpdateKind::Database);
161        let schemas = from_batch(schemas, StateUpdateKind::Schema);
162        let items = from_batch(items, StateUpdateKind::Item);
163        let comments = from_batch(comments, StateUpdateKind::Comment);
164        let roles = from_batch(roles, StateUpdateKind::Role);
165        let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
166        let clusters = from_batch(clusters, StateUpdateKind::Cluster);
167        let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
168        let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
169        let introspection_sources = from_batch(
170            introspection_sources,
171            StateUpdateKind::IntrospectionSourceIndex,
172        );
173        let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
174        let configs = from_batch(configs, StateUpdateKind::Config);
175        let settings = from_batch(settings, StateUpdateKind::Setting);
176        let system_object_mappings =
177            from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
178        let system_configurations =
179            from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
180        let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
181        let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
182        let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
183        let storage_collection_metadata = from_batch(
184            storage_collection_metadata,
185            StateUpdateKind::StorageCollectionMetadata,
186        );
187        let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
188        let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
189        let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
190
191        databases
192            .chain(schemas)
193            .chain(items)
194            .chain(comments)
195            .chain(roles)
196            .chain(role_auth)
197            .chain(clusters)
198            .chain(cluster_replicas)
199            .chain(network_policies)
200            .chain(introspection_sources)
201            .chain(id_allocators)
202            .chain(configs)
203            .chain(settings)
204            .chain(source_references)
205            .chain(system_object_mappings)
206            .chain(system_configurations)
207            .chain(default_privileges)
208            .chain(system_privileges)
209            .chain(storage_collection_metadata)
210            .chain(unfinalized_shards)
211            .chain(txn_wal_shard)
212            .chain(audit_logs)
213    }
214}
215
/// The contents of a single state update.
///
/// The entire catalog is serialized as bytes and saved in a single persist shard. We use this
/// enum to determine what collection something in the catalog belongs to.
///
/// Most variants carry a `(key, value)` pair of protobuf types. A `()` stands in where a variant
/// has only a key (e.g. `AuditLog`, `UnfinalizedShard`) or only a value (`TxnWalShard`).
/// `FenceToken` is the one variant that carries a single non-protobuf payload.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(test, derive(Arbitrary))]
pub enum StateUpdateKind {
    AuditLog(proto::AuditLogKey, ()),
    Cluster(proto::ClusterKey, proto::ClusterValue),
    ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
    Comment(proto::CommentKey, proto::CommentValue),
    Config(proto::ConfigKey, proto::ConfigValue),
    Database(proto::DatabaseKey, proto::DatabaseValue),
    DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
    FenceToken(FenceToken),
    IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
    IntrospectionSourceIndex(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
    ),
    Item(proto::ItemKey, proto::ItemValue),
    NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
    Role(proto::RoleKey, proto::RoleValue),
    RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
    Schema(proto::SchemaKey, proto::SchemaValue),
    Setting(proto::SettingKey, proto::SettingValue),
    SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
    SystemConfiguration(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
    ),
    SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
    SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
    StorageCollectionMetadata(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
    ),
    UnfinalizedShard(proto::UnfinalizedShardKey, ()),
    TxnWalShard((), proto::TxnWalShardValue),
}
256
257impl StateUpdateKind {
258    pub(crate) fn collection_type(&self) -> Option<CollectionType> {
259        match self {
260            StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
261            StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
262            StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
263            StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
264            StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
265            StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
266            StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
267            StateUpdateKind::FenceToken(_) => None,
268            StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
269            StateUpdateKind::IntrospectionSourceIndex(_, _) => {
270                Some(CollectionType::ComputeIntrospectionSourceIndex)
271            }
272            StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
273            StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
274            StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
275            StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
276            StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
277            StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
278            StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
279            StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
280            StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
281            StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
282            StateUpdateKind::StorageCollectionMetadata(_, _) => {
283                Some(CollectionType::StorageCollectionMetadata)
284            }
285            StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
286            StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
287        }
288    }
289}
290
/// Version of [`StateUpdateKind`] to allow reading/writing raw json from/to persist.
///
/// This is a thin wrapper around a [`Jsonb`] value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdateKindJson(Jsonb);
294
295impl StateUpdateKindJson {
296    pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
297        let serde_value = serde_json::to_value(s).expect("valid json");
298        let row = Jsonb::from_serde_json(serde_value).expect("valid json");
299        StateUpdateKindJson(row)
300    }
301
302    pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
303        self.try_to_serde().expect("jsonb should roundtrip")
304    }
305
306    pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
307        &self,
308    ) -> Result<D, serde_json::error::Error> {
309        let serde_value = self.0.as_ref().to_serde_json();
310        serde_json::from_value::<D>(serde_value)
311    }
312
313    fn kind(&self) -> &str {
314        let row = self.0.row();
315        let mut iter = row.unpack_first().unwrap_map().iter();
316        let datum = iter
317            .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
318            .expect("kind field must exist");
319        datum.unwrap_str()
320    }
321
322    pub(crate) fn audit_log_id(&self) -> u64 {
323        assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
324        let row = self.0.row();
325        let mut iter = row.unpack_first().unwrap_map().iter();
326        let key = iter
327            .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
328            .expect("key field must exist")
329            .unwrap_map();
330        let event = key
331            .iter()
332            .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
333            .expect("event field must exist")
334            .unwrap_map();
335        let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
336        match event_version {
337            "V1" => {
338                let versioned_map = versioned_datum.unwrap_map();
339                let id = versioned_map
340                    .iter()
341                    .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
342                    .expect("event field must exist")
343                    .unwrap_numeric();
344                let mut cx = Numeric::context();
345                cx.try_into_u64(id.into_inner()).expect("invalid id")
346            }
347            version => unimplemented!("unsupported event version: {version}"),
348        }
349    }
350
351    /// Returns true if this is an update kind that is always deserializable, even before migrations. Otherwise, returns false.
352    pub(crate) fn is_always_deserializable(&self) -> bool {
353        // Construct some fake update kinds so we can extract exactly what the kind field will
354        // serialize as.
355        static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
356            [
357                StateUpdateKind::FenceToken(FenceToken {
358                    deploy_generation: 1,
359                    epoch: Epoch::new(1).expect("non-zero"),
360                }),
361                StateUpdateKind::Config(
362                    proto::ConfigKey { key: String::new() },
363                    proto::ConfigValue { value: 1 },
364                ),
365                StateUpdateKind::Setting(
366                    proto::SettingKey {
367                        name: String::new(),
368                    },
369                    proto::SettingValue {
370                        value: String::new(),
371                    },
372                ),
373                StateUpdateKind::AuditLog(
374                    proto::AuditLogKey {
375                        event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
376                            id: 1,
377                            event_type: proto::audit_log_event_v1::EventType::Create,
378                            object_type: proto::audit_log_event_v1::ObjectType::Cluster,
379                            user: None,
380                            occurred_at: proto::EpochMillis { millis: 1 },
381                            details: proto::audit_log_event_v1::Details::ResetAllV1(
382                                proto::Empty {},
383                            ),
384                        }),
385                    },
386                    (),
387                ),
388            ]
389            .into_iter()
390            .map(|kind| {
391                let json_kind: StateUpdateKindJson = kind.into();
392                json_kind.kind().to_string()
393            })
394            .collect()
395        });
396        DESERIALIZABLE_KINDS.contains(self.kind())
397    }
398
399    /// Returns true if this is an audit log update. Otherwise, returns false.
400    pub(crate) fn is_audit_log(&self) -> bool {
401        // Construct a fake audit log so we can extract exactly what the kind field will serialize
402        // as.
403        static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
404            let audit_log = StateUpdateKind::AuditLog(
405                proto::AuditLogKey {
406                    event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
407                        id: 1,
408                        event_type: proto::audit_log_event_v1::EventType::Create,
409                        object_type: proto::audit_log_event_v1::ObjectType::Cluster,
410                        user: None,
411                        occurred_at: proto::EpochMillis { millis: 1 },
412                        details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
413                    }),
414                },
415                (),
416            );
417            let json_kind: StateUpdateKindJson = audit_log.into();
418            json_kind.kind().to_string()
419        });
420        &*AUDIT_LOG_KIND == self.kind()
421    }
422}
423
/// Version of [`StateUpdate`] that is stored directly in persist.
///
/// The tuple is `((data, ()), timestamp, diff)`.
type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);
426
427impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
428    type Error = DurableCatalogError;
429
430    fn try_from(
431        StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
432    ) -> Result<Self, Self::Error> {
433        let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
434        let update = kind.map(|kind| memory::objects::StateUpdate {
435            kind,
436            ts: ts.clone(),
437            diff: diff.clone().try_into().expect("invalid diff"),
438        });
439        Ok(update)
440    }
441}
442
443impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
444    type Error = DurableCatalogError;
445
446    fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
447        fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
448        where
449            PK: ProtoType<T::Key> + Clone,
450            PV: ProtoType<T::Value> + Clone,
451            T: DurableType,
452        {
453            let key = key.clone().into_rust()?;
454            let value = value.clone().into_rust()?;
455            Ok(T::from_key_value(key, value))
456        }
457
458        Ok(match kind {
459            StateUpdateKind::AuditLog(key, value) => {
460                let audit_log = into_durable(key, value)?;
461                Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
462            }
463            StateUpdateKind::Cluster(key, value) => {
464                let cluster = into_durable(key, value)?;
465                Some(memory::objects::StateUpdateKind::Cluster(cluster))
466            }
467            StateUpdateKind::ClusterReplica(key, value) => {
468                let cluster_replica = into_durable(key, value)?;
469                Some(memory::objects::StateUpdateKind::ClusterReplica(
470                    cluster_replica,
471                ))
472            }
473            StateUpdateKind::Comment(key, value) => {
474                let comment = into_durable(key, value)?;
475                Some(memory::objects::StateUpdateKind::Comment(comment))
476            }
477            StateUpdateKind::Database(key, value) => {
478                let database = into_durable(key, value)?;
479                Some(memory::objects::StateUpdateKind::Database(database))
480            }
481            StateUpdateKind::DefaultPrivilege(key, value) => {
482                let default_privilege = into_durable(key, value)?;
483                Some(memory::objects::StateUpdateKind::DefaultPrivilege(
484                    default_privilege,
485                ))
486            }
487            StateUpdateKind::Item(key, value) => {
488                let item = into_durable(key, value)?;
489                Some(memory::objects::StateUpdateKind::Item(item))
490            }
491            StateUpdateKind::IntrospectionSourceIndex(key, value) => {
492                let introspection_source_index = into_durable(key, value)?;
493                Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
494                    introspection_source_index,
495                ))
496            }
497            StateUpdateKind::NetworkPolicy(key, value) => {
498                let policy = into_durable(key, value)?;
499                Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
500            }
501            StateUpdateKind::Role(key, value) => {
502                let role = into_durable(key, value)?;
503                Some(memory::objects::StateUpdateKind::Role(role))
504            }
505            StateUpdateKind::RoleAuth(key, value) => {
506                let role_auth = into_durable(key, value)?;
507                Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
508            }
509            StateUpdateKind::Schema(key, value) => {
510                let schema = into_durable(key, value)?;
511                Some(memory::objects::StateUpdateKind::Schema(schema))
512            }
513            StateUpdateKind::SourceReferences(key, value) => {
514                let source_references = into_durable(key, value)?;
515                Some(memory::objects::StateUpdateKind::SourceReferences(
516                    source_references,
517                ))
518            }
519            StateUpdateKind::StorageCollectionMetadata(key, value) => {
520                let storage_collection_metadata = into_durable(key, value)?;
521                Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
522                    storage_collection_metadata,
523                ))
524            }
525            StateUpdateKind::SystemConfiguration(key, value) => {
526                let system_configuration = into_durable(key, value)?;
527                Some(memory::objects::StateUpdateKind::SystemConfiguration(
528                    system_configuration,
529                ))
530            }
531            StateUpdateKind::SystemObjectMapping(key, value) => {
532                let system_object_mapping = into_durable(key, value)?;
533                Some(memory::objects::StateUpdateKind::SystemObjectMapping(
534                    system_object_mapping,
535                ))
536            }
537            StateUpdateKind::SystemPrivilege(key, value) => {
538                let system_privilege = into_durable(key, value)?;
539                Some(memory::objects::StateUpdateKind::SystemPrivilege(
540                    system_privilege,
541                ))
542            }
543            StateUpdateKind::UnfinalizedShard(key, value) => {
544                let unfinalized_shard = into_durable(key, value)?;
545                Some(memory::objects::StateUpdateKind::UnfinalizedShard(
546                    unfinalized_shard,
547                ))
548            }
549            // Not exposed to higher layers.
550            StateUpdateKind::Config(_, _)
551            | StateUpdateKind::FenceToken(_)
552            | StateUpdateKind::IdAllocator(_, _)
553            | StateUpdateKind::Setting(_, _)
554            | StateUpdateKind::TxnWalShard(_, _) => None,
555        })
556    }
557}
558
559impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
560    type Error = String;
561
562    fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
563        Ok(StateUpdate {
564            kind: TryInto::try_into(update.kind)?,
565            ts: update.ts,
566            diff: update.diff,
567        })
568    }
569}
570
571impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
572    type Error = String;
573
574    fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
575        let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
576        StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
577    }
578}
579
580impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
581    type Error = String;
582
583    fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
584        let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
585        StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
586    }
587}
588
589impl From<StateUpdateKind> for StateUpdateKindJson {
590    fn from(value: StateUpdateKind) -> Self {
591        let kind = value.into_proto_owned();
592        StateUpdateKindJson::from_serde(kind)
593    }
594}
595
596// Be very careful about changing these implementations. The default impl of `into_proto_owned`
597// calls `into_proto`, and this impl of `into_proto` calls `into_proto_owned`. It would be very
598// easy to accidentally cause infinite recursion.
599impl RustType<proto::StateUpdateKind> for StateUpdateKind {
600    fn into_proto(&self) -> proto::StateUpdateKind {
601        error!("unexpected clone of catalog data");
602        self.clone().into_proto_owned()
603    }
604
605    fn into_proto_owned(self) -> proto::StateUpdateKind {
606        match self {
607            StateUpdateKind::AuditLog(key, ()) => {
608                proto::StateUpdateKind::AuditLog(proto::AuditLog { key })
609            }
610            StateUpdateKind::Cluster(key, value) => {
611                proto::StateUpdateKind::Cluster(proto::Cluster { key, value })
612            }
613            StateUpdateKind::ClusterReplica(key, value) => {
614                proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value })
615            }
616            StateUpdateKind::Comment(key, value) => {
617                proto::StateUpdateKind::Comment(proto::Comment { key, value })
618            }
619            StateUpdateKind::Config(key, value) => {
620                proto::StateUpdateKind::Config(proto::Config { key, value })
621            }
622            StateUpdateKind::Database(key, value) => {
623                proto::StateUpdateKind::Database(proto::Database { key, value })
624            }
625            StateUpdateKind::DefaultPrivilege(key, value) => {
626                proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value })
627            }
628            StateUpdateKind::FenceToken(fence_token) => {
629                proto::StateUpdateKind::FenceToken(proto::FenceToken {
630                    deploy_generation: fence_token.deploy_generation,
631                    epoch: fence_token.epoch.get(),
632                })
633            }
634            StateUpdateKind::IdAllocator(key, value) => {
635                proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value })
636            }
637            StateUpdateKind::IntrospectionSourceIndex(key, value) => {
638                proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
639                    proto::ClusterIntrospectionSourceIndex { key, value },
640                )
641            }
642            StateUpdateKind::Item(key, value) => {
643                proto::StateUpdateKind::Item(proto::Item { key, value })
644            }
645            StateUpdateKind::NetworkPolicy(key, value) => {
646                proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value })
647            }
648            StateUpdateKind::Role(key, value) => {
649                proto::StateUpdateKind::Role(proto::Role { key, value })
650            }
651            StateUpdateKind::RoleAuth(key, value) => {
652                proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value })
653            }
654            StateUpdateKind::Schema(key, value) => {
655                proto::StateUpdateKind::Schema(proto::Schema { key, value })
656            }
657            StateUpdateKind::Setting(key, value) => {
658                proto::StateUpdateKind::Setting(proto::Setting { key, value })
659            }
660            StateUpdateKind::SourceReferences(key, value) => {
661                proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value })
662            }
663            StateUpdateKind::SystemConfiguration(key, value) => {
664                proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
665                    key,
666                    value,
667                })
668            }
669            StateUpdateKind::SystemObjectMapping(key, value) => {
670                proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value })
671            }
672            StateUpdateKind::SystemPrivilege(key, value) => {
673                proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value })
674            }
675            StateUpdateKind::StorageCollectionMetadata(key, value) => {
676                proto::StateUpdateKind::StorageCollectionMetadata(
677                    proto::StorageCollectionMetadata { key, value },
678                )
679            }
680            StateUpdateKind::UnfinalizedShard(key, ()) => {
681                proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key })
682            }
683            StateUpdateKind::TxnWalShard((), value) => {
684                proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value })
685            }
686        }
687    }
688
689    fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
690        Ok(match proto {
691            proto::StateUpdateKind::AuditLog(proto::AuditLog { key }) => {
692                StateUpdateKind::AuditLog(key, ())
693            }
694            proto::StateUpdateKind::Cluster(proto::Cluster { key, value }) => {
695                StateUpdateKind::Cluster(key, value)
696            }
697            proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value }) => {
698                StateUpdateKind::ClusterReplica(key, value)
699            }
700            proto::StateUpdateKind::Comment(proto::Comment { key, value }) => {
701                StateUpdateKind::Comment(key, value)
702            }
703            proto::StateUpdateKind::Config(proto::Config { key, value }) => {
704                StateUpdateKind::Config(key, value)
705            }
706            proto::StateUpdateKind::Database(proto::Database { key, value }) => {
707                StateUpdateKind::Database(key, value)
708            }
709            proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value }) => {
710                StateUpdateKind::DefaultPrivilege(key, value)
711            }
712            proto::StateUpdateKind::FenceToken(proto::FenceToken {
713                deploy_generation,
714                epoch,
715            }) => StateUpdateKind::FenceToken(FenceToken {
716                deploy_generation,
717                epoch: Epoch::new(epoch).ok_or_else(|| {
718                    TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
719                })?,
720            }),
721            proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value }) => {
722                StateUpdateKind::IdAllocator(key, value)
723            }
724            proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
725                proto::ClusterIntrospectionSourceIndex { key, value },
726            ) => StateUpdateKind::IntrospectionSourceIndex(key, value),
727            proto::StateUpdateKind::Item(proto::Item { key, value }) => {
728                StateUpdateKind::Item(key, value)
729            }
730            proto::StateUpdateKind::Role(proto::Role { key, value }) => {
731                StateUpdateKind::Role(key, value)
732            }
733            proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value }) => {
734                StateUpdateKind::RoleAuth(key, value)
735            }
736            proto::StateUpdateKind::Schema(proto::Schema { key, value }) => {
737                StateUpdateKind::Schema(key, value)
738            }
739            proto::StateUpdateKind::Setting(proto::Setting { key, value }) => {
740                StateUpdateKind::Setting(key, value)
741            }
742            proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
743                key,
744                value,
745            }) => StateUpdateKind::SystemConfiguration(key, value),
746            proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value }) => {
747                StateUpdateKind::SystemObjectMapping(key, value)
748            }
749            proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value }) => {
750                StateUpdateKind::SystemPrivilege(key, value)
751            }
752            proto::StateUpdateKind::StorageCollectionMetadata(
753                proto::StorageCollectionMetadata { key, value },
754            ) => StateUpdateKind::StorageCollectionMetadata(key, value),
755            proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key }) => {
756                StateUpdateKind::UnfinalizedShard(key, ())
757            }
758            proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value }) => {
759                StateUpdateKind::TxnWalShard((), value)
760            }
761            proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value }) => {
762                StateUpdateKind::SourceReferences(key, value)
763            }
764            proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value }) => {
765                StateUpdateKind::NetworkPolicy(key, value)
766            }
767        })
768    }
769}
770
771/// Decodes a [`StateUpdate<StateUpdateKindJson>`] from the `(key, value, ts,
772/// diff)` tuple/update we store in persist.
773impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
774    fn from(kvtd: PersistStateUpdate) -> Self {
775        let ((key, ()), ts, diff) = kvtd;
776        StateUpdate {
777            kind: StateUpdateKindJson::from(key),
778            ts,
779            diff: diff.into(),
780        }
781    }
782}
783
784impl From<StateUpdateKindJson> for SourceData {
785    fn from(value: StateUpdateKindJson) -> SourceData {
786        let row = value.0.into_row();
787        SourceData(Ok(row))
788    }
789}
790
791impl From<SourceData> for StateUpdateKindJson {
792    fn from(value: SourceData) -> Self {
793        let row = value.0.expect("only Ok values stored in catalog shard");
794        StateUpdateKindJson(Jsonb::from_row(row))
795    }
796}
797
#[cfg(test)]
mod tests {
    use mz_persist_types::Codec;
    use mz_repr::{RelationDesc, SqlScalarType};
    use mz_storage_types::sources::SourceData;
    use proptest::prelude::*;

    use crate::durable::Epoch;
    use crate::durable::objects::FenceToken;
    use crate::durable::objects::serialization::proto;
    use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};

    /// The JSON form of an update must report the name of the variant it was built from.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn kind_test() {
        let fence_token = StateUpdateKind::FenceToken(FenceToken {
            deploy_generation: 1,
            epoch: Epoch::new(1).expect("non-zero"),
        });
        let config = StateUpdateKind::Config(
            proto::ConfigKey { key: String::new() },
            proto::ConfigValue { value: 1 },
        );
        let setting = StateUpdateKind::Setting(
            proto::SettingKey {
                name: String::new(),
            },
            proto::SettingValue {
                value: String::new(),
            },
        );
        let audit_log = StateUpdateKind::AuditLog(
            proto::AuditLogKey {
                event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
                    id: 1,
                    event_type: proto::audit_log_event_v1::EventType::Create,
                    object_type: proto::audit_log_event_v1::ObjectType::Cluster,
                    user: None,
                    occurred_at: proto::EpochMillis { millis: 4 },
                    details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
                }),
            },
            (),
        );

        let test_cases = [
            (fence_token, "FenceToken"),
            (config, "Config"),
            (setting, "Setting"),
            (audit_log, "AuditLog"),
        ];

        for (update, expected) in test_cases {
            let json = StateUpdateKindJson::from(update);
            let actual = json.kind().to_string();
            assert_eq!(expected, actual);
        }
    }

    /// The audit log ID must be recoverable from the JSON form of an audit log update.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn audit_log_id_test() {
        use proto::audit_log_event_v1::{EventType, ObjectType};

        // Builds an audit log update whose event carries the given fields; everything else is
        // the same for every test case.
        let audit_log = |id, event_type, object_type, millis| {
            StateUpdateKind::AuditLog(
                proto::AuditLogKey {
                    event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
                        id,
                        event_type,
                        object_type,
                        user: None,
                        occurred_at: proto::EpochMillis { millis },
                        details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
                    }),
                },
                (),
            )
        };

        let test_cases = [
            (audit_log(1, EventType::Create, ObjectType::Cluster, 4), 1),
            (audit_log(4, EventType::Drop, ObjectType::Database, 7), 4),
        ];

        for (update, expected) in test_cases {
            let json = StateUpdateKindJson::from(update);
            let actual = json.audit_log_id();
            assert_eq!(expected, actual);
        }
    }

    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // slow
        fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
            // Encoding into the "raw" JSON format must work. This validates things like
            // contained integers fitting in f64.
            let json = StateUpdateKindJson::from(kind.clone());

            // The raw JSON must roundtrip through the `SourceData` `Codec` impl.
            let desc = RelationDesc::builder()
                .with_column("a", SqlScalarType::Jsonb.nullable(false))
                .finish();
            let source_data = SourceData::from(json.clone());
            let mut buf = Vec::new();
            source_data.encode(&mut buf);
            let source_data_back =
                SourceData::decode(&buf, &desc).expect("should be valid SourceData");
            prop_assert_eq!(&source_data, &source_data_back);
            let json_back = StateUpdateKindJson::from(source_data_back);
            prop_assert_eq!(&json, &json_back);

            // The strongly typed enum must roundtrip as well.
            let kind_back =
                StateUpdateKind::try_from(json_back).expect("should be valid StateUpdateKind");
            prop_assert_eq!(&kind, &kind_back);
        }
    }
}