1use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46use proptest_derive::Arbitrary;
47use tracing::error;
48
49use crate::durable::debug::CollectionType;
50use crate::durable::objects::serialization::proto;
51use crate::durable::objects::{DurableType, FenceToken};
52use crate::durable::persist::Timestamp;
53use crate::durable::transaction::TransactionBatch;
54use crate::durable::{DurableCatalogError, Epoch};
55use crate::memory;
56
/// A type that can be converted into a [`StateUpdateKindJson`] and fallibly
/// reconstructed from one.
///
/// The supertrait bounds additionally require implementors to be comparable,
/// orderable, debuggable, and cloneable.
pub trait IntoStateUpdateKindJson:
    Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
{
    /// The error produced when a raw JSON update cannot be turned into `Self`.
    type Error: Debug;

    /// Attempts to build `Self` from a raw [`StateUpdateKindJson`].
    fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
}
65impl<
66 T: Into<StateUpdateKindJson>
67 + TryFrom<StateUpdateKindJson>
68 + PartialEq
69 + Eq
70 + PartialOrd
71 + Ord
72 + Debug
73 + Clone,
74> IntoStateUpdateKindJson for T
75where
76 T::Error: Debug,
77{
78 type Error = T::Error;
79
80 fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
81 <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
82 }
83}
84
/// A type that can be converted into a [`StateUpdateKindJson`] and can also be
/// fallibly converted into a typed [`StateUpdateKind`].
pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
    /// The error produced when the conversion to [`StateUpdateKind`] fails.
    type Error: Debug;

    /// Attempts to convert `self` into a [`StateUpdateKind`].
    fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
}
91impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
92where
93 <T as TryInto<StateUpdateKind>>::Error: Debug,
94{
95 type Error = <T as TryInto<StateUpdateKind>>::Error;
96
97 fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
98 <T as TryInto<StateUpdateKind>>::try_into(self)
99 }
100}
101
/// A single catalog update: what changed, when, and by how much.
///
/// The update payload `T` defaults to the typed [`StateUpdateKind`]; the raw
/// [`StateUpdateKindJson`] representation is used as `T` elsewhere in this file.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
    /// The payload of the update.
    pub kind: T,
    /// The timestamp associated with the update.
    pub ts: Timestamp,
    /// The diff (multiplicity change) associated with the update.
    pub diff: Diff,
}
112
113impl StateUpdate {
114 pub(crate) fn from_txn_batch_ts(
116 txn_batch: TransactionBatch,
117 ts: Timestamp,
118 ) -> impl Iterator<Item = StateUpdate> {
119 Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
120 }
121
122 pub(crate) fn from_txn_batch(
124 txn_batch: TransactionBatch,
125 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
126 fn from_batch<K, V>(
127 batch: Vec<(K, V, Diff)>,
128 kind: fn(K, V) -> StateUpdateKind,
129 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
130 batch
131 .into_iter()
132 .map(move |(k, v, diff)| (kind(k, v), diff))
133 }
134 let TransactionBatch {
135 databases,
136 schemas,
137 items,
138 comments,
139 roles,
140 role_auth,
141 clusters,
142 cluster_replicas,
143 network_policies,
144 introspection_sources,
145 id_allocator,
146 configs,
147 settings,
148 source_references,
149 system_gid_mapping,
150 system_configurations,
151 default_privileges,
152 system_privileges,
153 storage_collection_metadata,
154 unfinalized_shards,
155 txn_wal_shard,
156 audit_log_updates,
157 upper: _,
158 } = txn_batch;
159 let databases = from_batch(databases, StateUpdateKind::Database);
160 let schemas = from_batch(schemas, StateUpdateKind::Schema);
161 let items = from_batch(items, StateUpdateKind::Item);
162 let comments = from_batch(comments, StateUpdateKind::Comment);
163 let roles = from_batch(roles, StateUpdateKind::Role);
164 let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
165 let clusters = from_batch(clusters, StateUpdateKind::Cluster);
166 let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
167 let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
168 let introspection_sources = from_batch(
169 introspection_sources,
170 StateUpdateKind::IntrospectionSourceIndex,
171 );
172 let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
173 let configs = from_batch(configs, StateUpdateKind::Config);
174 let settings = from_batch(settings, StateUpdateKind::Setting);
175 let system_object_mappings =
176 from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
177 let system_configurations =
178 from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
179 let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
180 let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
181 let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
182 let storage_collection_metadata = from_batch(
183 storage_collection_metadata,
184 StateUpdateKind::StorageCollectionMetadata,
185 );
186 let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
187 let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
188 let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
189
190 databases
191 .chain(schemas)
192 .chain(items)
193 .chain(comments)
194 .chain(roles)
195 .chain(role_auth)
196 .chain(clusters)
197 .chain(cluster_replicas)
198 .chain(network_policies)
199 .chain(introspection_sources)
200 .chain(id_allocators)
201 .chain(configs)
202 .chain(settings)
203 .chain(source_references)
204 .chain(system_object_mappings)
205 .chain(system_configurations)
206 .chain(default_privileges)
207 .chain(system_privileges)
208 .chain(storage_collection_metadata)
209 .chain(unfinalized_shards)
210 .chain(txn_wal_shard)
211 .chain(audit_logs)
212 }
213}
214
/// The typed contents of a catalog update.
///
/// Most variants carry a protobuf key and a protobuf value. Variants whose
/// value (or key) is `()` carry only a key (or only a value), and `FenceToken`
/// carries a single [`FenceToken`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Arbitrary)]
pub enum StateUpdateKind {
    // Key only; the value is the unit type.
    AuditLog(proto::AuditLogKey, ()),
    Cluster(proto::ClusterKey, proto::ClusterValue),
    ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
    Comment(proto::CommentKey, proto::CommentValue),
    Config(proto::ConfigKey, proto::ConfigValue),
    Database(proto::DatabaseKey, proto::DatabaseValue),
    DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
    // Not a key/value pair; has no `CollectionType` (see `collection_type`).
    FenceToken(FenceToken),
    IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
    IntrospectionSourceIndex(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
    ),
    Item(proto::ItemKey, proto::ItemValue),
    NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
    Role(proto::RoleKey, proto::RoleValue),
    RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
    Schema(proto::SchemaKey, proto::SchemaValue),
    Setting(proto::SettingKey, proto::SettingValue),
    SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
    SystemConfiguration(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
    ),
    SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
    SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
    StorageCollectionMetadata(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
    ),
    // Key only; the value is the unit type.
    UnfinalizedShard(proto::UnfinalizedShardKey, ()),
    // Value only; the key is the unit type.
    TxnWalShard((), proto::TxnWalShardValue),
}
254
255impl StateUpdateKind {
256 pub(crate) fn collection_type(&self) -> Option<CollectionType> {
257 match self {
258 StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
259 StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
260 StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
261 StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
262 StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
263 StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
264 StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
265 StateUpdateKind::FenceToken(_) => None,
266 StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
267 StateUpdateKind::IntrospectionSourceIndex(_, _) => {
268 Some(CollectionType::ComputeIntrospectionSourceIndex)
269 }
270 StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
271 StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
272 StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
273 StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
274 StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
275 StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
276 StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
277 StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
278 StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
279 StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
280 StateUpdateKind::StorageCollectionMetadata(_, _) => {
281 Some(CollectionType::StorageCollectionMetadata)
282 }
283 StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
284 StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
285 }
286 }
287}
288
/// The contents of a catalog update stored as raw [`Jsonb`], without being
/// decoded into a typed [`StateUpdateKind`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdateKindJson(Jsonb);
292
293impl StateUpdateKindJson {
294 pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
295 let serde_value = serde_json::to_value(s).expect("valid json");
296 let row = Jsonb::from_serde_json(serde_value).expect("valid json");
297 StateUpdateKindJson(row)
298 }
299
300 pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
301 self.try_to_serde().expect("jsonb should roundtrip")
302 }
303
304 pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
305 &self,
306 ) -> Result<D, serde_json::error::Error> {
307 let serde_value = self.0.as_ref().to_serde_json();
308 serde_json::from_value::<D>(serde_value)
309 }
310
311 fn kind(&self) -> &str {
312 let row = self.0.row();
313 let mut iter = row.unpack_first().unwrap_map().iter();
314 let datum = iter
315 .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
316 .expect("kind field must exist");
317 datum.unwrap_str()
318 }
319
320 pub(crate) fn audit_log_id(&self) -> u64 {
321 assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
322 let row = self.0.row();
323 let mut iter = row.unpack_first().unwrap_map().iter();
324 let key = iter
325 .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
326 .expect("key field must exist")
327 .unwrap_map();
328 let event = key
329 .iter()
330 .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
331 .expect("event field must exist")
332 .unwrap_map();
333 let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
334 match event_version {
335 "V1" => {
336 let versioned_map = versioned_datum.unwrap_map();
337 let id = versioned_map
338 .iter()
339 .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
340 .expect("event field must exist")
341 .unwrap_numeric();
342 let mut cx = Numeric::context();
343 cx.try_into_u64(id.into_inner()).expect("invalid id")
344 }
345 version => unimplemented!("unsupported event version: {version}"),
346 }
347 }
348
349 pub(crate) fn is_always_deserializable(&self) -> bool {
351 static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
354 [
355 StateUpdateKind::FenceToken(FenceToken {
356 deploy_generation: 1,
357 epoch: Epoch::new(1).expect("non-zero"),
358 }),
359 StateUpdateKind::Config(
360 proto::ConfigKey { key: String::new() },
361 proto::ConfigValue { value: 1 },
362 ),
363 StateUpdateKind::Setting(
364 proto::SettingKey {
365 name: String::new(),
366 },
367 proto::SettingValue {
368 value: String::new(),
369 },
370 ),
371 StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()),
372 ]
373 .into_iter()
374 .map(|kind| {
375 let json_kind: StateUpdateKindJson = kind.into();
376 json_kind.kind().to_string()
377 })
378 .collect()
379 });
380 DESERIALIZABLE_KINDS.contains(self.kind())
381 }
382
383 pub(crate) fn is_audit_log(&self) -> bool {
385 static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
388 let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ());
389 let json_kind: StateUpdateKindJson = audit_log.into();
390 json_kind.kind().to_string()
391 });
392 &*AUDIT_LOG_KIND == self.kind()
393 }
394}
395
/// A raw update in `((key, value), timestamp, diff)` form, where the key is
/// [`SourceData`] and the value is the unit type.
// NOTE(review): presumably the shape in which updates are read from the persist
// shard — confirm against the callers.
type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);
398
399impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
400 type Error = DurableCatalogError;
401
402 fn try_from(
403 StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
404 ) -> Result<Self, Self::Error> {
405 let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
406 let update = kind.map(|kind| memory::objects::StateUpdate {
407 kind,
408 ts: ts.clone(),
409 diff: diff.clone().try_into().expect("invalid diff"),
410 });
411 Ok(update)
412 }
413}
414
415impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
416 type Error = DurableCatalogError;
417
418 fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
419 fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
420 where
421 PK: ProtoType<T::Key> + Clone,
422 PV: ProtoType<T::Value> + Clone,
423 T: DurableType,
424 {
425 let key = key.clone().into_rust()?;
426 let value = value.clone().into_rust()?;
427 Ok(T::from_key_value(key, value))
428 }
429
430 Ok(match kind {
431 StateUpdateKind::AuditLog(key, value) => {
432 let audit_log = into_durable(key, value)?;
433 Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
434 }
435 StateUpdateKind::Cluster(key, value) => {
436 let cluster = into_durable(key, value)?;
437 Some(memory::objects::StateUpdateKind::Cluster(cluster))
438 }
439 StateUpdateKind::ClusterReplica(key, value) => {
440 let cluster_replica = into_durable(key, value)?;
441 Some(memory::objects::StateUpdateKind::ClusterReplica(
442 cluster_replica,
443 ))
444 }
445 StateUpdateKind::Comment(key, value) => {
446 let comment = into_durable(key, value)?;
447 Some(memory::objects::StateUpdateKind::Comment(comment))
448 }
449 StateUpdateKind::Database(key, value) => {
450 let database = into_durable(key, value)?;
451 Some(memory::objects::StateUpdateKind::Database(database))
452 }
453 StateUpdateKind::DefaultPrivilege(key, value) => {
454 let default_privilege = into_durable(key, value)?;
455 Some(memory::objects::StateUpdateKind::DefaultPrivilege(
456 default_privilege,
457 ))
458 }
459 StateUpdateKind::Item(key, value) => {
460 let item = into_durable(key, value)?;
461 Some(memory::objects::StateUpdateKind::Item(item))
462 }
463 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
464 let introspection_source_index = into_durable(key, value)?;
465 Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
466 introspection_source_index,
467 ))
468 }
469 StateUpdateKind::NetworkPolicy(key, value) => {
470 let policy = into_durable(key, value)?;
471 Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
472 }
473 StateUpdateKind::Role(key, value) => {
474 let role = into_durable(key, value)?;
475 Some(memory::objects::StateUpdateKind::Role(role))
476 }
477 StateUpdateKind::RoleAuth(key, value) => {
478 let role_auth = into_durable(key, value)?;
479 Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
480 }
481 StateUpdateKind::Schema(key, value) => {
482 let schema = into_durable(key, value)?;
483 Some(memory::objects::StateUpdateKind::Schema(schema))
484 }
485 StateUpdateKind::SourceReferences(key, value) => {
486 let source_references = into_durable(key, value)?;
487 Some(memory::objects::StateUpdateKind::SourceReferences(
488 source_references,
489 ))
490 }
491 StateUpdateKind::StorageCollectionMetadata(key, value) => {
492 let storage_collection_metadata = into_durable(key, value)?;
493 Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
494 storage_collection_metadata,
495 ))
496 }
497 StateUpdateKind::SystemConfiguration(key, value) => {
498 let system_configuration = into_durable(key, value)?;
499 Some(memory::objects::StateUpdateKind::SystemConfiguration(
500 system_configuration,
501 ))
502 }
503 StateUpdateKind::SystemObjectMapping(key, value) => {
504 let system_object_mapping = into_durable(key, value)?;
505 Some(memory::objects::StateUpdateKind::SystemObjectMapping(
506 system_object_mapping,
507 ))
508 }
509 StateUpdateKind::SystemPrivilege(key, value) => {
510 let system_privilege = into_durable(key, value)?;
511 Some(memory::objects::StateUpdateKind::SystemPrivilege(
512 system_privilege,
513 ))
514 }
515 StateUpdateKind::UnfinalizedShard(key, value) => {
516 let unfinalized_shard = into_durable(key, value)?;
517 Some(memory::objects::StateUpdateKind::UnfinalizedShard(
518 unfinalized_shard,
519 ))
520 }
521 StateUpdateKind::Config(_, _)
523 | StateUpdateKind::FenceToken(_)
524 | StateUpdateKind::IdAllocator(_, _)
525 | StateUpdateKind::Setting(_, _)
526 | StateUpdateKind::TxnWalShard(_, _) => None,
527 })
528 }
529}
530
531impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
532 type Error = String;
533
534 fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
535 Ok(StateUpdate {
536 kind: TryInto::try_into(update.kind)?,
537 ts: update.ts,
538 diff: update.diff,
539 })
540 }
541}
542
543impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
544 type Error = String;
545
546 fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
547 let kind: proto::state_update_kind::Kind =
548 value.try_to_serde().map_err(|err| err.to_string())?;
549 let kind = proto::StateUpdateKind { kind: Some(kind) };
550 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
551 }
552}
553
554impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
555 type Error = String;
556
557 fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
558 let kind: proto::state_update_kind::Kind =
559 value.try_to_serde().map_err(|err| err.to_string())?;
560 let kind = proto::StateUpdateKind { kind: Some(kind) };
561 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
562 }
563}
564
565impl From<StateUpdateKind> for StateUpdateKindJson {
566 fn from(value: StateUpdateKind) -> Self {
567 let kind = value.into_proto_owned();
568 let kind = kind.kind.expect("kind should be set");
569 StateUpdateKindJson::from_serde(kind)
570 }
571}
572
573impl RustType<proto::StateUpdateKind> for StateUpdateKind {
577 fn into_proto(&self) -> proto::StateUpdateKind {
578 error!("unexpected clone of catalog data");
579 self.clone().into_proto_owned()
580 }
581
582 fn into_proto_owned(self) -> proto::StateUpdateKind {
583 proto::StateUpdateKind {
584 kind: Some(match self {
585 StateUpdateKind::AuditLog(key, _value) => {
586 proto::state_update_kind::Kind::AuditLog(proto::state_update_kind::AuditLog {
587 key: Some(key),
588 })
589 }
590 StateUpdateKind::Cluster(key, value) => {
591 proto::state_update_kind::Kind::Cluster(proto::state_update_kind::Cluster {
592 key: Some(key),
593 value: Some(value),
594 })
595 }
596 StateUpdateKind::ClusterReplica(key, value) => {
597 proto::state_update_kind::Kind::ClusterReplica(
598 proto::state_update_kind::ClusterReplica {
599 key: Some(key),
600 value: Some(value),
601 },
602 )
603 }
604 StateUpdateKind::Comment(key, value) => {
605 proto::state_update_kind::Kind::Comment(proto::state_update_kind::Comment {
606 key: Some(key),
607 value: Some(value),
608 })
609 }
610 StateUpdateKind::Config(key, value) => {
611 proto::state_update_kind::Kind::Config(proto::state_update_kind::Config {
612 key: Some(key),
613 value: Some(value),
614 })
615 }
616 StateUpdateKind::Database(key, value) => {
617 proto::state_update_kind::Kind::Database(proto::state_update_kind::Database {
618 key: Some(key),
619 value: Some(value),
620 })
621 }
622 StateUpdateKind::DefaultPrivilege(key, value) => {
623 proto::state_update_kind::Kind::DefaultPrivileges(
624 proto::state_update_kind::DefaultPrivileges {
625 key: Some(key),
626 value: Some(value),
627 },
628 )
629 }
630 StateUpdateKind::FenceToken(fence_token) => {
631 proto::state_update_kind::Kind::FenceToken(
632 proto::state_update_kind::FenceToken {
633 deploy_generation: fence_token.deploy_generation,
634 epoch: fence_token.epoch.get(),
635 },
636 )
637 }
638 StateUpdateKind::IdAllocator(key, value) => {
639 proto::state_update_kind::Kind::IdAlloc(proto::state_update_kind::IdAlloc {
640 key: Some(key),
641 value: Some(value),
642 })
643 }
644 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
645 proto::state_update_kind::Kind::ClusterIntrospectionSourceIndex(
646 proto::state_update_kind::ClusterIntrospectionSourceIndex {
647 key: Some(key),
648 value: Some(value),
649 },
650 )
651 }
652 StateUpdateKind::Item(key, value) => {
653 proto::state_update_kind::Kind::Item(proto::state_update_kind::Item {
654 key: Some(key),
655 value: Some(value),
656 })
657 }
658 StateUpdateKind::NetworkPolicy(key, value) => {
659 proto::state_update_kind::Kind::NetworkPolicy(
660 proto::state_update_kind::NetworkPolicy {
661 key: Some(key),
662 value: Some(value),
663 },
664 )
665 }
666 StateUpdateKind::Role(key, value) => {
667 proto::state_update_kind::Kind::Role(proto::state_update_kind::Role {
668 key: Some(key),
669 value: Some(value),
670 })
671 }
672 StateUpdateKind::RoleAuth(key, value) => {
673 proto::state_update_kind::Kind::RoleAuth(proto::state_update_kind::RoleAuth {
674 key: Some(key),
675 value: Some(value),
676 })
677 }
678 StateUpdateKind::Schema(key, value) => {
679 proto::state_update_kind::Kind::Schema(proto::state_update_kind::Schema {
680 key: Some(key),
681 value: Some(value),
682 })
683 }
684 StateUpdateKind::Setting(key, value) => {
685 proto::state_update_kind::Kind::Setting(proto::state_update_kind::Setting {
686 key: Some(key),
687 value: Some(value),
688 })
689 }
690 StateUpdateKind::SourceReferences(key, value) => {
691 proto::state_update_kind::Kind::SourceReferences(
692 proto::state_update_kind::SourceReferences {
693 key: Some(key),
694 value: Some(value),
695 },
696 )
697 }
698 StateUpdateKind::SystemConfiguration(key, value) => {
699 proto::state_update_kind::Kind::ServerConfiguration(
700 proto::state_update_kind::ServerConfiguration {
701 key: Some(key),
702 value: Some(value),
703 },
704 )
705 }
706 StateUpdateKind::SystemObjectMapping(key, value) => {
707 proto::state_update_kind::Kind::GidMapping(
708 proto::state_update_kind::GidMapping {
709 key: Some(key),
710 value: Some(value),
711 },
712 )
713 }
714 StateUpdateKind::SystemPrivilege(key, value) => {
715 proto::state_update_kind::Kind::SystemPrivileges(
716 proto::state_update_kind::SystemPrivileges {
717 key: Some(key),
718 value: Some(value),
719 },
720 )
721 }
722 StateUpdateKind::StorageCollectionMetadata(key, value) => {
723 proto::state_update_kind::Kind::StorageCollectionMetadata(
724 proto::state_update_kind::StorageCollectionMetadata {
725 key: Some(key),
726 value: Some(value),
727 },
728 )
729 }
730 StateUpdateKind::UnfinalizedShard(key, ()) => {
731 proto::state_update_kind::Kind::UnfinalizedShard(
732 proto::state_update_kind::UnfinalizedShard { key: Some(key) },
733 )
734 }
735 StateUpdateKind::TxnWalShard((), value) => {
736 proto::state_update_kind::Kind::TxnWalShard(
737 proto::state_update_kind::TxnWalShard { value: Some(value) },
738 )
739 }
740 }),
741 }
742 }
743
744 fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
745 Ok(
746 match proto
747 .kind
748 .ok_or_else(|| TryFromProtoError::missing_field("StateUpdateKind::kind"))?
749 {
750 proto::state_update_kind::Kind::AuditLog(proto::state_update_kind::AuditLog {
751 key,
752 }) => StateUpdateKind::AuditLog(
753 key.ok_or_else(|| {
754 TryFromProtoError::missing_field("state_update_kind::AuditLog::key")
755 })?,
756 (),
757 ),
758 proto::state_update_kind::Kind::Cluster(proto::state_update_kind::Cluster {
759 key,
760 value,
761 }) => StateUpdateKind::Cluster(
762 key.ok_or_else(|| {
763 TryFromProtoError::missing_field("state_update_kind::Cluster::key")
764 })?,
765 value.ok_or_else(|| {
766 TryFromProtoError::missing_field("state_update_kind::Cluster::value")
767 })?,
768 ),
769 proto::state_update_kind::Kind::ClusterReplica(
770 proto::state_update_kind::ClusterReplica { key, value },
771 ) => StateUpdateKind::ClusterReplica(
772 key.ok_or_else(|| {
773 TryFromProtoError::missing_field("state_update_kind::ClusterReplica::key")
774 })?,
775 value.ok_or_else(|| {
776 TryFromProtoError::missing_field("state_update_kind::ClusterReplica::value")
777 })?,
778 ),
779 proto::state_update_kind::Kind::Comment(proto::state_update_kind::Comment {
780 key,
781 value,
782 }) => StateUpdateKind::Comment(
783 key.ok_or_else(|| {
784 TryFromProtoError::missing_field("state_update_kind::Comment::key")
785 })?,
786 value.ok_or_else(|| {
787 TryFromProtoError::missing_field("state_update_kind::Comment::value")
788 })?,
789 ),
790 proto::state_update_kind::Kind::Config(proto::state_update_kind::Config {
791 key,
792 value,
793 }) => StateUpdateKind::Config(
794 key.ok_or_else(|| {
795 TryFromProtoError::missing_field("state_update_kind::Config::key")
796 })?,
797 value.ok_or_else(|| {
798 TryFromProtoError::missing_field("state_update_kind::Config::value")
799 })?,
800 ),
801 proto::state_update_kind::Kind::Database(proto::state_update_kind::Database {
802 key,
803 value,
804 }) => StateUpdateKind::Database(
805 key.ok_or_else(|| {
806 TryFromProtoError::missing_field("state_update_kind::Database::key")
807 })?,
808 value.ok_or_else(|| {
809 TryFromProtoError::missing_field("state_update_kind::Database::value")
810 })?,
811 ),
812 proto::state_update_kind::Kind::DefaultPrivileges(
813 proto::state_update_kind::DefaultPrivileges { key, value },
814 ) => StateUpdateKind::DefaultPrivilege(
815 key.ok_or_else(|| {
816 TryFromProtoError::missing_field(
817 "state_update_kind::DefaultPrivileges::key",
818 )
819 })?,
820 value.ok_or_else(|| {
821 TryFromProtoError::missing_field(
822 "state_update_kind::DefaultPrivileges::value",
823 )
824 })?,
825 ),
826 proto::state_update_kind::Kind::FenceToken(
827 proto::state_update_kind::FenceToken {
828 deploy_generation,
829 epoch,
830 },
831 ) => StateUpdateKind::FenceToken(FenceToken {
832 deploy_generation,
833 epoch: Epoch::new(epoch).ok_or_else(|| {
834 TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
835 })?,
836 }),
837 proto::state_update_kind::Kind::IdAlloc(proto::state_update_kind::IdAlloc {
838 key,
839 value,
840 }) => StateUpdateKind::IdAllocator(
841 key.ok_or_else(|| {
842 TryFromProtoError::missing_field("state_update_kind::IdAlloc::key")
843 })?,
844 value.ok_or_else(|| {
845 TryFromProtoError::missing_field("state_update_kind::IdAlloc::value")
846 })?,
847 ),
848 proto::state_update_kind::Kind::ClusterIntrospectionSourceIndex(
849 proto::state_update_kind::ClusterIntrospectionSourceIndex { key, value },
850 ) => StateUpdateKind::IntrospectionSourceIndex(
851 key.ok_or_else(|| {
852 TryFromProtoError::missing_field(
853 "state_update_kind::ClusterIntrospectionSourceIndex::key",
854 )
855 })?,
856 value.ok_or_else(|| {
857 TryFromProtoError::missing_field(
858 "state_update_kind::ClusterIntrospectionSourceIndex::value",
859 )
860 })?,
861 ),
862 proto::state_update_kind::Kind::Item(proto::state_update_kind::Item {
863 key,
864 value,
865 }) => StateUpdateKind::Item(
866 key.ok_or_else(|| {
867 TryFromProtoError::missing_field("state_update_kind::Item::key")
868 })?,
869 value.ok_or_else(|| {
870 TryFromProtoError::missing_field("state_update_kind::Item::value")
871 })?,
872 ),
873 proto::state_update_kind::Kind::Role(proto::state_update_kind::Role {
874 key,
875 value,
876 }) => StateUpdateKind::Role(
877 key.ok_or_else(|| {
878 TryFromProtoError::missing_field("state_update_kind::Role::key")
879 })?,
880 value.ok_or_else(|| {
881 TryFromProtoError::missing_field("state_update_kind::Role::value")
882 })?,
883 ),
884 proto::state_update_kind::Kind::RoleAuth(proto::state_update_kind::RoleAuth {
885 key,
886 value,
887 }) => StateUpdateKind::RoleAuth(
888 key.ok_or_else(|| {
889 TryFromProtoError::missing_field("state_update_kind::RoleAuth::key")
890 })?,
891 value.ok_or_else(|| {
892 TryFromProtoError::missing_field("state_update_kind::RoleAuth::value")
893 })?,
894 ),
895 proto::state_update_kind::Kind::Schema(proto::state_update_kind::Schema {
896 key,
897 value,
898 }) => StateUpdateKind::Schema(
899 key.ok_or_else(|| {
900 TryFromProtoError::missing_field("state_update_kind::Schema::key")
901 })?,
902 value.ok_or_else(|| {
903 TryFromProtoError::missing_field("state_update_kind::Schema::value")
904 })?,
905 ),
906 proto::state_update_kind::Kind::Setting(proto::state_update_kind::Setting {
907 key,
908 value,
909 }) => StateUpdateKind::Setting(
910 key.ok_or_else(|| {
911 TryFromProtoError::missing_field("state_update_kind::Setting::key")
912 })?,
913 value.ok_or_else(|| {
914 TryFromProtoError::missing_field("state_update_kind::Setting::value")
915 })?,
916 ),
917 proto::state_update_kind::Kind::ServerConfiguration(
918 proto::state_update_kind::ServerConfiguration { key, value },
919 ) => StateUpdateKind::SystemConfiguration(
920 key.ok_or_else(|| {
921 TryFromProtoError::missing_field(
922 "state_update_kind::ServerConfiguration::key",
923 )
924 })?,
925 value.ok_or_else(|| {
926 TryFromProtoError::missing_field(
927 "state_update_kind::ServerConfiguration::value",
928 )
929 })?,
930 ),
931 proto::state_update_kind::Kind::GidMapping(
932 proto::state_update_kind::GidMapping { key, value },
933 ) => StateUpdateKind::SystemObjectMapping(
934 key.ok_or_else(|| {
935 TryFromProtoError::missing_field("state_update_kind::GidMapping::key")
936 })?,
937 value.ok_or_else(|| {
938 TryFromProtoError::missing_field("state_update_kind::GidMapping::value")
939 })?,
940 ),
941 proto::state_update_kind::Kind::SystemPrivileges(
942 proto::state_update_kind::SystemPrivileges { key, value },
943 ) => StateUpdateKind::SystemPrivilege(
944 key.ok_or_else(|| {
945 TryFromProtoError::missing_field("state_update_kind::SystemPrivileges::key")
946 })?,
947 value.ok_or_else(|| {
948 TryFromProtoError::missing_field(
949 "state_update_kind::SystemPrivileges::value",
950 )
951 })?,
952 ),
953 proto::state_update_kind::Kind::StorageCollectionMetadata(
954 proto::state_update_kind::StorageCollectionMetadata { key, value },
955 ) => StateUpdateKind::StorageCollectionMetadata(
956 key.ok_or_else(|| {
957 TryFromProtoError::missing_field(
958 "state_update_kind::StorageCollectionMetadata::key",
959 )
960 })?,
961 value.ok_or_else(|| {
962 TryFromProtoError::missing_field(
963 "state_update_kind::StorageCollectionMetadata::value",
964 )
965 })?,
966 ),
967 proto::state_update_kind::Kind::UnfinalizedShard(
968 proto::state_update_kind::UnfinalizedShard { key },
969 ) => StateUpdateKind::UnfinalizedShard(
970 key.ok_or_else(|| {
971 TryFromProtoError::missing_field(
972 "state_update_kind::StorageCollectionMetadata::key",
973 )
974 })?,
975 (),
976 ),
977 proto::state_update_kind::Kind::TxnWalShard(
978 proto::state_update_kind::TxnWalShard { value },
979 ) => StateUpdateKind::TxnWalShard(
980 (),
981 value.ok_or_else(|| {
982 TryFromProtoError::missing_field("state_update_kind::TxnWalShard::value")
983 })?,
984 ),
985 proto::state_update_kind::Kind::SourceReferences(
986 proto::state_update_kind::SourceReferences { key, value },
987 ) => StateUpdateKind::SourceReferences(
988 key.ok_or_else(|| {
989 TryFromProtoError::missing_field("state_update_kind::SourceReferences::key")
990 })?,
991 value.ok_or_else(|| {
992 TryFromProtoError::missing_field(
993 "state_update_kind::SourceReferences::value",
994 )
995 })?,
996 ),
997 proto::state_update_kind::Kind::NetworkPolicy(
998 proto::state_update_kind::NetworkPolicy { key, value },
999 ) => StateUpdateKind::NetworkPolicy(
1000 key.ok_or_else(|| {
1001 TryFromProtoError::missing_field("state_update_kind::NetworkPolicy::key")
1002 })?,
1003 value.ok_or_else(|| {
1004 TryFromProtoError::missing_field("state_update_kind::NetworkPolicy::value")
1005 })?,
1006 ),
1007 },
1008 )
1009 }
1010}
1011
1012impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
1015 fn from(kvtd: PersistStateUpdate) -> Self {
1016 let ((key, ()), ts, diff) = kvtd;
1017 StateUpdate {
1018 kind: StateUpdateKindJson::from(key),
1019 ts,
1020 diff: diff.into(),
1021 }
1022 }
1023}
1024
1025impl From<StateUpdateKindJson> for SourceData {
1026 fn from(value: StateUpdateKindJson) -> SourceData {
1027 let row = value.0.into_row();
1028 SourceData(Ok(row))
1029 }
1030}
1031
1032impl From<SourceData> for StateUpdateKindJson {
1033 fn from(value: SourceData) -> Self {
1034 let row = value.0.expect("only Ok values stored in catalog shard");
1035 StateUpdateKindJson(Jsonb::from_row(row))
1036 }
1037}
1038
#[cfg(test)]
mod tests {
    use mz_persist_types::Codec;
    use mz_repr::{RelationDesc, SqlScalarType};
    use mz_storage_types::sources::SourceData;
    use proptest::prelude::*;

