1use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46#[cfg(test)]
47use proptest_derive::Arbitrary;
48use tracing::error;
49
50use crate::durable::debug::CollectionType;
51use crate::durable::objects::serialization::proto;
52use crate::durable::objects::{DurableType, FenceToken};
53use crate::durable::persist::Timestamp;
54use crate::durable::transaction::TransactionBatch;
55use crate::durable::{DurableCatalogError, Epoch};
56use crate::memory;
57
/// Conversion contract for types that can stand in for a [`StateUpdateKindJson`].
///
/// Implementors convert infallibly *into* a [`StateUpdateKindJson`] (through the `Into`
/// supertrait) and fallibly back *from* one (through
/// [`IntoStateUpdateKindJson::try_from`]).
pub trait IntoStateUpdateKindJson:
    Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
{
    /// Error produced when a [`StateUpdateKindJson`] cannot be converted into `Self`.
    type Error: Debug;

    /// Attempts to build `Self` from its raw JSON representation.
    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
}
66impl<
67 T: Into<StateUpdateKindJson>
68 + TryFrom<StateUpdateKindJson>
69 + PartialEq
70 + Eq
71 + PartialOrd
72 + Ord
73 + Debug
74 + Clone,
75> IntoStateUpdateKindJson for T
76where
77 T::Error: Debug,
78{
79 type Error = T::Error;
80
81 fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
82 <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
83 }
84}
85
/// Conversion contract for types that can be fallibly converted into a
/// [`StateUpdateKind`].
pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
    /// Error produced when the conversion into a [`StateUpdateKind`] fails.
    type Error: Debug;

    /// Attempts to convert `self` into a [`StateUpdateKind`].
    fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
}
92impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
93where
94 <T as TryInto<StateUpdateKind>>::Error: Debug,
95{
96 type Error = <T as TryInto<StateUpdateKind>>::Error;
97
98 fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
99 <T as TryInto<StateUpdateKind>>::try_into(self)
100 }
101}
102
/// A single catalog state update: a `kind` together with a timestamp and a diff.
///
/// The type parameter selects the representation of the kind and defaults to the
/// decoded [`StateUpdateKind`].
///
/// Note: `PartialOrd`/`Ord` are derived, so ordering compares `kind`, then `ts`, then
/// `diff`; reordering the fields would change that ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
    /// What was updated.
    pub kind: T,
    /// Timestamp attached to the update.
    pub ts: Timestamp,
    /// Diff attached to the update (presumably a change in multiplicity such as
    /// +1/-1 — TODO confirm against `mz_repr::Diff`).
    pub diff: Diff,
}
113
114impl StateUpdate {
115 pub(crate) fn from_txn_batch_ts(
117 txn_batch: TransactionBatch,
118 ts: Timestamp,
119 ) -> impl Iterator<Item = StateUpdate> {
120 Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
121 }
122
123 pub(crate) fn from_txn_batch(
125 txn_batch: TransactionBatch,
126 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
127 fn from_batch<K, V>(
128 batch: Vec<(K, V, Diff)>,
129 kind: fn(K, V) -> StateUpdateKind,
130 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
131 batch
132 .into_iter()
133 .map(move |(k, v, diff)| (kind(k, v), diff))
134 }
135 let TransactionBatch {
136 databases,
137 schemas,
138 items,
139 comments,
140 roles,
141 role_auth,
142 clusters,
143 cluster_replicas,
144 network_policies,
145 introspection_sources,
146 id_allocator,
147 configs,
148 settings,
149 source_references,
150 system_gid_mapping,
151 system_configurations,
152 cluster_system_configurations,
153 replica_system_configurations,
154 default_privileges,
155 system_privileges,
156 storage_collection_metadata,
157 unfinalized_shards,
158 txn_wal_shard,
159 audit_log_updates,
160 upper: _,
161 } = txn_batch;
162 let databases = from_batch(databases, StateUpdateKind::Database);
163 let schemas = from_batch(schemas, StateUpdateKind::Schema);
164 let items = from_batch(items, StateUpdateKind::Item);
165 let comments = from_batch(comments, StateUpdateKind::Comment);
166 let roles = from_batch(roles, StateUpdateKind::Role);
167 let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
168 let clusters = from_batch(clusters, StateUpdateKind::Cluster);
169 let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
170 let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
171 let introspection_sources = from_batch(
172 introspection_sources,
173 StateUpdateKind::IntrospectionSourceIndex,
174 );
175 let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
176 let configs = from_batch(configs, StateUpdateKind::Config);
177 let settings = from_batch(settings, StateUpdateKind::Setting);
178 let system_object_mappings =
179 from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
180 let system_configurations =
181 from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
182 let cluster_system_configurations = from_batch(
183 cluster_system_configurations,
184 StateUpdateKind::ClusterSystemConfiguration,
185 );
186 let replica_system_configurations = from_batch(
187 replica_system_configurations,
188 StateUpdateKind::ReplicaSystemConfiguration,
189 );
190 let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
191 let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
192 let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
193 let storage_collection_metadata = from_batch(
194 storage_collection_metadata,
195 StateUpdateKind::StorageCollectionMetadata,
196 );
197 let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
198 let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
199 let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
200
201 databases
202 .chain(schemas)
203 .chain(items)
204 .chain(comments)
205 .chain(roles)
206 .chain(role_auth)
207 .chain(clusters)
208 .chain(cluster_replicas)
209 .chain(network_policies)
210 .chain(introspection_sources)
211 .chain(id_allocators)
212 .chain(configs)
213 .chain(settings)
214 .chain(source_references)
215 .chain(system_object_mappings)
216 .chain(system_configurations)
217 .chain(cluster_system_configurations)
218 .chain(replica_system_configurations)
219 .chain(default_privileges)
220 .chain(system_privileges)
221 .chain(storage_collection_metadata)
222 .chain(unfinalized_shards)
223 .chain(txn_wal_shard)
224 .chain(audit_logs)
225 }
226}
227
/// The decoded kind of a catalog state update.
///
/// Every variant except `FenceToken` pairs a protobuf key type with a protobuf value
/// type; `()` is used where a collection has no value (`AuditLog`, `UnfinalizedShard`)
/// or no key (`TxnWalShard`).
///
/// Note: `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// determines how kinds compare; do not reorder variants without considering that.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(test, derive(Arbitrary))]
pub enum StateUpdateKind {
    AuditLog(proto::AuditLogKey, ()),
    Cluster(proto::ClusterKey, proto::ClusterValue),
    ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
    Comment(proto::CommentKey, proto::CommentValue),
    Config(proto::ConfigKey, proto::ConfigValue),
    Database(proto::DatabaseKey, proto::DatabaseValue),
    DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
    // The only variant that is not a key/value pair.
    FenceToken(FenceToken),
    IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
    IntrospectionSourceIndex(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
    ),
    Item(proto::ItemKey, proto::ItemValue),
    NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
    Role(proto::RoleKey, proto::RoleValue),
    RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
    Schema(proto::SchemaKey, proto::SchemaValue),
    Setting(proto::SettingKey, proto::SettingValue),
    SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
    SystemConfiguration(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
    ),
    ClusterSystemConfiguration(
        proto::ClusterSystemConfigurationKey,
        proto::ClusterSystemConfigurationValue,
    ),
    ReplicaSystemConfiguration(
        proto::ReplicaSystemConfigurationKey,
        proto::ReplicaSystemConfigurationValue,
    ),
    SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
    SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
    StorageCollectionMetadata(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
    ),
    UnfinalizedShard(proto::UnfinalizedShardKey, ()),
    TxnWalShard((), proto::TxnWalShardValue),
}
276
277impl StateUpdateKind {
278 pub(crate) fn collection_type(&self) -> Option<CollectionType> {
279 match self {
280 StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
281 StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
282 StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
283 StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
284 StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
285 StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
286 StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
287 StateUpdateKind::FenceToken(_) => None,
288 StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
289 StateUpdateKind::IntrospectionSourceIndex(_, _) => {
290 Some(CollectionType::ComputeIntrospectionSourceIndex)
291 }
292 StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
293 StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
294 StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
295 StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
296 StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
297 StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
298 StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
299 StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
300 StateUpdateKind::ClusterSystemConfiguration(_, _) => {
301 Some(CollectionType::ClusterSystemConfiguration)
302 }
303 StateUpdateKind::ReplicaSystemConfiguration(_, _) => {
304 Some(CollectionType::ReplicaSystemConfiguration)
305 }
306 StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
307 StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
308 StateUpdateKind::StorageCollectionMetadata(_, _) => {
309 Some(CollectionType::StorageCollectionMetadata)
310 }
311 StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
312 StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
313 }
314 }
315}
316
/// A state update kind stored as raw [`Jsonb`], i.e. without being decoded into a
/// [`StateUpdateKind`].
///
/// The accessors on this type expect the JSON to be a map with a `"kind"` field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdateKindJson(Jsonb);
320
321impl StateUpdateKindJson {
322 pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
323 let serde_value = serde_json::to_value(s).expect("valid json");
324 let row = Jsonb::from_serde_json(serde_value).expect("valid json");
325 StateUpdateKindJson(row)
326 }
327
328 pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
329 self.try_to_serde().expect("jsonb should roundtrip")
330 }
331
332 pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
333 &self,
334 ) -> Result<D, serde_json::error::Error> {
335 let serde_value = self.0.as_ref().to_serde_json();
336 serde_json::from_value::<D>(serde_value)
337 }
338
339 fn kind(&self) -> &str {
340 let row = self.0.row();
341 let mut iter = row.unpack_first().unwrap_map().iter();
342 let datum = iter
343 .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
344 .expect("kind field must exist");
345 datum.unwrap_str()
346 }
347
348 pub(crate) fn audit_log_id(&self) -> u64 {
349 assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
350 let row = self.0.row();
351 let mut iter = row.unpack_first().unwrap_map().iter();
352 let key = iter
353 .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
354 .expect("key field must exist")
355 .unwrap_map();
356 let event = key
357 .iter()
358 .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
359 .expect("event field must exist")
360 .unwrap_map();
361 let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
362 match event_version {
363 "V1" => {
364 let versioned_map = versioned_datum.unwrap_map();
365 let id = versioned_map
366 .iter()
367 .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
368 .expect("event field must exist")
369 .unwrap_numeric();
370 let mut cx = Numeric::context();
371 cx.try_into_u64(id.into_inner()).expect("invalid id")
372 }
373 version => unimplemented!("unsupported event version: {version}"),
374 }
375 }
376
377 pub(crate) fn is_always_deserializable(&self) -> bool {
379 static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
382 [
383 StateUpdateKind::FenceToken(FenceToken {
384 deploy_generation: 1,
385 epoch: Epoch::new(1).expect("non-zero"),
386 }),
387 StateUpdateKind::Config(
388 proto::ConfigKey { key: String::new() },
389 proto::ConfigValue { value: 1 },
390 ),
391 StateUpdateKind::Setting(
392 proto::SettingKey {
393 name: String::new(),
394 },
395 proto::SettingValue {
396 value: String::new(),
397 },
398 ),
399 StateUpdateKind::AuditLog(
400 proto::AuditLogKey {
401 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
402 id: 1,
403 event_type: proto::audit_log_event_v1::EventType::Create,
404 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
405 user: None,
406 occurred_at: proto::EpochMillis { millis: 1 },
407 details: proto::audit_log_event_v1::Details::ResetAllV1(
408 proto::Empty {},
409 ),
410 }),
411 },
412 (),
413 ),
414 ]
415 .into_iter()
416 .map(|kind| {
417 let json_kind: StateUpdateKindJson = kind.into();
418 json_kind.kind().to_string()
419 })
420 .collect()
421 });
422 DESERIALIZABLE_KINDS.contains(self.kind())
423 }
424
425 pub(crate) fn is_audit_log(&self) -> bool {
427 static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
430 let audit_log = StateUpdateKind::AuditLog(
431 proto::AuditLogKey {
432 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
433 id: 1,
434 event_type: proto::audit_log_event_v1::EventType::Create,
435 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
436 user: None,
437 occurred_at: proto::EpochMillis { millis: 1 },
438 details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
439 }),
440 },
441 (),
442 );
443 let json_kind: StateUpdateKindJson = audit_log.into();
444 json_kind.kind().to_string()
445 });
446 &*AUDIT_LOG_KIND == self.kind()
447 }
448}
449
/// A raw state update in `((key, value), timestamp, diff)` form, where the key is
/// [`SourceData`] and the value is always `()`.
type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);
452
453impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
454 type Error = DurableCatalogError;
455
456 fn try_from(
457 StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
458 ) -> Result<Self, Self::Error> {
459 let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
460 let update = kind.map(|kind| memory::objects::StateUpdate {
461 kind,
462 ts: ts.clone(),
463 diff: diff.clone().try_into().expect("invalid diff"),
464 });
465 Ok(update)
466 }
467}
468
469impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
470 type Error = DurableCatalogError;
471
472 fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
473 fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
474 where
475 PK: ProtoType<T::Key> + Clone,
476 PV: ProtoType<T::Value> + Clone,
477 T: DurableType,
478 {
479 let key = key.clone().into_rust()?;
480 let value = value.clone().into_rust()?;
481 Ok(T::from_key_value(key, value))
482 }
483
484 Ok(match kind {
485 StateUpdateKind::AuditLog(key, value) => {
486 let audit_log = into_durable(key, value)?;
487 Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
488 }
489 StateUpdateKind::Cluster(key, value) => {
490 let cluster = into_durable(key, value)?;
491 Some(memory::objects::StateUpdateKind::Cluster(cluster))
492 }
493 StateUpdateKind::ClusterReplica(key, value) => {
494 let cluster_replica = into_durable(key, value)?;
495 Some(memory::objects::StateUpdateKind::ClusterReplica(
496 cluster_replica,
497 ))
498 }
499 StateUpdateKind::Comment(key, value) => {
500 let comment = into_durable(key, value)?;
501 Some(memory::objects::StateUpdateKind::Comment(comment))
502 }
503 StateUpdateKind::Database(key, value) => {
504 let database = into_durable(key, value)?;
505 Some(memory::objects::StateUpdateKind::Database(database))
506 }
507 StateUpdateKind::DefaultPrivilege(key, value) => {
508 let default_privilege = into_durable(key, value)?;
509 Some(memory::objects::StateUpdateKind::DefaultPrivilege(
510 default_privilege,
511 ))
512 }
513 StateUpdateKind::Item(key, value) => {
514 let item = into_durable(key, value)?;
515 Some(memory::objects::StateUpdateKind::Item(item))
516 }
517 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
518 let introspection_source_index = into_durable(key, value)?;
519 Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
520 introspection_source_index,
521 ))
522 }
523 StateUpdateKind::NetworkPolicy(key, value) => {
524 let policy = into_durable(key, value)?;
525 Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
526 }
527 StateUpdateKind::Role(key, value) => {
528 let role = into_durable(key, value)?;
529 Some(memory::objects::StateUpdateKind::Role(role))
530 }
531 StateUpdateKind::RoleAuth(key, value) => {
532 let role_auth = into_durable(key, value)?;
533 Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
534 }
535 StateUpdateKind::Schema(key, value) => {
536 let schema = into_durable(key, value)?;
537 Some(memory::objects::StateUpdateKind::Schema(schema))
538 }
539 StateUpdateKind::SourceReferences(key, value) => {
540 let source_references = into_durable(key, value)?;
541 Some(memory::objects::StateUpdateKind::SourceReferences(
542 source_references,
543 ))
544 }
545 StateUpdateKind::StorageCollectionMetadata(key, value) => {
546 let storage_collection_metadata = into_durable(key, value)?;
547 Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
548 storage_collection_metadata,
549 ))
550 }
551 StateUpdateKind::SystemConfiguration(key, value) => {
552 let system_configuration = into_durable(key, value)?;
553 Some(memory::objects::StateUpdateKind::SystemConfiguration(
554 system_configuration,
555 ))
556 }
557 StateUpdateKind::ClusterSystemConfiguration(key, value) => {
558 let cluster_system_configuration = into_durable(key, value)?;
559 Some(
560 memory::objects::StateUpdateKind::ClusterSystemConfiguration(
561 cluster_system_configuration,
562 ),
563 )
564 }
565 StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
566 let replica_system_configuration = into_durable(key, value)?;
567 Some(
568 memory::objects::StateUpdateKind::ReplicaSystemConfiguration(
569 replica_system_configuration,
570 ),
571 )
572 }
573 StateUpdateKind::SystemObjectMapping(key, value) => {
574 let system_object_mapping = into_durable(key, value)?;
575 Some(memory::objects::StateUpdateKind::SystemObjectMapping(
576 system_object_mapping,
577 ))
578 }
579 StateUpdateKind::SystemPrivilege(key, value) => {
580 let system_privilege = into_durable(key, value)?;
581 Some(memory::objects::StateUpdateKind::SystemPrivilege(
582 system_privilege,
583 ))
584 }
585 StateUpdateKind::UnfinalizedShard(key, value) => {
586 let unfinalized_shard = into_durable(key, value)?;
587 Some(memory::objects::StateUpdateKind::UnfinalizedShard(
588 unfinalized_shard,
589 ))
590 }
591 StateUpdateKind::Config(_, _)
593 | StateUpdateKind::FenceToken(_)
594 | StateUpdateKind::IdAllocator(_, _)
595 | StateUpdateKind::Setting(_, _)
596 | StateUpdateKind::TxnWalShard(_, _) => None,
597 })
598 }
599}
600
601impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
602 type Error = String;
603
604 fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
605 Ok(StateUpdate {
606 kind: TryInto::try_into(update.kind)?,
607 ts: update.ts,
608 diff: update.diff,
609 })
610 }
611}
612
613impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
614 type Error = String;
615
616 fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
617 let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
618 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
619 }
620}
621
622impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
623 type Error = String;
624
625 fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
626 let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
627 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
628 }
629}
630
631impl From<StateUpdateKind> for StateUpdateKindJson {
632 fn from(value: StateUpdateKind) -> Self {
633 let kind = value.into_proto_owned();
634 StateUpdateKindJson::from_serde(kind)
635 }
636}
637
638impl RustType<proto::StateUpdateKind> for StateUpdateKind {
642 fn into_proto(&self) -> proto::StateUpdateKind {
643 error!("unexpected clone of catalog data");
644 self.clone().into_proto_owned()
645 }
646
647 fn into_proto_owned(self) -> proto::StateUpdateKind {
648 match self {
649 StateUpdateKind::AuditLog(key, ()) => {
650 proto::StateUpdateKind::AuditLog(proto::AuditLog { key })
651 }
652 StateUpdateKind::Cluster(key, value) => {
653 proto::StateUpdateKind::Cluster(proto::Cluster { key, value })
654 }
655 StateUpdateKind::ClusterReplica(key, value) => {
656 proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value })
657 }
658 StateUpdateKind::Comment(key, value) => {
659 proto::StateUpdateKind::Comment(proto::Comment { key, value })
660 }
661 StateUpdateKind::Config(key, value) => {
662 proto::StateUpdateKind::Config(proto::Config { key, value })
663 }
664 StateUpdateKind::Database(key, value) => {
665 proto::StateUpdateKind::Database(proto::Database { key, value })
666 }
667 StateUpdateKind::DefaultPrivilege(key, value) => {
668 proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value })
669 }
670 StateUpdateKind::FenceToken(fence_token) => {
671 proto::StateUpdateKind::FenceToken(proto::FenceToken {
672 deploy_generation: fence_token.deploy_generation,
673 epoch: fence_token.epoch.get(),
674 })
675 }
676 StateUpdateKind::IdAllocator(key, value) => {
677 proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value })
678 }
679 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
680 proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
681 proto::ClusterIntrospectionSourceIndex { key, value },
682 )
683 }
684 StateUpdateKind::Item(key, value) => {
685 proto::StateUpdateKind::Item(proto::Item { key, value })
686 }
687 StateUpdateKind::NetworkPolicy(key, value) => {
688 proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value })
689 }
690 StateUpdateKind::Role(key, value) => {
691 proto::StateUpdateKind::Role(proto::Role { key, value })
692 }
693 StateUpdateKind::RoleAuth(key, value) => {
694 proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value })
695 }
696 StateUpdateKind::Schema(key, value) => {
697 proto::StateUpdateKind::Schema(proto::Schema { key, value })
698 }
699 StateUpdateKind::Setting(key, value) => {
700 proto::StateUpdateKind::Setting(proto::Setting { key, value })
701 }
702 StateUpdateKind::SourceReferences(key, value) => {
703 proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value })
704 }
705 StateUpdateKind::SystemConfiguration(key, value) => {
706 proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
707 key,
708 value,
709 })
710 }
711 StateUpdateKind::ClusterSystemConfiguration(key, value) => {
712 proto::StateUpdateKind::ClusterSystemConfiguration(
713 proto::ClusterSystemConfiguration { key, value },
714 )
715 }
716 StateUpdateKind::ReplicaSystemConfiguration(key, value) => {
717 proto::StateUpdateKind::ReplicaSystemConfiguration(
718 proto::ReplicaSystemConfiguration { key, value },
719 )
720 }
721 StateUpdateKind::SystemObjectMapping(key, value) => {
722 proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value })
723 }
724 StateUpdateKind::SystemPrivilege(key, value) => {
725 proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value })
726 }
727 StateUpdateKind::StorageCollectionMetadata(key, value) => {
728 proto::StateUpdateKind::StorageCollectionMetadata(
729 proto::StorageCollectionMetadata { key, value },
730 )
731 }
732 StateUpdateKind::UnfinalizedShard(key, ()) => {
733 proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key })
734 }
735 StateUpdateKind::TxnWalShard((), value) => {
736 proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value })
737 }
738 }
739 }
740
741 fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
742 Ok(match proto {
743 proto::StateUpdateKind::AuditLog(proto::AuditLog { key }) => {
744 StateUpdateKind::AuditLog(key, ())
745 }
746 proto::StateUpdateKind::Cluster(proto::Cluster { key, value }) => {
747 StateUpdateKind::Cluster(key, value)
748 }
749 proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value }) => {
750 StateUpdateKind::ClusterReplica(key, value)
751 }
752 proto::StateUpdateKind::Comment(proto::Comment { key, value }) => {
753 StateUpdateKind::Comment(key, value)
754 }
755 proto::StateUpdateKind::Config(proto::Config { key, value }) => {
756 StateUpdateKind::Config(key, value)
757 }
758 proto::StateUpdateKind::Database(proto::Database { key, value }) => {
759 StateUpdateKind::Database(key, value)
760 }
761 proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value }) => {
762 StateUpdateKind::DefaultPrivilege(key, value)
763 }
764 proto::StateUpdateKind::FenceToken(proto::FenceToken {
765 deploy_generation,
766 epoch,
767 }) => StateUpdateKind::FenceToken(FenceToken {
768 deploy_generation,
769 epoch: Epoch::new(epoch).ok_or_else(|| {
770 TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
771 })?,
772 }),
773 proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value }) => {
774 StateUpdateKind::IdAllocator(key, value)
775 }
776 proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
777 proto::ClusterIntrospectionSourceIndex { key, value },
778 ) => StateUpdateKind::IntrospectionSourceIndex(key, value),
779 proto::StateUpdateKind::Item(proto::Item { key, value }) => {
780 StateUpdateKind::Item(key, value)
781 }
782 proto::StateUpdateKind::Role(proto::Role { key, value }) => {
783 StateUpdateKind::Role(key, value)
784 }
785 proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value }) => {
786 StateUpdateKind::RoleAuth(key, value)
787 }
788 proto::StateUpdateKind::Schema(proto::Schema { key, value }) => {
789 StateUpdateKind::Schema(key, value)
790 }
791 proto::StateUpdateKind::Setting(proto::Setting { key, value }) => {
792 StateUpdateKind::Setting(key, value)
793 }
794 proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
795 key,
796 value,
797 }) => StateUpdateKind::SystemConfiguration(key, value),
798 proto::StateUpdateKind::ClusterSystemConfiguration(
799 proto::ClusterSystemConfiguration { key, value },
800 ) => StateUpdateKind::ClusterSystemConfiguration(key, value),
801 proto::StateUpdateKind::ReplicaSystemConfiguration(
802 proto::ReplicaSystemConfiguration { key, value },
803 ) => StateUpdateKind::ReplicaSystemConfiguration(key, value),
804 proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value }) => {
805 StateUpdateKind::SystemObjectMapping(key, value)
806 }
807 proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value }) => {
808 StateUpdateKind::SystemPrivilege(key, value)
809 }
810 proto::StateUpdateKind::StorageCollectionMetadata(
811 proto::StorageCollectionMetadata { key, value },
812 ) => StateUpdateKind::StorageCollectionMetadata(key, value),
813 proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key }) => {
814 StateUpdateKind::UnfinalizedShard(key, ())
815 }
816 proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value }) => {
817 StateUpdateKind::TxnWalShard((), value)
818 }
819 proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value }) => {
820 StateUpdateKind::SourceReferences(key, value)
821 }
822 proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value }) => {
823 StateUpdateKind::NetworkPolicy(key, value)
824 }
825 })
826 }
827}
828
829impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
832 fn from(kvtd: PersistStateUpdate) -> Self {
833 let ((key, ()), ts, diff) = kvtd;
834 StateUpdate {
835 kind: StateUpdateKindJson::from(key),
836 ts,
837 diff: diff.into(),
838 }
839 }
840}
841
842impl From<StateUpdateKindJson> for SourceData {
843 fn from(value: StateUpdateKindJson) -> SourceData {
844 let row = value.0.into_row();
845 SourceData(Ok(row))
846 }
847}
848
849impl From<SourceData> for StateUpdateKindJson {
850 fn from(value: SourceData) -> Self {
851 let row = value.0.expect("only Ok values stored in catalog shard");
852 StateUpdateKindJson(Jsonb::from_row(row))
853 }
854}
855
#[cfg(test)]
mod tests {
    use mz_persist_types::Codec;
    use mz_repr::{RelationDesc, SqlScalarType};
    use mz_storage_types::sources::SourceData;
    use proptest::prelude::*;

