1use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46#[cfg(test)]
47use proptest_derive::Arbitrary;
48use tracing::error;
49
50use crate::durable::debug::CollectionType;
51use crate::durable::objects::serialization::proto;
52use crate::durable::objects::{DurableType, FenceToken};
53use crate::durable::persist::Timestamp;
54use crate::durable::transaction::TransactionBatch;
55use crate::durable::{DurableCatalogError, Epoch};
56use crate::memory;
57
/// Types that convert infallibly into a [`StateUpdateKindJson`] and fallibly back out of one.
///
/// Implementors must also be comparable, orderable, debuggable, and cloneable, as required by
/// the supertrait list.
pub trait IntoStateUpdateKindJson:
    Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
{
    /// Error returned when a raw JSON update cannot be converted into `Self`.
    type Error: Debug;

    /// Attempts to build `Self` from the raw JSON form of an update.
    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
}
66impl<
67 T: Into<StateUpdateKindJson>
68 + TryFrom<StateUpdateKindJson>
69 + PartialEq
70 + Eq
71 + PartialOrd
72 + Ord
73 + Debug
74 + Clone,
75> IntoStateUpdateKindJson for T
76where
77 T::Error: Debug,
78{
79 type Error = T::Error;
80
81 fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
82 <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
83 }
84}
85
/// Types that are [`IntoStateUpdateKindJson`] and can additionally be converted, fallibly, into
/// a typed [`StateUpdateKind`].
pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
    /// Error returned when `self` cannot be converted into a [`StateUpdateKind`].
    type Error: Debug;

    /// Attempts to convert `self` into a [`StateUpdateKind`].
    ///
    /// The error type is written as `<Self as TryIntoStateUpdateKind>::Error` because the
    /// supertrait also declares an associated type named `Error`.
    fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
}
92impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
93where
94 <T as TryInto<StateUpdateKind>>::Error: Debug,
95{
96 type Error = <T as TryInto<StateUpdateKind>>::Error;
97
98 fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
99 <T as TryInto<StateUpdateKind>>::try_into(self)
100 }
101}
102
/// A single catalog state update: a payload of kind `T` together with the timestamp and diff
/// attached to it.
///
/// `T` defaults to the typed [`StateUpdateKind`]; any [`IntoStateUpdateKindJson`] type (for
/// example [`StateUpdateKindJson`]) may be used instead.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
    /// The content of the update.
    pub kind: T,
    /// The timestamp associated with the update.
    pub ts: Timestamp,
    /// The diff associated with the update.
    pub diff: Diff,
}
113
114impl StateUpdate {
115 pub(crate) fn from_txn_batch_ts(
117 txn_batch: TransactionBatch,
118 ts: Timestamp,
119 ) -> impl Iterator<Item = StateUpdate> {
120 Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
121 }
122
123 pub(crate) fn from_txn_batch(
125 txn_batch: TransactionBatch,
126 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
127 fn from_batch<K, V>(
128 batch: Vec<(K, V, Diff)>,
129 kind: fn(K, V) -> StateUpdateKind,
130 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
131 batch
132 .into_iter()
133 .map(move |(k, v, diff)| (kind(k, v), diff))
134 }
135 let TransactionBatch {
136 databases,
137 schemas,
138 items,
139 comments,
140 roles,
141 role_auth,
142 clusters,
143 cluster_replicas,
144 network_policies,
145 introspection_sources,
146 id_allocator,
147 configs,
148 settings,
149 source_references,
150 system_gid_mapping,
151 system_configurations,
152 default_privileges,
153 system_privileges,
154 storage_collection_metadata,
155 unfinalized_shards,
156 txn_wal_shard,
157 audit_log_updates,
158 upper: _,
159 } = txn_batch;
160 let databases = from_batch(databases, StateUpdateKind::Database);
161 let schemas = from_batch(schemas, StateUpdateKind::Schema);
162 let items = from_batch(items, StateUpdateKind::Item);
163 let comments = from_batch(comments, StateUpdateKind::Comment);
164 let roles = from_batch(roles, StateUpdateKind::Role);
165 let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
166 let clusters = from_batch(clusters, StateUpdateKind::Cluster);
167 let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
168 let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
169 let introspection_sources = from_batch(
170 introspection_sources,
171 StateUpdateKind::IntrospectionSourceIndex,
172 );
173 let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
174 let configs = from_batch(configs, StateUpdateKind::Config);
175 let settings = from_batch(settings, StateUpdateKind::Setting);
176 let system_object_mappings =
177 from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
178 let system_configurations =
179 from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
180 let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
181 let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
182 let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
183 let storage_collection_metadata = from_batch(
184 storage_collection_metadata,
185 StateUpdateKind::StorageCollectionMetadata,
186 );
187 let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
188 let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
189 let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
190
191 databases
192 .chain(schemas)
193 .chain(items)
194 .chain(comments)
195 .chain(roles)
196 .chain(role_auth)
197 .chain(clusters)
198 .chain(cluster_replicas)
199 .chain(network_policies)
200 .chain(introspection_sources)
201 .chain(id_allocators)
202 .chain(configs)
203 .chain(settings)
204 .chain(source_references)
205 .chain(system_object_mappings)
206 .chain(system_configurations)
207 .chain(default_privileges)
208 .chain(system_privileges)
209 .chain(storage_collection_metadata)
210 .chain(unfinalized_shards)
211 .chain(txn_wal_shard)
212 .chain(audit_logs)
213 }
214}
215
/// The typed content of a single catalog state update.
///
/// Every variant except [`StateUpdateKind::FenceToken`] carries a key and a value; variants
/// with no meaningful key or value use `()` in that position.
///
/// NOTE: `PartialOrd`/`Ord` are derived, so the relative order of values of different variants
/// follows the declaration order below. Reordering variants changes that ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(test, derive(Arbitrary))]
pub enum StateUpdateKind {
    AuditLog(proto::AuditLogKey, ()),
    Cluster(proto::ClusterKey, proto::ClusterValue),
    ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
    Comment(proto::CommentKey, proto::CommentValue),
    Config(proto::ConfigKey, proto::ConfigValue),
    Database(proto::DatabaseKey, proto::DatabaseValue),
    DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
    // The only variant that is not a key/value pair.
    FenceToken(FenceToken),
    IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
    IntrospectionSourceIndex(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
    ),
    Item(proto::ItemKey, proto::ItemValue),
    NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
    Role(proto::RoleKey, proto::RoleValue),
    RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
    Schema(proto::SchemaKey, proto::SchemaValue),
    Setting(proto::SettingKey, proto::SettingValue),
    SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
    SystemConfiguration(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
    ),
    SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
    SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
    StorageCollectionMetadata(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
    ),
    // Key only: the value is `()`.
    UnfinalizedShard(proto::UnfinalizedShardKey, ()),
    // Value only: the key is `()`.
    TxnWalShard((), proto::TxnWalShardValue),
}
256
257impl StateUpdateKind {
258 pub(crate) fn collection_type(&self) -> Option<CollectionType> {
259 match self {
260 StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
261 StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
262 StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
263 StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
264 StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
265 StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
266 StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
267 StateUpdateKind::FenceToken(_) => None,
268 StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
269 StateUpdateKind::IntrospectionSourceIndex(_, _) => {
270 Some(CollectionType::ComputeIntrospectionSourceIndex)
271 }
272 StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
273 StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
274 StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
275 StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
276 StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
277 StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
278 StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
279 StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
280 StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
281 StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
282 StateUpdateKind::StorageCollectionMetadata(_, _) => {
283 Some(CollectionType::StorageCollectionMetadata)
284 }
285 StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
286 StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
287 }
288 }
289}
290
/// A state update kind held as raw JSON ([`Jsonb`]) rather than as a typed
/// [`StateUpdateKind`].
///
/// Conversions to and from [`StateUpdateKind`] and [`SourceData`] are provided by the
/// `From`/`TryFrom` impls in this file.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdateKindJson(Jsonb);
294
295impl StateUpdateKindJson {
296 pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
297 let serde_value = serde_json::to_value(s).expect("valid json");
298 let row = Jsonb::from_serde_json(serde_value).expect("valid json");
299 StateUpdateKindJson(row)
300 }
301
302 pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
303 self.try_to_serde().expect("jsonb should roundtrip")
304 }
305
306 pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
307 &self,
308 ) -> Result<D, serde_json::error::Error> {
309 let serde_value = self.0.as_ref().to_serde_json();
310 serde_json::from_value::<D>(serde_value)
311 }
312
313 fn kind(&self) -> &str {
314 let row = self.0.row();
315 let mut iter = row.unpack_first().unwrap_map().iter();
316 let datum = iter
317 .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
318 .expect("kind field must exist");
319 datum.unwrap_str()
320 }
321
322 pub(crate) fn audit_log_id(&self) -> u64 {
323 assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
324 let row = self.0.row();
325 let mut iter = row.unpack_first().unwrap_map().iter();
326 let key = iter
327 .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
328 .expect("key field must exist")
329 .unwrap_map();
330 let event = key
331 .iter()
332 .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
333 .expect("event field must exist")
334 .unwrap_map();
335 let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
336 match event_version {
337 "V1" => {
338 let versioned_map = versioned_datum.unwrap_map();
339 let id = versioned_map
340 .iter()
341 .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
342 .expect("event field must exist")
343 .unwrap_numeric();
344 let mut cx = Numeric::context();
345 cx.try_into_u64(id.into_inner()).expect("invalid id")
346 }
347 version => unimplemented!("unsupported event version: {version}"),
348 }
349 }
350
351 pub(crate) fn is_always_deserializable(&self) -> bool {
353 static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
356 [
357 StateUpdateKind::FenceToken(FenceToken {
358 deploy_generation: 1,
359 epoch: Epoch::new(1).expect("non-zero"),
360 }),
361 StateUpdateKind::Config(
362 proto::ConfigKey { key: String::new() },
363 proto::ConfigValue { value: 1 },
364 ),
365 StateUpdateKind::Setting(
366 proto::SettingKey {
367 name: String::new(),
368 },
369 proto::SettingValue {
370 value: String::new(),
371 },
372 ),
373 StateUpdateKind::AuditLog(
374 proto::AuditLogKey {
375 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
376 id: 1,
377 event_type: proto::audit_log_event_v1::EventType::Create,
378 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
379 user: None,
380 occurred_at: proto::EpochMillis { millis: 1 },
381 details: proto::audit_log_event_v1::Details::ResetAllV1(
382 proto::Empty {},
383 ),
384 }),
385 },
386 (),
387 ),
388 ]
389 .into_iter()
390 .map(|kind| {
391 let json_kind: StateUpdateKindJson = kind.into();
392 json_kind.kind().to_string()
393 })
394 .collect()
395 });
396 DESERIALIZABLE_KINDS.contains(self.kind())
397 }
398
399 pub(crate) fn is_audit_log(&self) -> bool {
401 static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
404 let audit_log = StateUpdateKind::AuditLog(
405 proto::AuditLogKey {
406 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
407 id: 1,
408 event_type: proto::audit_log_event_v1::EventType::Create,
409 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
410 user: None,
411 occurred_at: proto::EpochMillis { millis: 1 },
412 details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
413 }),
414 },
415 (),
416 );
417 let json_kind: StateUpdateKindJson = audit_log.into();
418 json_kind.kind().to_string()
419 });
420 &*AUDIT_LOG_KIND == self.kind()
421 }
422}
423
/// Tuple form of a state update: `((key, value), timestamp, diff)`, where the key is a
/// [`SourceData`] and the value is always `()`.
type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);
426
427impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
428 type Error = DurableCatalogError;
429
430 fn try_from(
431 StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
432 ) -> Result<Self, Self::Error> {
433 let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
434 let update = kind.map(|kind| memory::objects::StateUpdate {
435 kind,
436 ts: ts.clone(),
437 diff: diff.clone().try_into().expect("invalid diff"),
438 });
439 Ok(update)
440 }
441}
442
443impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
444 type Error = DurableCatalogError;
445
446 fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
447 fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
448 where
449 PK: ProtoType<T::Key> + Clone,
450 PV: ProtoType<T::Value> + Clone,
451 T: DurableType,
452 {
453 let key = key.clone().into_rust()?;
454 let value = value.clone().into_rust()?;
455 Ok(T::from_key_value(key, value))
456 }
457
458 Ok(match kind {
459 StateUpdateKind::AuditLog(key, value) => {
460 let audit_log = into_durable(key, value)?;
461 Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
462 }
463 StateUpdateKind::Cluster(key, value) => {
464 let cluster = into_durable(key, value)?;
465 Some(memory::objects::StateUpdateKind::Cluster(cluster))
466 }
467 StateUpdateKind::ClusterReplica(key, value) => {
468 let cluster_replica = into_durable(key, value)?;
469 Some(memory::objects::StateUpdateKind::ClusterReplica(
470 cluster_replica,
471 ))
472 }
473 StateUpdateKind::Comment(key, value) => {
474 let comment = into_durable(key, value)?;
475 Some(memory::objects::StateUpdateKind::Comment(comment))
476 }
477 StateUpdateKind::Database(key, value) => {
478 let database = into_durable(key, value)?;
479 Some(memory::objects::StateUpdateKind::Database(database))
480 }
481 StateUpdateKind::DefaultPrivilege(key, value) => {
482 let default_privilege = into_durable(key, value)?;
483 Some(memory::objects::StateUpdateKind::DefaultPrivilege(
484 default_privilege,
485 ))
486 }
487 StateUpdateKind::Item(key, value) => {
488 let item = into_durable(key, value)?;
489 Some(memory::objects::StateUpdateKind::Item(item))
490 }
491 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
492 let introspection_source_index = into_durable(key, value)?;
493 Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
494 introspection_source_index,
495 ))
496 }
497 StateUpdateKind::NetworkPolicy(key, value) => {
498 let policy = into_durable(key, value)?;
499 Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
500 }
501 StateUpdateKind::Role(key, value) => {
502 let role = into_durable(key, value)?;
503 Some(memory::objects::StateUpdateKind::Role(role))
504 }
505 StateUpdateKind::RoleAuth(key, value) => {
506 let role_auth = into_durable(key, value)?;
507 Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
508 }
509 StateUpdateKind::Schema(key, value) => {
510 let schema = into_durable(key, value)?;
511 Some(memory::objects::StateUpdateKind::Schema(schema))
512 }
513 StateUpdateKind::SourceReferences(key, value) => {
514 let source_references = into_durable(key, value)?;
515 Some(memory::objects::StateUpdateKind::SourceReferences(
516 source_references,
517 ))
518 }
519 StateUpdateKind::StorageCollectionMetadata(key, value) => {
520 let storage_collection_metadata = into_durable(key, value)?;
521 Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
522 storage_collection_metadata,
523 ))
524 }
525 StateUpdateKind::SystemConfiguration(key, value) => {
526 let system_configuration = into_durable(key, value)?;
527 Some(memory::objects::StateUpdateKind::SystemConfiguration(
528 system_configuration,
529 ))
530 }
531 StateUpdateKind::SystemObjectMapping(key, value) => {
532 let system_object_mapping = into_durable(key, value)?;
533 Some(memory::objects::StateUpdateKind::SystemObjectMapping(
534 system_object_mapping,
535 ))
536 }
537 StateUpdateKind::SystemPrivilege(key, value) => {
538 let system_privilege = into_durable(key, value)?;
539 Some(memory::objects::StateUpdateKind::SystemPrivilege(
540 system_privilege,
541 ))
542 }
543 StateUpdateKind::UnfinalizedShard(key, value) => {
544 let unfinalized_shard = into_durable(key, value)?;
545 Some(memory::objects::StateUpdateKind::UnfinalizedShard(
546 unfinalized_shard,
547 ))
548 }
549 StateUpdateKind::Config(_, _)
551 | StateUpdateKind::FenceToken(_)
552 | StateUpdateKind::IdAllocator(_, _)
553 | StateUpdateKind::Setting(_, _)
554 | StateUpdateKind::TxnWalShard(_, _) => None,
555 })
556 }
557}
558
559impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
560 type Error = String;
561
562 fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
563 Ok(StateUpdate {
564 kind: TryInto::try_into(update.kind)?,
565 ts: update.ts,
566 diff: update.diff,
567 })
568 }
569}
570
571impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
572 type Error = String;
573
574 fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
575 let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
576 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
577 }
578}
579
580impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
581 type Error = String;
582
583 fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
584 let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
585 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
586 }
587}
588
589impl From<StateUpdateKind> for StateUpdateKindJson {
590 fn from(value: StateUpdateKind) -> Self {
591 let kind = value.into_proto_owned();
592 StateUpdateKindJson::from_serde(kind)
593 }
594}
595
596impl RustType<proto::StateUpdateKind> for StateUpdateKind {
600 fn into_proto(&self) -> proto::StateUpdateKind {
601 error!("unexpected clone of catalog data");
602 self.clone().into_proto_owned()
603 }
604
605 fn into_proto_owned(self) -> proto::StateUpdateKind {
606 match self {
607 StateUpdateKind::AuditLog(key, ()) => {
608 proto::StateUpdateKind::AuditLog(proto::AuditLog { key })
609 }
610 StateUpdateKind::Cluster(key, value) => {
611 proto::StateUpdateKind::Cluster(proto::Cluster { key, value })
612 }
613 StateUpdateKind::ClusterReplica(key, value) => {
614 proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value })
615 }
616 StateUpdateKind::Comment(key, value) => {
617 proto::StateUpdateKind::Comment(proto::Comment { key, value })
618 }
619 StateUpdateKind::Config(key, value) => {
620 proto::StateUpdateKind::Config(proto::Config { key, value })
621 }
622 StateUpdateKind::Database(key, value) => {
623 proto::StateUpdateKind::Database(proto::Database { key, value })
624 }
625 StateUpdateKind::DefaultPrivilege(key, value) => {
626 proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value })
627 }
628 StateUpdateKind::FenceToken(fence_token) => {
629 proto::StateUpdateKind::FenceToken(proto::FenceToken {
630 deploy_generation: fence_token.deploy_generation,
631 epoch: fence_token.epoch.get(),
632 })
633 }
634 StateUpdateKind::IdAllocator(key, value) => {
635 proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value })
636 }
637 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
638 proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
639 proto::ClusterIntrospectionSourceIndex { key, value },
640 )
641 }
642 StateUpdateKind::Item(key, value) => {
643 proto::StateUpdateKind::Item(proto::Item { key, value })
644 }
645 StateUpdateKind::NetworkPolicy(key, value) => {
646 proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value })
647 }
648 StateUpdateKind::Role(key, value) => {
649 proto::StateUpdateKind::Role(proto::Role { key, value })
650 }
651 StateUpdateKind::RoleAuth(key, value) => {
652 proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value })
653 }
654 StateUpdateKind::Schema(key, value) => {
655 proto::StateUpdateKind::Schema(proto::Schema { key, value })
656 }
657 StateUpdateKind::Setting(key, value) => {
658 proto::StateUpdateKind::Setting(proto::Setting { key, value })
659 }
660 StateUpdateKind::SourceReferences(key, value) => {
661 proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value })
662 }
663 StateUpdateKind::SystemConfiguration(key, value) => {
664 proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
665 key,
666 value,
667 })
668 }
669 StateUpdateKind::SystemObjectMapping(key, value) => {
670 proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value })
671 }
672 StateUpdateKind::SystemPrivilege(key, value) => {
673 proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value })
674 }
675 StateUpdateKind::StorageCollectionMetadata(key, value) => {
676 proto::StateUpdateKind::StorageCollectionMetadata(
677 proto::StorageCollectionMetadata { key, value },
678 )
679 }
680 StateUpdateKind::UnfinalizedShard(key, ()) => {
681 proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key })
682 }
683 StateUpdateKind::TxnWalShard((), value) => {
684 proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value })
685 }
686 }
687 }
688
689 fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
690 Ok(match proto {
691 proto::StateUpdateKind::AuditLog(proto::AuditLog { key }) => {
692 StateUpdateKind::AuditLog(key, ())
693 }
694 proto::StateUpdateKind::Cluster(proto::Cluster { key, value }) => {
695 StateUpdateKind::Cluster(key, value)
696 }
697 proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value }) => {
698 StateUpdateKind::ClusterReplica(key, value)
699 }
700 proto::StateUpdateKind::Comment(proto::Comment { key, value }) => {
701 StateUpdateKind::Comment(key, value)
702 }
703 proto::StateUpdateKind::Config(proto::Config { key, value }) => {
704 StateUpdateKind::Config(key, value)
705 }
706 proto::StateUpdateKind::Database(proto::Database { key, value }) => {
707 StateUpdateKind::Database(key, value)
708 }
709 proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value }) => {
710 StateUpdateKind::DefaultPrivilege(key, value)
711 }
712 proto::StateUpdateKind::FenceToken(proto::FenceToken {
713 deploy_generation,
714 epoch,
715 }) => StateUpdateKind::FenceToken(FenceToken {
716 deploy_generation,
717 epoch: Epoch::new(epoch).ok_or_else(|| {
718 TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
719 })?,
720 }),
721 proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value }) => {
722 StateUpdateKind::IdAllocator(key, value)
723 }
724 proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
725 proto::ClusterIntrospectionSourceIndex { key, value },
726 ) => StateUpdateKind::IntrospectionSourceIndex(key, value),
727 proto::StateUpdateKind::Item(proto::Item { key, value }) => {
728 StateUpdateKind::Item(key, value)
729 }
730 proto::StateUpdateKind::Role(proto::Role { key, value }) => {
731 StateUpdateKind::Role(key, value)
732 }
733 proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value }) => {
734 StateUpdateKind::RoleAuth(key, value)
735 }
736 proto::StateUpdateKind::Schema(proto::Schema { key, value }) => {
737 StateUpdateKind::Schema(key, value)
738 }
739 proto::StateUpdateKind::Setting(proto::Setting { key, value }) => {
740 StateUpdateKind::Setting(key, value)
741 }
742 proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
743 key,
744 value,
745 }) => StateUpdateKind::SystemConfiguration(key, value),
746 proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value }) => {
747 StateUpdateKind::SystemObjectMapping(key, value)
748 }
749 proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value }) => {
750 StateUpdateKind::SystemPrivilege(key, value)
751 }
752 proto::StateUpdateKind::StorageCollectionMetadata(
753 proto::StorageCollectionMetadata { key, value },
754 ) => StateUpdateKind::StorageCollectionMetadata(key, value),
755 proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key }) => {
756 StateUpdateKind::UnfinalizedShard(key, ())
757 }
758 proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value }) => {
759 StateUpdateKind::TxnWalShard((), value)
760 }
761 proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value }) => {
762 StateUpdateKind::SourceReferences(key, value)
763 }
764 proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value }) => {
765 StateUpdateKind::NetworkPolicy(key, value)
766 }
767 })
768 }
769}
770
771impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
774 fn from(kvtd: PersistStateUpdate) -> Self {
775 let ((key, ()), ts, diff) = kvtd;
776 StateUpdate {
777 kind: StateUpdateKindJson::from(key),
778 ts,
779 diff: diff.into(),
780 }
781 }
782}
783
784impl From<StateUpdateKindJson> for SourceData {
785 fn from(value: StateUpdateKindJson) -> SourceData {
786 let row = value.0.into_row();
787 SourceData(Ok(row))
788 }
789}
790
791impl From<SourceData> for StateUpdateKindJson {
792 fn from(value: SourceData) -> Self {
793 let row = value.0.expect("only Ok values stored in catalog shard");
794 StateUpdateKindJson(Jsonb::from_row(row))
795 }
796}
797
#[cfg(test)]
mod tests {
    use mz_persist_types::Codec;
    use mz_repr::{RelationDesc, SqlScalarType};
    use mz_storage_types::sources::SourceData;
    use proptest::prelude::*;

