1use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46use proptest_derive::Arbitrary;
47use tracing::error;
48
49use crate::durable::debug::CollectionType;
50use crate::durable::objects::serialization::proto;
51use crate::durable::objects::{DurableType, FenceToken};
52use crate::durable::persist::Timestamp;
53use crate::durable::transaction::TransactionBatch;
54use crate::durable::{DurableCatalogError, Epoch};
55use crate::memory;
56
57pub trait IntoStateUpdateKindJson:
59 Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
60{
61 type Error: Debug;
62
63 fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
64}
65impl<
66 T: Into<StateUpdateKindJson>
67 + TryFrom<StateUpdateKindJson>
68 + PartialEq
69 + Eq
70 + PartialOrd
71 + Ord
72 + Debug
73 + Clone,
74> IntoStateUpdateKindJson for T
75where
76 T::Error: Debug,
77{
78 type Error = T::Error;
79
80 fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
81 <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
82 }
83}
84
85pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
87 type Error: Debug;
88
89 fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
90}
91impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
92where
93 <T as TryInto<StateUpdateKind>>::Error: Debug,
94{
95 type Error = <T as TryInto<StateUpdateKind>>::Error;
96
97 fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
98 <T as TryInto<StateUpdateKind>>::try_into(self)
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
104pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
105 pub kind: T,
107 pub ts: Timestamp,
109 pub diff: Diff,
111}
112
113impl StateUpdate {
114 pub(crate) fn from_txn_batch_ts(
116 txn_batch: TransactionBatch,
117 ts: Timestamp,
118 ) -> impl Iterator<Item = StateUpdate> {
119 Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
120 }
121
122 pub(crate) fn from_txn_batch(
124 txn_batch: TransactionBatch,
125 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
126 fn from_batch<K, V>(
127 batch: Vec<(K, V, Diff)>,
128 kind: fn(K, V) -> StateUpdateKind,
129 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
130 batch
131 .into_iter()
132 .map(move |(k, v, diff)| (kind(k, v), diff))
133 }
134 let TransactionBatch {
135 databases,
136 schemas,
137 items,
138 comments,
139 roles,
140 role_auth,
141 clusters,
142 cluster_replicas,
143 network_policies,
144 introspection_sources,
145 id_allocator,
146 configs,
147 settings,
148 source_references,
149 system_gid_mapping,
150 system_configurations,
151 default_privileges,
152 system_privileges,
153 storage_collection_metadata,
154 unfinalized_shards,
155 txn_wal_shard,
156 audit_log_updates,
157 upper: _,
158 } = txn_batch;
159 let databases = from_batch(databases, StateUpdateKind::Database);
160 let schemas = from_batch(schemas, StateUpdateKind::Schema);
161 let items = from_batch(items, StateUpdateKind::Item);
162 let comments = from_batch(comments, StateUpdateKind::Comment);
163 let roles = from_batch(roles, StateUpdateKind::Role);
164 let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
165 let clusters = from_batch(clusters, StateUpdateKind::Cluster);
166 let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
167 let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
168 let introspection_sources = from_batch(
169 introspection_sources,
170 StateUpdateKind::IntrospectionSourceIndex,
171 );
172 let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
173 let configs = from_batch(configs, StateUpdateKind::Config);
174 let settings = from_batch(settings, StateUpdateKind::Setting);
175 let system_object_mappings =
176 from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
177 let system_configurations =
178 from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
179 let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
180 let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
181 let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
182 let storage_collection_metadata = from_batch(
183 storage_collection_metadata,
184 StateUpdateKind::StorageCollectionMetadata,
185 );
186 let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
187 let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
188 let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
189
190 databases
191 .chain(schemas)
192 .chain(items)
193 .chain(comments)
194 .chain(roles)
195 .chain(role_auth)
196 .chain(clusters)
197 .chain(cluster_replicas)
198 .chain(network_policies)
199 .chain(introspection_sources)
200 .chain(id_allocators)
201 .chain(configs)
202 .chain(settings)
203 .chain(source_references)
204 .chain(system_object_mappings)
205 .chain(system_configurations)
206 .chain(default_privileges)
207 .chain(system_privileges)
208 .chain(storage_collection_metadata)
209 .chain(unfinalized_shards)
210 .chain(txn_wal_shard)
211 .chain(audit_logs)
212 }
213}
214
215#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Arbitrary)]
220pub enum StateUpdateKind {
221 AuditLog(proto::AuditLogKey, ()),
222 Cluster(proto::ClusterKey, proto::ClusterValue),
223 ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
224 Comment(proto::CommentKey, proto::CommentValue),
225 Config(proto::ConfigKey, proto::ConfigValue),
226 Database(proto::DatabaseKey, proto::DatabaseValue),
227 DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
228 FenceToken(FenceToken),
229 IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
230 IntrospectionSourceIndex(
231 proto::ClusterIntrospectionSourceIndexKey,
232 proto::ClusterIntrospectionSourceIndexValue,
233 ),
234 Item(proto::ItemKey, proto::ItemValue),
235 NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
236 Role(proto::RoleKey, proto::RoleValue),
237 RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
238 Schema(proto::SchemaKey, proto::SchemaValue),
239 Setting(proto::SettingKey, proto::SettingValue),
240 SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
241 SystemConfiguration(
242 proto::ServerConfigurationKey,
243 proto::ServerConfigurationValue,
244 ),
245 SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
246 SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
247 StorageCollectionMetadata(
248 proto::StorageCollectionMetadataKey,
249 proto::StorageCollectionMetadataValue,
250 ),
251 UnfinalizedShard(proto::UnfinalizedShardKey, ()),
252 TxnWalShard((), proto::TxnWalShardValue),
253}
254
255impl StateUpdateKind {
256 pub(crate) fn collection_type(&self) -> Option<CollectionType> {
257 match self {
258 StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
259 StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
260 StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
261 StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
262 StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
263 StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
264 StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
265 StateUpdateKind::FenceToken(_) => None,
266 StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
267 StateUpdateKind::IntrospectionSourceIndex(_, _) => {
268 Some(CollectionType::ComputeIntrospectionSourceIndex)
269 }
270 StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
271 StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
272 StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
273 StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
274 StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
275 StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
276 StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
277 StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
278 StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
279 StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
280 StateUpdateKind::StorageCollectionMetadata(_, _) => {
281 Some(CollectionType::StorageCollectionMetadata)
282 }
283 StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
284 StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
285 }
286 }
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
291pub struct StateUpdateKindJson(Jsonb);
292
293impl StateUpdateKindJson {
294 pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
295 let serde_value = serde_json::to_value(s).expect("valid json");
296 let row = Jsonb::from_serde_json(serde_value).expect("valid json");
297 StateUpdateKindJson(row)
298 }
299
300 pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
301 self.try_to_serde().expect("jsonb should roundtrip")
302 }
303
304 pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
305 &self,
306 ) -> Result<D, serde_json::error::Error> {
307 let serde_value = self.0.as_ref().to_serde_json();
308 serde_json::from_value::<D>(serde_value)
309 }
310
311 fn kind(&self) -> &str {
312 let row = self.0.row();
313 let mut iter = row.unpack_first().unwrap_map().iter();
314 let datum = iter
315 .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
316 .expect("kind field must exist");
317 datum.unwrap_str()
318 }
319
320 pub(crate) fn audit_log_id(&self) -> u64 {
321 assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
322 let row = self.0.row();
323 let mut iter = row.unpack_first().unwrap_map().iter();
324 let key = iter
325 .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
326 .expect("key field must exist")
327 .unwrap_map();
328 let event = key
329 .iter()
330 .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
331 .expect("event field must exist")
332 .unwrap_map();
333 let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
334 match event_version {
335 "V1" => {
336 let versioned_map = versioned_datum.unwrap_map();
337 let id = versioned_map
338 .iter()
339 .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
340 .expect("event field must exist")
341 .unwrap_numeric();
342 let mut cx = Numeric::context();
343 cx.try_into_u64(id.into_inner()).expect("invalid id")
344 }
345 version => unimplemented!("unsupported event version: {version}"),
346 }
347 }
348
349 pub(crate) fn is_always_deserializable(&self) -> bool {
351 static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
354 [
355 StateUpdateKind::FenceToken(FenceToken {
356 deploy_generation: 1,
357 epoch: Epoch::new(1).expect("non-zero"),
358 }),
359 StateUpdateKind::Config(
360 proto::ConfigKey { key: String::new() },
361 proto::ConfigValue { value: 1 },
362 ),
363 StateUpdateKind::Setting(
364 proto::SettingKey {
365 name: String::new(),
366 },
367 proto::SettingValue {
368 value: String::new(),
369 },
370 ),
371 StateUpdateKind::AuditLog(
372 proto::AuditLogKey {
373 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
374 id: 1,
375 event_type: proto::audit_log_event_v1::EventType::Create,
376 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
377 user: None,
378 occurred_at: proto::EpochMillis { millis: 1 },
379 details: proto::audit_log_event_v1::Details::ResetAllV1(
380 proto::Empty {},
381 ),
382 }),
383 },
384 (),
385 ),
386 ]
387 .into_iter()
388 .map(|kind| {
389 let json_kind: StateUpdateKindJson = kind.into();
390 json_kind.kind().to_string()
391 })
392 .collect()
393 });
394 DESERIALIZABLE_KINDS.contains(self.kind())
395 }
396
397 pub(crate) fn is_audit_log(&self) -> bool {
399 static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
402 let audit_log = StateUpdateKind::AuditLog(
403 proto::AuditLogKey {
404 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
405 id: 1,
406 event_type: proto::audit_log_event_v1::EventType::Create,
407 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
408 user: None,
409 occurred_at: proto::EpochMillis { millis: 1 },
410 details: proto::audit_log_event_v1::Details::ResetAllV1(proto::Empty {}),
411 }),
412 },
413 (),
414 );
415 let json_kind: StateUpdateKindJson = audit_log.into();
416 json_kind.kind().to_string()
417 });
418 &*AUDIT_LOG_KIND == self.kind()
419 }
420}
421
422type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);
424
425impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
426 type Error = DurableCatalogError;
427
428 fn try_from(
429 StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
430 ) -> Result<Self, Self::Error> {
431 let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
432 let update = kind.map(|kind| memory::objects::StateUpdate {
433 kind,
434 ts: ts.clone(),
435 diff: diff.clone().try_into().expect("invalid diff"),
436 });
437 Ok(update)
438 }
439}
440
441impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
442 type Error = DurableCatalogError;
443
444 fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
445 fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
446 where
447 PK: ProtoType<T::Key> + Clone,
448 PV: ProtoType<T::Value> + Clone,
449 T: DurableType,
450 {
451 let key = key.clone().into_rust()?;
452 let value = value.clone().into_rust()?;
453 Ok(T::from_key_value(key, value))
454 }
455
456 Ok(match kind {
457 StateUpdateKind::AuditLog(key, value) => {
458 let audit_log = into_durable(key, value)?;
459 Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
460 }
461 StateUpdateKind::Cluster(key, value) => {
462 let cluster = into_durable(key, value)?;
463 Some(memory::objects::StateUpdateKind::Cluster(cluster))
464 }
465 StateUpdateKind::ClusterReplica(key, value) => {
466 let cluster_replica = into_durable(key, value)?;
467 Some(memory::objects::StateUpdateKind::ClusterReplica(
468 cluster_replica,
469 ))
470 }
471 StateUpdateKind::Comment(key, value) => {
472 let comment = into_durable(key, value)?;
473 Some(memory::objects::StateUpdateKind::Comment(comment))
474 }
475 StateUpdateKind::Database(key, value) => {
476 let database = into_durable(key, value)?;
477 Some(memory::objects::StateUpdateKind::Database(database))
478 }
479 StateUpdateKind::DefaultPrivilege(key, value) => {
480 let default_privilege = into_durable(key, value)?;
481 Some(memory::objects::StateUpdateKind::DefaultPrivilege(
482 default_privilege,
483 ))
484 }
485 StateUpdateKind::Item(key, value) => {
486 let item = into_durable(key, value)?;
487 Some(memory::objects::StateUpdateKind::Item(item))
488 }
489 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
490 let introspection_source_index = into_durable(key, value)?;
491 Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
492 introspection_source_index,
493 ))
494 }
495 StateUpdateKind::NetworkPolicy(key, value) => {
496 let policy = into_durable(key, value)?;
497 Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
498 }
499 StateUpdateKind::Role(key, value) => {
500 let role = into_durable(key, value)?;
501 Some(memory::objects::StateUpdateKind::Role(role))
502 }
503 StateUpdateKind::RoleAuth(key, value) => {
504 let role_auth = into_durable(key, value)?;
505 Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
506 }
507 StateUpdateKind::Schema(key, value) => {
508 let schema = into_durable(key, value)?;
509 Some(memory::objects::StateUpdateKind::Schema(schema))
510 }
511 StateUpdateKind::SourceReferences(key, value) => {
512 let source_references = into_durable(key, value)?;
513 Some(memory::objects::StateUpdateKind::SourceReferences(
514 source_references,
515 ))
516 }
517 StateUpdateKind::StorageCollectionMetadata(key, value) => {
518 let storage_collection_metadata = into_durable(key, value)?;
519 Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
520 storage_collection_metadata,
521 ))
522 }
523 StateUpdateKind::SystemConfiguration(key, value) => {
524 let system_configuration = into_durable(key, value)?;
525 Some(memory::objects::StateUpdateKind::SystemConfiguration(
526 system_configuration,
527 ))
528 }
529 StateUpdateKind::SystemObjectMapping(key, value) => {
530 let system_object_mapping = into_durable(key, value)?;
531 Some(memory::objects::StateUpdateKind::SystemObjectMapping(
532 system_object_mapping,
533 ))
534 }
535 StateUpdateKind::SystemPrivilege(key, value) => {
536 let system_privilege = into_durable(key, value)?;
537 Some(memory::objects::StateUpdateKind::SystemPrivilege(
538 system_privilege,
539 ))
540 }
541 StateUpdateKind::UnfinalizedShard(key, value) => {
542 let unfinalized_shard = into_durable(key, value)?;
543 Some(memory::objects::StateUpdateKind::UnfinalizedShard(
544 unfinalized_shard,
545 ))
546 }
547 StateUpdateKind::Config(_, _)
549 | StateUpdateKind::FenceToken(_)
550 | StateUpdateKind::IdAllocator(_, _)
551 | StateUpdateKind::Setting(_, _)
552 | StateUpdateKind::TxnWalShard(_, _) => None,
553 })
554 }
555}
556
557impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
558 type Error = String;
559
560 fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
561 Ok(StateUpdate {
562 kind: TryInto::try_into(update.kind)?,
563 ts: update.ts,
564 diff: update.diff,
565 })
566 }
567}
568
569impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
570 type Error = String;
571
572 fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
573 let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
574 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
575 }
576}
577
578impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
579 type Error = String;
580
581 fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
582 let kind: proto::StateUpdateKind = value.try_to_serde().map_err(|err| err.to_string())?;
583 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
584 }
585}
586
587impl From<StateUpdateKind> for StateUpdateKindJson {
588 fn from(value: StateUpdateKind) -> Self {
589 let kind = value.into_proto_owned();
590 StateUpdateKindJson::from_serde(kind)
591 }
592}
593
594impl RustType<proto::StateUpdateKind> for StateUpdateKind {
598 fn into_proto(&self) -> proto::StateUpdateKind {
599 error!("unexpected clone of catalog data");
600 self.clone().into_proto_owned()
601 }
602
603 fn into_proto_owned(self) -> proto::StateUpdateKind {
604 match self {
605 StateUpdateKind::AuditLog(key, ()) => {
606 proto::StateUpdateKind::AuditLog(proto::AuditLog { key })
607 }
608 StateUpdateKind::Cluster(key, value) => {
609 proto::StateUpdateKind::Cluster(proto::Cluster { key, value })
610 }
611 StateUpdateKind::ClusterReplica(key, value) => {
612 proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value })
613 }
614 StateUpdateKind::Comment(key, value) => {
615 proto::StateUpdateKind::Comment(proto::Comment { key, value })
616 }
617 StateUpdateKind::Config(key, value) => {
618 proto::StateUpdateKind::Config(proto::Config { key, value })
619 }
620 StateUpdateKind::Database(key, value) => {
621 proto::StateUpdateKind::Database(proto::Database { key, value })
622 }
623 StateUpdateKind::DefaultPrivilege(key, value) => {
624 proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value })
625 }
626 StateUpdateKind::FenceToken(fence_token) => {
627 proto::StateUpdateKind::FenceToken(proto::FenceToken {
628 deploy_generation: fence_token.deploy_generation,
629 epoch: fence_token.epoch.get(),
630 })
631 }
632 StateUpdateKind::IdAllocator(key, value) => {
633 proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value })
634 }
635 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
636 proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
637 proto::ClusterIntrospectionSourceIndex { key, value },
638 )
639 }
640 StateUpdateKind::Item(key, value) => {
641 proto::StateUpdateKind::Item(proto::Item { key, value })
642 }
643 StateUpdateKind::NetworkPolicy(key, value) => {
644 proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value })
645 }
646 StateUpdateKind::Role(key, value) => {
647 proto::StateUpdateKind::Role(proto::Role { key, value })
648 }
649 StateUpdateKind::RoleAuth(key, value) => {
650 proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value })
651 }
652 StateUpdateKind::Schema(key, value) => {
653 proto::StateUpdateKind::Schema(proto::Schema { key, value })
654 }
655 StateUpdateKind::Setting(key, value) => {
656 proto::StateUpdateKind::Setting(proto::Setting { key, value })
657 }
658 StateUpdateKind::SourceReferences(key, value) => {
659 proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value })
660 }
661 StateUpdateKind::SystemConfiguration(key, value) => {
662 proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
663 key,
664 value,
665 })
666 }
667 StateUpdateKind::SystemObjectMapping(key, value) => {
668 proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value })
669 }
670 StateUpdateKind::SystemPrivilege(key, value) => {
671 proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value })
672 }
673 StateUpdateKind::StorageCollectionMetadata(key, value) => {
674 proto::StateUpdateKind::StorageCollectionMetadata(
675 proto::StorageCollectionMetadata { key, value },
676 )
677 }
678 StateUpdateKind::UnfinalizedShard(key, ()) => {
679 proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key })
680 }
681 StateUpdateKind::TxnWalShard((), value) => {
682 proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value })
683 }
684 }
685 }
686
687 fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
688 Ok(match proto {
689 proto::StateUpdateKind::AuditLog(proto::AuditLog { key }) => {
690 StateUpdateKind::AuditLog(key, ())
691 }
692 proto::StateUpdateKind::Cluster(proto::Cluster { key, value }) => {
693 StateUpdateKind::Cluster(key, value)
694 }
695 proto::StateUpdateKind::ClusterReplica(proto::ClusterReplica { key, value }) => {
696 StateUpdateKind::ClusterReplica(key, value)
697 }
698 proto::StateUpdateKind::Comment(proto::Comment { key, value }) => {
699 StateUpdateKind::Comment(key, value)
700 }
701 proto::StateUpdateKind::Config(proto::Config { key, value }) => {
702 StateUpdateKind::Config(key, value)
703 }
704 proto::StateUpdateKind::Database(proto::Database { key, value }) => {
705 StateUpdateKind::Database(key, value)
706 }
707 proto::StateUpdateKind::DefaultPrivileges(proto::DefaultPrivileges { key, value }) => {
708 StateUpdateKind::DefaultPrivilege(key, value)
709 }
710 proto::StateUpdateKind::FenceToken(proto::FenceToken {
711 deploy_generation,
712 epoch,
713 }) => StateUpdateKind::FenceToken(FenceToken {
714 deploy_generation,
715 epoch: Epoch::new(epoch).ok_or_else(|| {
716 TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
717 })?,
718 }),
719 proto::StateUpdateKind::IdAlloc(proto::IdAlloc { key, value }) => {
720 StateUpdateKind::IdAllocator(key, value)
721 }
722 proto::StateUpdateKind::ClusterIntrospectionSourceIndex(
723 proto::ClusterIntrospectionSourceIndex { key, value },
724 ) => StateUpdateKind::IntrospectionSourceIndex(key, value),
725 proto::StateUpdateKind::Item(proto::Item { key, value }) => {
726 StateUpdateKind::Item(key, value)
727 }
728 proto::StateUpdateKind::Role(proto::Role { key, value }) => {
729 StateUpdateKind::Role(key, value)
730 }
731 proto::StateUpdateKind::RoleAuth(proto::RoleAuth { key, value }) => {
732 StateUpdateKind::RoleAuth(key, value)
733 }
734 proto::StateUpdateKind::Schema(proto::Schema { key, value }) => {
735 StateUpdateKind::Schema(key, value)
736 }
737 proto::StateUpdateKind::Setting(proto::Setting { key, value }) => {
738 StateUpdateKind::Setting(key, value)
739 }
740 proto::StateUpdateKind::ServerConfiguration(proto::ServerConfiguration {
741 key,
742 value,
743 }) => StateUpdateKind::SystemConfiguration(key, value),
744 proto::StateUpdateKind::GidMapping(proto::GidMapping { key, value }) => {
745 StateUpdateKind::SystemObjectMapping(key, value)
746 }
747 proto::StateUpdateKind::SystemPrivileges(proto::SystemPrivileges { key, value }) => {
748 StateUpdateKind::SystemPrivilege(key, value)
749 }
750 proto::StateUpdateKind::StorageCollectionMetadata(
751 proto::StorageCollectionMetadata { key, value },
752 ) => StateUpdateKind::StorageCollectionMetadata(key, value),
753 proto::StateUpdateKind::UnfinalizedShard(proto::UnfinalizedShard { key }) => {
754 StateUpdateKind::UnfinalizedShard(key, ())
755 }
756 proto::StateUpdateKind::TxnWalShard(proto::TxnWalShard { value }) => {
757 StateUpdateKind::TxnWalShard((), value)
758 }
759 proto::StateUpdateKind::SourceReferences(proto::SourceReferences { key, value }) => {
760 StateUpdateKind::SourceReferences(key, value)
761 }
762 proto::StateUpdateKind::NetworkPolicy(proto::NetworkPolicy { key, value }) => {
763 StateUpdateKind::NetworkPolicy(key, value)
764 }
765 })
766 }
767}
768
769impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
772 fn from(kvtd: PersistStateUpdate) -> Self {
773 let ((key, ()), ts, diff) = kvtd;
774 StateUpdate {
775 kind: StateUpdateKindJson::from(key),
776 ts,
777 diff: diff.into(),
778 }
779 }
780}
781
782impl From<StateUpdateKindJson> for SourceData {
783 fn from(value: StateUpdateKindJson) -> SourceData {
784 let row = value.0.into_row();
785 SourceData(Ok(row))
786 }
787}
788
789impl From<SourceData> for StateUpdateKindJson {
790 fn from(value: SourceData) -> Self {
791 let row = value.0.expect("only Ok values stored in catalog shard");
792 StateUpdateKindJson(Jsonb::from_row(row))
793 }
794}
795
796#[cfg(test)]
797mod tests {
798 use mz_persist_types::Codec;
799 use mz_repr::{RelationDesc, SqlScalarType};
800 use mz_storage_types::sources::SourceData;
801 use proptest::prelude::*;
802
803 use crate::durable::Epoch;
804 use crate::durable::objects::FenceToken;
805 use crate::durable::objects::serialization::proto;
806 use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};
807
808 #[mz_ore::test]
809 #[cfg_attr(miri, ignore)]
810 fn kind_test() {
811 let test_cases = [
812 (
813 StateUpdateKind::FenceToken(FenceToken {
814 deploy_generation: 1,
815 epoch: Epoch::new(1).expect("non-zero"),
816 }),
817 "FenceToken",
818 ),
819 (
820 StateUpdateKind::Config(
821 proto::ConfigKey { key: String::new() },
822 proto::ConfigValue { value: 1 },
823 ),
824 "Config",
825 ),
826 (
827 StateUpdateKind::Setting(
828 proto::SettingKey {
829 name: String::new(),
830 },
831 proto::SettingValue {
832 value: String::new(),
833 },
834 ),
835 "Setting",
836 ),
837 (
838 StateUpdateKind::AuditLog(
839 proto::AuditLogKey {
840 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
841 id: 1,
842 event_type: proto::audit_log_event_v1::EventType::Create,
843 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
844 user: None,
845 occurred_at: proto::EpochMillis { millis: 4 },
846 details: proto::audit_log_event_v1::Details::ResetAllV1(
847 proto::Empty {},
848 ),
849 }),
850 },
851 (),
852 ),
853 "AuditLog",
854 ),
855 ];
856
857 for (kind, expected) in test_cases {
858 let json_kind: StateUpdateKindJson = kind.into();
859 let kind = json_kind.kind().to_string();
860 assert_eq!(expected, kind);
861 }
862 }
863
864 #[mz_ore::test]
865 #[cfg_attr(miri, ignore)]
866 fn audit_log_id_test() {
867 let test_cases = [
868 (
869 StateUpdateKind::AuditLog(
870 proto::AuditLogKey {
871 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
872 id: 1,
873 event_type: proto::audit_log_event_v1::EventType::Create,
874 object_type: proto::audit_log_event_v1::ObjectType::Cluster,
875 user: None,
876 occurred_at: proto::EpochMillis { millis: 4 },
877 details: proto::audit_log_event_v1::Details::ResetAllV1(
878 proto::Empty {},
879 ),
880 }),
881 },
882 (),
883 ),
884 1,
885 ),
886 (
887 StateUpdateKind::AuditLog(
888 proto::AuditLogKey {
889 event: proto::AuditLogEvent::V1(proto::AuditLogEventV1 {
890 id: 4,
891 event_type: proto::audit_log_event_v1::EventType::Drop,
892 object_type: proto::audit_log_event_v1::ObjectType::Database,
893 user: None,
894 occurred_at: proto::EpochMillis { millis: 7 },
895 details: proto::audit_log_event_v1::Details::ResetAllV1(
896 proto::Empty {},
897 ),
898 }),
899 },
900 (),
901 ),
902 4,
903 ),
904 ];
905
906 for (kind, expected) in test_cases {
907 let json_kind: StateUpdateKindJson = kind.into();
908 let id = json_kind.audit_log_id();
909 assert_eq!(expected, id);
910 }
911 }
912
913 proptest! {
914 #[mz_ore::test]
915 #[cfg_attr(miri, ignore)] fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
917 let raw = StateUpdateKindJson::from(kind.clone());
920 let desc = RelationDesc::builder().with_column("a", SqlScalarType::Jsonb.nullable(false)).finish();
921
922 let source_data = SourceData::from(raw.clone());
924 let mut encoded = Vec::new();
925 source_data.encode(&mut encoded);
926 let decoded = SourceData::decode(&encoded, &desc).expect("should be valid SourceData");
927 prop_assert_eq!(&source_data, &decoded);
928 let decoded = StateUpdateKindJson::from(decoded);
929 prop_assert_eq!(&raw, &decoded);
930
931 let decoded = StateUpdateKind::try_from(decoded).expect("should be valid StateUpdateKind");
933 prop_assert_eq!(&kind, &decoded);
934 }
935 }
936}