    use crate::durable::Epoch;
    use crate::durable::objects::FenceToken;
    use crate::durable::objects::serialization::proto;
    use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};

    /// The `kind` field of the serialized JSON must match the expected name for each
    /// sampled update kind.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn kind_test() {
        let cases = [
            (
                StateUpdateKind::FenceToken(FenceToken {
                    deploy_generation: 1,
                    epoch: Epoch::new(1).expect("non-zero"),
                }),
                "FenceToken",
            ),
            (
                StateUpdateKind::Config(
                    proto::ConfigKey { key: String::new() },
                    proto::ConfigValue { value: 1 },
                ),
                "Config",
            ),
            (
                StateUpdateKind::Setting(
                    proto::SettingKey {
                        name: String::new(),
                    },
                    proto::SettingValue {
                        value: String::new(),
                    },
                ),
                "Setting",
            ),
            (
                StateUpdateKind::AuditLog(
                    proto::AuditLogKey {
                        event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
                            id: 1,
                            event_type: proto::audit_log_event_v1::EventType::Create,
                            object_type: proto::audit_log_event_v1::ObjectType::Cluster,
                            user: None,
                            occurred_at: proto::EpochMillis { millis: 4 },
                            details: proto::audit_log_event_v1::Details::ResetAllV1(
                                proto::Empty {},
                            ),
                        }),
                    },
                    (),
                ),
                "AuditLog",
            ),
        ];

        for (kind, expected_name) in cases {
            let json = StateUpdateKindJson::from(kind);
            assert_eq!(expected_name, json.kind());
        }
    }

    /// `audit_log_id` must return the ID stored in the serialized audit log event.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn audit_log_id_test() {
        // Builds a V1 audit log update; only the ID is relevant to this test.
        let audit_log = |id, event_type, object_type, millis| {
            StateUpdateKind::AuditLog(
                proto::AuditLogKey {
                    event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
                        id,
                        event_type,
                        object_type,
                        user: None,
                        occurred_at: proto::EpochMillis { millis },
                        details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
                    }),
                },
                (),
            )
        };

        let cases = [
            (
                audit_log(
                    1,
                    proto::audit_log_event_v1::EventType::Create,
                    proto::audit_log_event_v1::ObjectType::Cluster,
                    4,
                ),
                1,
            ),
            (
                audit_log(
                    4,
                    proto::audit_log_event_v1::EventType::Drop,
                    proto::audit_log_event_v1::ObjectType::Database,
                    7,
                ),
                4,
            ),
        ];

        for (kind, expected_id) in cases {
            let json = StateUpdateKindJson::from(kind);
            assert_eq!(expected_id, json.audit_log_id());
        }
    }

    proptest! {
        /// A kind must survive the round trip
        /// kind -> JSON -> `SourceData` -> encoded bytes -> `SourceData` -> JSON -> kind.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
            let json = StateUpdateKindJson::from(kind.clone());
            let desc = RelationDesc::builder()
                .with_column("a", SqlScalarType::Jsonb.nullable(false))
                .finish();

            // Encode and decode the `SourceData` form.
            let source_data = SourceData::from(json.clone());
            let mut bytes = Vec::new();
            source_data.encode(&mut bytes);
            let decoded_source_data =
                SourceData::decode(&bytes, &desc).expect("should be valid SourceData");
            prop_assert_eq!(&source_data, &decoded_source_data);

            // The decoded data must yield the same raw JSON ...
            let decoded_json = StateUpdateKindJson::from(decoded_source_data);
            prop_assert_eq!(&json, &decoded_json);

            // ... and the same decoded kind.
            let decoded_kind =
                StateUpdateKind::try_from(decoded_json).expect("should be valid StateUpdateKind");
            prop_assert_eq!(&kind, &decoded_kind);
        }
    }
}