    use crate::durable::Epoch;
    use crate::durable::objects::FenceToken;
    use crate::durable::objects::serialization::proto;
    use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};

    /// Checks that `StateUpdateKindJson::kind` yields the expected name for a
    /// handful of `StateUpdateKind` variants.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn kind_test() {
        let fence_token = StateUpdateKind::FenceToken(FenceToken {
            deploy_generation: 1,
            epoch: Epoch::new(1).expect("non-zero"),
        });
        let config = StateUpdateKind::Config(
            proto::ConfigKey { key: String::new() },
            proto::ConfigValue { value: 1 },
        );
        let setting = StateUpdateKind::Setting(
            proto::SettingKey {
                name: String::new(),
            },
            proto::SettingValue {
                value: String::new(),
            },
        );
        let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ());

        let test_cases = [
            (fence_token, "FenceToken"),
            (config, "Config"),
            (setting, "Setting"),
            (audit_log, "AuditLog"),
        ];

        for (update, expected_name) in test_cases {
            let json = StateUpdateKindJson::from(update);
            let actual_name = json.kind().to_string();
            assert_eq!(expected_name, actual_name);
        }
    }

    /// Checks that `StateUpdateKindJson::audit_log_id` returns the `id` of the
    /// V1 audit log event the update was built from.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn audit_log_id_test() {
        // Builds an audit log update around a V1 event with the given numeric
        // fields and every optional field left unset.
        let audit_log_update = |id, event_type, object_type| {
            StateUpdateKind::AuditLog(
                proto::AuditLogKey {
                    event: Some(proto::audit_log_key::Event::V1(proto::AuditLogEventV1 {
                        id,
                        event_type,
                        object_type,
                        user: None,
                        occurred_at: None,
                        details: None,
                    })),
                },
                (),
            )
        };

        let test_cases = [
            (audit_log_update(1, 2, 3), 1),
            (audit_log_update(4, 5, 6), 4),
        ];

        for (update, expected_id) in test_cases {
            let json = StateUpdateKindJson::from(update);
            let actual_id = json.audit_log_id();
            assert_eq!(expected_id, actual_id);
        }
    }

    // Round-trips an arbitrary `StateUpdateKind` through its JSON form and the
    // `SourceData` codec, checking equality after every step.
    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
            // Step 1: typed update -> raw JSON representation.
            let raw = StateUpdateKindJson::from(kind.clone());
            let desc = RelationDesc::builder()
                .with_column("a", SqlScalarType::Jsonb.nullable(false))
                .finish();

            // Step 2: raw JSON -> `SourceData` -> bytes -> `SourceData`.
            let source_data = SourceData::from(raw.clone());
            let mut bytes = Vec::new();
            source_data.encode(&mut bytes);
            let source_data_decoded =
                SourceData::decode(&bytes, &desc).expect("should be valid SourceData");
            prop_assert_eq!(&source_data, &source_data_decoded);

            // Step 3: decoded `SourceData` -> raw JSON.
            let raw_decoded = StateUpdateKindJson::from(source_data_decoded);
            prop_assert_eq!(&raw, &raw_decoded);

            // Step 4: raw JSON -> typed update.
            let kind_decoded =
                StateUpdateKind::try_from(raw_decoded).expect("should be valid StateUpdateKind");
            prop_assert_eq!(&kind, &kind_decoded);
        }
    }
}