    use crate::durable::Epoch;
    use crate::durable::objects::FenceToken;
    use crate::durable::objects::serialization::proto;
    use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};

    /// The `kind` string read from the raw JSON matches the expected name for each variant.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn kind_test() {
        let fence_token = StateUpdateKind::FenceToken(FenceToken {
            deploy_generation: 1,
            epoch: Epoch::new(1).expect("non-zero"),
        });
        let config = StateUpdateKind::Config(
            proto::ConfigKey { key: String::new() },
            proto::ConfigValue { value: 1 },
        );
        let setting = StateUpdateKind::Setting(
            proto::SettingKey {
                name: String::new(),
            },
            proto::SettingValue {
                value: String::new(),
            },
        );
        let audit_log = StateUpdateKind::AuditLog(
            proto::AuditLogKey {
                event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
                    id: 1,
                    event_type: proto::audit_log_event_v1::EventType::Create,
                    object_type: proto::audit_log_event_v1::ObjectType::Cluster,
                    user: None,
                    occurred_at: proto::EpochMillis { millis: 4 },
                    details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
                }),
            },
            (),
        );

        let cases = [
            (fence_token, "FenceToken"),
            (config, "Config"),
            (setting, "Setting"),
            (audit_log, "AuditLog"),
        ];
        for (update, expected_kind) in cases {
            let json = StateUpdateKindJson::from(update);
            assert_eq!(expected_kind, json.kind());
        }
    }

    /// The audit log ID read from the raw JSON matches the ID the event was built with.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn audit_log_id_test() {
        let create_cluster = StateUpdateKind::AuditLog(
            proto::AuditLogKey {
                event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
                    id: 1,
                    event_type: proto::audit_log_event_v1::EventType::Create,
                    object_type: proto::audit_log_event_v1::ObjectType::Cluster,
                    user: None,
                    occurred_at: proto::EpochMillis { millis: 4 },
                    details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
                }),
            },
            (),
        );
        let drop_database = StateUpdateKind::AuditLog(
            proto::AuditLogKey {
                event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
                    id: 4,
                    event_type: proto::audit_log_event_v1::EventType::Drop,
                    object_type: proto::audit_log_event_v1::ObjectType::Database,
                    user: None,
                    occurred_at: proto::EpochMillis { millis: 7 },
                    details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
                }),
            },
            (),
        );

        for (update, expected_id) in [(create_cluster, 1), (drop_database, 4)] {
            let json: StateUpdateKindJson = update.into();
            assert_eq!(expected_id, json.audit_log_id());
        }
    }

    proptest! {
        /// Any `StateUpdateKind` survives the trip through raw JSON, `SourceData`, and the
        /// encoded bytes, and back again.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
            let json = StateUpdateKindJson::from(kind.clone());
            let desc = RelationDesc::builder()
                .with_column("a", SqlScalarType::Jsonb.nullable(false))
                .finish();

            // JSON -> SourceData -> bytes -> SourceData.
            let source_data = SourceData::from(json.clone());
            let mut bytes = Vec::new();
            source_data.encode(&mut bytes);
            let decoded_source_data =
                SourceData::decode(&bytes, &desc).expect("should be valid SourceData");
            prop_assert_eq!(&source_data, &decoded_source_data);

            // SourceData -> JSON.
            let decoded_json = StateUpdateKindJson::from(decoded_source_data);
            prop_assert_eq!(&json, &decoded_json);

            // JSON -> typed kind.
            let decoded_kind =
                StateUpdateKind::try_from(decoded_json).expect("should be valid StateUpdateKind");
            prop_assert_eq!(&kind, &decoded_kind);
        }
    }
}