1use std::fmt::Debug;
37use std::sync::LazyLock;
38
39use mz_ore::collections::HashSet;
40use mz_proto::{ProtoType, RustType, TryFromProtoError};
41use mz_repr::Diff;
42use mz_repr::adt::jsonb::Jsonb;
43use mz_repr::adt::numeric::{Dec, Numeric};
44use mz_storage_types::StorageDiff;
45use mz_storage_types::sources::SourceData;
46use proptest_derive::Arbitrary;
47use tracing::error;
48
49use crate::durable::debug::CollectionType;
50use crate::durable::objects::serialization::proto;
51use crate::durable::objects::{DurableType, FenceToken};
52use crate::durable::persist::Timestamp;
53use crate::durable::transaction::TransactionBatch;
54use crate::durable::{DurableCatalogError, Epoch};
55use crate::memory;
56
57pub trait IntoStateUpdateKindJson:
59 Into<StateUpdateKindJson> + PartialEq + Eq + PartialOrd + Ord + Debug + Clone
60{
61 type Error: Debug;
62
63 fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error>;
64}
65impl<
66 T: Into<StateUpdateKindJson>
67 + TryFrom<StateUpdateKindJson>
68 + PartialEq
69 + Eq
70 + PartialOrd
71 + Ord
72 + Debug
73 + Clone,
74> IntoStateUpdateKindJson for T
75where
76 T::Error: Debug,
77{
78 type Error = T::Error;
79
80 fn try_from(raw: StateUpdateKindJson) -> Result<Self, Self::Error> {
81 <T as TryFrom<StateUpdateKindJson>>::try_from(raw)
82 }
83}
84
85pub(crate) trait TryIntoStateUpdateKind: IntoStateUpdateKindJson {
87 type Error: Debug;
88
89 fn try_into(self) -> Result<StateUpdateKind, <Self as TryIntoStateUpdateKind>::Error>;
90}
91impl<T: IntoStateUpdateKindJson + TryInto<StateUpdateKind>> TryIntoStateUpdateKind for T
92where
93 <T as TryInto<StateUpdateKind>>::Error: Debug,
94{
95 type Error = <T as TryInto<StateUpdateKind>>::Error;
96
97 fn try_into(self) -> Result<StateUpdateKind, <T as TryInto<StateUpdateKind>>::Error> {
98 <T as TryInto<StateUpdateKind>>::try_into(self)
99 }
100}
101
/// A single update to the durable catalog state.
///
/// The payload type `T` defaults to the fully typed [`StateUpdateKind`]; the raw
/// [`StateUpdateKindJson`] form is used as `T` elsewhere in this file.
///
/// Field order matters: the derived `PartialOrd`/`Ord` compare `kind`, then `ts`, then `diff`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate<T: IntoStateUpdateKindJson = StateUpdateKind> {
    /// What is being updated.
    pub kind: T,
    /// The timestamp attached to the update.
    pub ts: Timestamp,
    /// The diff attached to the update (presumably the change in multiplicity; confirm
    /// against `mz_repr::Diff`).
    pub diff: Diff,
}
112
113impl StateUpdate {
114 pub(crate) fn from_txn_batch_ts(
116 txn_batch: TransactionBatch,
117 ts: Timestamp,
118 ) -> impl Iterator<Item = StateUpdate> {
119 Self::from_txn_batch(txn_batch).map(move |(kind, diff)| StateUpdate { kind, ts, diff })
120 }
121
122 pub(crate) fn from_txn_batch(
124 txn_batch: TransactionBatch,
125 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
126 fn from_batch<K, V>(
127 batch: Vec<(K, V, Diff)>,
128 kind: fn(K, V) -> StateUpdateKind,
129 ) -> impl Iterator<Item = (StateUpdateKind, Diff)> {
130 batch
131 .into_iter()
132 .map(move |(k, v, diff)| (kind(k, v), diff))
133 }
134 let TransactionBatch {
135 databases,
136 schemas,
137 items,
138 comments,
139 roles,
140 role_auth,
141 clusters,
142 cluster_replicas,
143 network_policies,
144 introspection_sources,
145 id_allocator,
146 configs,
147 settings,
148 source_references,
149 system_gid_mapping,
150 system_configurations,
151 default_privileges,
152 system_privileges,
153 storage_collection_metadata,
154 unfinalized_shards,
155 txn_wal_shard,
156 audit_log_updates,
157 upper: _,
158 } = txn_batch;
159 let databases = from_batch(databases, StateUpdateKind::Database);
160 let schemas = from_batch(schemas, StateUpdateKind::Schema);
161 let items = from_batch(items, StateUpdateKind::Item);
162 let comments = from_batch(comments, StateUpdateKind::Comment);
163 let roles = from_batch(roles, StateUpdateKind::Role);
164 let role_auth = from_batch(role_auth, StateUpdateKind::RoleAuth);
165 let clusters = from_batch(clusters, StateUpdateKind::Cluster);
166 let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica);
167 let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy);
168 let introspection_sources = from_batch(
169 introspection_sources,
170 StateUpdateKind::IntrospectionSourceIndex,
171 );
172 let id_allocators = from_batch(id_allocator, StateUpdateKind::IdAllocator);
173 let configs = from_batch(configs, StateUpdateKind::Config);
174 let settings = from_batch(settings, StateUpdateKind::Setting);
175 let system_object_mappings =
176 from_batch(system_gid_mapping, StateUpdateKind::SystemObjectMapping);
177 let system_configurations =
178 from_batch(system_configurations, StateUpdateKind::SystemConfiguration);
179 let default_privileges = from_batch(default_privileges, StateUpdateKind::DefaultPrivilege);
180 let source_references = from_batch(source_references, StateUpdateKind::SourceReferences);
181 let system_privileges = from_batch(system_privileges, StateUpdateKind::SystemPrivilege);
182 let storage_collection_metadata = from_batch(
183 storage_collection_metadata,
184 StateUpdateKind::StorageCollectionMetadata,
185 );
186 let unfinalized_shards = from_batch(unfinalized_shards, StateUpdateKind::UnfinalizedShard);
187 let txn_wal_shard = from_batch(txn_wal_shard, StateUpdateKind::TxnWalShard);
188 let audit_logs = from_batch(audit_log_updates, StateUpdateKind::AuditLog);
189
190 databases
191 .chain(schemas)
192 .chain(items)
193 .chain(comments)
194 .chain(roles)
195 .chain(role_auth)
196 .chain(clusters)
197 .chain(cluster_replicas)
198 .chain(network_policies)
199 .chain(introspection_sources)
200 .chain(id_allocators)
201 .chain(configs)
202 .chain(settings)
203 .chain(source_references)
204 .chain(system_object_mappings)
205 .chain(system_configurations)
206 .chain(default_privileges)
207 .chain(system_privileges)
208 .chain(storage_collection_metadata)
209 .chain(unfinalized_shards)
210 .chain(txn_wal_shard)
211 .chain(audit_logs)
212 }
213}
214
/// The fully typed payload of a catalog [`StateUpdate`].
///
/// Most variants carry the protobuf key and value of one catalog collection; a `()` in either
/// position means that collection has no key (or no value). `FenceToken` is the exception and
/// carries a [`FenceToken`] directly.
///
/// The variant order is significant: the derived `PartialOrd`/`Ord` compare variants in
/// declaration order, so reordering them changes how updates sort.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Arbitrary)]
pub enum StateUpdateKind {
    AuditLog(proto::AuditLogKey, ()),
    Cluster(proto::ClusterKey, proto::ClusterValue),
    ClusterReplica(proto::ClusterReplicaKey, proto::ClusterReplicaValue),
    Comment(proto::CommentKey, proto::CommentValue),
    Config(proto::ConfigKey, proto::ConfigValue),
    Database(proto::DatabaseKey, proto::DatabaseValue),
    DefaultPrivilege(proto::DefaultPrivilegesKey, proto::DefaultPrivilegesValue),
    FenceToken(FenceToken),
    IdAllocator(proto::IdAllocKey, proto::IdAllocValue),
    IntrospectionSourceIndex(
        proto::ClusterIntrospectionSourceIndexKey,
        proto::ClusterIntrospectionSourceIndexValue,
    ),
    Item(proto::ItemKey, proto::ItemValue),
    NetworkPolicy(proto::NetworkPolicyKey, proto::NetworkPolicyValue),
    Role(proto::RoleKey, proto::RoleValue),
    RoleAuth(proto::RoleAuthKey, proto::RoleAuthValue),
    Schema(proto::SchemaKey, proto::SchemaValue),
    Setting(proto::SettingKey, proto::SettingValue),
    SourceReferences(proto::SourceReferencesKey, proto::SourceReferencesValue),
    SystemConfiguration(
        proto::ServerConfigurationKey,
        proto::ServerConfigurationValue,
    ),
    SystemObjectMapping(proto::GidMappingKey, proto::GidMappingValue),
    SystemPrivilege(proto::SystemPrivilegesKey, proto::SystemPrivilegesValue),
    StorageCollectionMetadata(
        proto::StorageCollectionMetadataKey,
        proto::StorageCollectionMetadataValue,
    ),
    UnfinalizedShard(proto::UnfinalizedShardKey, ()),
    TxnWalShard((), proto::TxnWalShardValue),
}
254
255impl StateUpdateKind {
256 pub(crate) fn collection_type(&self) -> Option<CollectionType> {
257 match self {
258 StateUpdateKind::AuditLog(_, _) => Some(CollectionType::AuditLog),
259 StateUpdateKind::Cluster(_, _) => Some(CollectionType::ComputeInstance),
260 StateUpdateKind::ClusterReplica(_, _) => Some(CollectionType::ComputeReplicas),
261 StateUpdateKind::Comment(_, _) => Some(CollectionType::Comments),
262 StateUpdateKind::Config(_, _) => Some(CollectionType::Config),
263 StateUpdateKind::Database(_, _) => Some(CollectionType::Database),
264 StateUpdateKind::DefaultPrivilege(_, _) => Some(CollectionType::DefaultPrivileges),
265 StateUpdateKind::FenceToken(_) => None,
266 StateUpdateKind::IdAllocator(_, _) => Some(CollectionType::IdAlloc),
267 StateUpdateKind::IntrospectionSourceIndex(_, _) => {
268 Some(CollectionType::ComputeIntrospectionSourceIndex)
269 }
270 StateUpdateKind::Item(_, _) => Some(CollectionType::Item),
271 StateUpdateKind::NetworkPolicy(_, _) => Some(CollectionType::NetworkPolicy),
272 StateUpdateKind::Role(_, _) => Some(CollectionType::Role),
273 StateUpdateKind::RoleAuth(_, _) => Some(CollectionType::RoleAuth),
274 StateUpdateKind::Schema(_, _) => Some(CollectionType::Schema),
275 StateUpdateKind::Setting(_, _) => Some(CollectionType::Setting),
276 StateUpdateKind::SourceReferences(_, _) => Some(CollectionType::SourceReferences),
277 StateUpdateKind::SystemConfiguration(_, _) => Some(CollectionType::SystemConfiguration),
278 StateUpdateKind::SystemObjectMapping(_, _) => Some(CollectionType::SystemGidMapping),
279 StateUpdateKind::SystemPrivilege(_, _) => Some(CollectionType::SystemPrivileges),
280 StateUpdateKind::StorageCollectionMetadata(_, _) => {
281 Some(CollectionType::StorageCollectionMetadata)
282 }
283 StateUpdateKind::UnfinalizedShard(_, _) => Some(CollectionType::UnfinalizedShard),
284 StateUpdateKind::TxnWalShard(_, _) => Some(CollectionType::TxnWalShard),
285 }
286 }
287}
288
/// The raw, JSON-encoded form of a catalog update's payload, stored as a [`Jsonb`] value.
///
/// Conversions to and from the typed [`StateUpdateKind`] are provided elsewhere in this file.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdateKindJson(Jsonb);
292
293impl StateUpdateKindJson {
294 pub(crate) fn from_serde<S: serde::Serialize>(s: S) -> Self {
295 let serde_value = serde_json::to_value(s).expect("valid json");
296 let row = Jsonb::from_serde_json(serde_value).expect("valid json");
297 StateUpdateKindJson(row)
298 }
299
300 pub(crate) fn to_serde<D: serde::de::DeserializeOwned>(&self) -> D {
301 self.try_to_serde().expect("jsonb should roundtrip")
302 }
303
304 pub(crate) fn try_to_serde<D: serde::de::DeserializeOwned>(
305 &self,
306 ) -> Result<D, serde_json::error::Error> {
307 let serde_value = self.0.as_ref().to_serde_json();
308 serde_json::from_value::<D>(serde_value)
309 }
310
311 fn kind(&self) -> &str {
312 let row = self.0.row();
313 let mut iter = row.unpack_first().unwrap_map().iter();
314 let datum = iter
315 .find_map(|(field, datum)| if field == "kind" { Some(datum) } else { None })
316 .expect("kind field must exist");
317 datum.unwrap_str()
318 }
319
320 pub(crate) fn audit_log_id(&self) -> u64 {
321 assert!(self.is_audit_log(), "unexpected update kind: {self:?}");
322 let row = self.0.row();
323 let mut iter = row.unpack_first().unwrap_map().iter();
324 let key = iter
325 .find_map(|(field, datum)| if field == "key" { Some(datum) } else { None })
326 .expect("key field must exist")
327 .unwrap_map();
328 let event = key
329 .iter()
330 .find_map(|(field, datum)| if field == "event" { Some(datum) } else { None })
331 .expect("event field must exist")
332 .unwrap_map();
333 let (event_version, versioned_datum) = event.iter().next().expect("event cannot be empty");
334 match event_version {
335 "V1" => {
336 let versioned_map = versioned_datum.unwrap_map();
337 let id = versioned_map
338 .iter()
339 .find_map(|(field, datum)| if field == "id" { Some(datum) } else { None })
340 .expect("event field must exist")
341 .unwrap_numeric();
342 let mut cx = Numeric::context();
343 cx.try_into_u64(id.into_inner()).expect("invalid id")
344 }
345 version => unimplemented!("unsupported event version: {version}"),
346 }
347 }
348
349 pub(crate) fn is_always_deserializable(&self) -> bool {
351 static DESERIALIZABLE_KINDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
354 [
355 StateUpdateKind::FenceToken(FenceToken {
356 deploy_generation: 1,
357 epoch: Epoch::new(1).expect("non-zero"),
358 }),
359 StateUpdateKind::Config(
360 proto::ConfigKey { key: String::new() },
361 proto::ConfigValue { value: 1 },
362 ),
363 StateUpdateKind::Setting(
364 proto::SettingKey {
365 name: String::new(),
366 },
367 proto::SettingValue {
368 value: String::new(),
369 },
370 ),
371 StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()),
372 ]
373 .into_iter()
374 .map(|kind| {
375 let json_kind: StateUpdateKindJson = kind.into();
376 json_kind.kind().to_string()
377 })
378 .collect()
379 });
380 DESERIALIZABLE_KINDS.contains(self.kind())
381 }
382
383 pub(crate) fn is_audit_log(&self) -> bool {
385 static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
388 let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ());
389 let json_kind: StateUpdateKindJson = audit_log.into();
390 json_kind.kind().to_string()
391 });
392 &*AUDIT_LOG_KIND == self.kind()
393 }
394}
395
/// The shape of one catalog update in its raw form: `((key, value), timestamp, diff)`.
///
/// The key side decodes to [`SourceData`] and the value side to `()`; either may instead carry
/// a decoding error string.
type PersistStateUpdate = (
    (Result<SourceData, String>, Result<(), String>),
    Timestamp,
    StorageDiff,
);
402
403impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
404 type Error = DurableCatalogError;
405
406 fn try_from(
407 StateUpdate { kind, ts, diff }: &StateUpdate<StateUpdateKind>,
408 ) -> Result<Self, Self::Error> {
409 let kind: Option<memory::objects::StateUpdateKind> = TryInto::try_into(kind)?;
410 let update = kind.map(|kind| memory::objects::StateUpdate {
411 kind,
412 ts: ts.clone(),
413 diff: diff.clone().try_into().expect("invalid diff"),
414 });
415 Ok(update)
416 }
417}
418
419impl TryFrom<&StateUpdateKind> for Option<memory::objects::StateUpdateKind> {
420 type Error = DurableCatalogError;
421
422 fn try_from(kind: &StateUpdateKind) -> Result<Self, Self::Error> {
423 fn into_durable<PK, PV, T>(key: &PK, value: &PV) -> Result<T, DurableCatalogError>
424 where
425 PK: ProtoType<T::Key> + Clone,
426 PV: ProtoType<T::Value> + Clone,
427 T: DurableType,
428 {
429 let key = key.clone().into_rust()?;
430 let value = value.clone().into_rust()?;
431 Ok(T::from_key_value(key, value))
432 }
433
434 Ok(match kind {
435 StateUpdateKind::AuditLog(key, value) => {
436 let audit_log = into_durable(key, value)?;
437 Some(memory::objects::StateUpdateKind::AuditLog(audit_log))
438 }
439 StateUpdateKind::Cluster(key, value) => {
440 let cluster = into_durable(key, value)?;
441 Some(memory::objects::StateUpdateKind::Cluster(cluster))
442 }
443 StateUpdateKind::ClusterReplica(key, value) => {
444 let cluster_replica = into_durable(key, value)?;
445 Some(memory::objects::StateUpdateKind::ClusterReplica(
446 cluster_replica,
447 ))
448 }
449 StateUpdateKind::Comment(key, value) => {
450 let comment = into_durable(key, value)?;
451 Some(memory::objects::StateUpdateKind::Comment(comment))
452 }
453 StateUpdateKind::Database(key, value) => {
454 let database = into_durable(key, value)?;
455 Some(memory::objects::StateUpdateKind::Database(database))
456 }
457 StateUpdateKind::DefaultPrivilege(key, value) => {
458 let default_privilege = into_durable(key, value)?;
459 Some(memory::objects::StateUpdateKind::DefaultPrivilege(
460 default_privilege,
461 ))
462 }
463 StateUpdateKind::Item(key, value) => {
464 let item = into_durable(key, value)?;
465 Some(memory::objects::StateUpdateKind::Item(item))
466 }
467 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
468 let introspection_source_index = into_durable(key, value)?;
469 Some(memory::objects::StateUpdateKind::IntrospectionSourceIndex(
470 introspection_source_index,
471 ))
472 }
473 StateUpdateKind::NetworkPolicy(key, value) => {
474 let policy = into_durable(key, value)?;
475 Some(memory::objects::StateUpdateKind::NetworkPolicy(policy))
476 }
477 StateUpdateKind::Role(key, value) => {
478 let role = into_durable(key, value)?;
479 Some(memory::objects::StateUpdateKind::Role(role))
480 }
481 StateUpdateKind::RoleAuth(key, value) => {
482 let role_auth = into_durable(key, value)?;
483 Some(memory::objects::StateUpdateKind::RoleAuth(role_auth))
484 }
485 StateUpdateKind::Schema(key, value) => {
486 let schema = into_durable(key, value)?;
487 Some(memory::objects::StateUpdateKind::Schema(schema))
488 }
489 StateUpdateKind::SourceReferences(key, value) => {
490 let source_references = into_durable(key, value)?;
491 Some(memory::objects::StateUpdateKind::SourceReferences(
492 source_references,
493 ))
494 }
495 StateUpdateKind::StorageCollectionMetadata(key, value) => {
496 let storage_collection_metadata = into_durable(key, value)?;
497 Some(memory::objects::StateUpdateKind::StorageCollectionMetadata(
498 storage_collection_metadata,
499 ))
500 }
501 StateUpdateKind::SystemConfiguration(key, value) => {
502 let system_configuration = into_durable(key, value)?;
503 Some(memory::objects::StateUpdateKind::SystemConfiguration(
504 system_configuration,
505 ))
506 }
507 StateUpdateKind::SystemObjectMapping(key, value) => {
508 let system_object_mapping = into_durable(key, value)?;
509 Some(memory::objects::StateUpdateKind::SystemObjectMapping(
510 system_object_mapping,
511 ))
512 }
513 StateUpdateKind::SystemPrivilege(key, value) => {
514 let system_privilege = into_durable(key, value)?;
515 Some(memory::objects::StateUpdateKind::SystemPrivilege(
516 system_privilege,
517 ))
518 }
519 StateUpdateKind::UnfinalizedShard(key, value) => {
520 let unfinalized_shard = into_durable(key, value)?;
521 Some(memory::objects::StateUpdateKind::UnfinalizedShard(
522 unfinalized_shard,
523 ))
524 }
525 StateUpdateKind::Config(_, _)
527 | StateUpdateKind::FenceToken(_)
528 | StateUpdateKind::IdAllocator(_, _)
529 | StateUpdateKind::Setting(_, _)
530 | StateUpdateKind::TxnWalShard(_, _) => None,
531 })
532 }
533}
534
535impl TryFrom<StateUpdate<StateUpdateKindJson>> for StateUpdate<StateUpdateKind> {
536 type Error = String;
537
538 fn try_from(update: StateUpdate<StateUpdateKindJson>) -> Result<Self, Self::Error> {
539 Ok(StateUpdate {
540 kind: TryInto::try_into(update.kind)?,
541 ts: update.ts,
542 diff: update.diff,
543 })
544 }
545}
546
547impl TryFrom<StateUpdateKindJson> for StateUpdateKind {
548 type Error = String;
549
550 fn try_from(value: StateUpdateKindJson) -> Result<Self, Self::Error> {
551 let kind: proto::state_update_kind::Kind =
552 value.try_to_serde().map_err(|err| err.to_string())?;
553 let kind = proto::StateUpdateKind { kind: Some(kind) };
554 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
555 }
556}
557
558impl TryFrom<&StateUpdateKindJson> for StateUpdateKind {
559 type Error = String;
560
561 fn try_from(value: &StateUpdateKindJson) -> Result<Self, Self::Error> {
562 let kind: proto::state_update_kind::Kind =
563 value.try_to_serde().map_err(|err| err.to_string())?;
564 let kind = proto::StateUpdateKind { kind: Some(kind) };
565 StateUpdateKind::from_proto(kind).map_err(|err| err.to_string())
566 }
567}
568
569impl From<StateUpdateKind> for StateUpdateKindJson {
570 fn from(value: StateUpdateKind) -> Self {
571 let kind = value.into_proto_owned();
572 let kind = kind.kind.expect("kind should be set");
573 StateUpdateKindJson::from_serde(kind)
574 }
575}
576
577impl RustType<proto::StateUpdateKind> for StateUpdateKind {
581 fn into_proto(&self) -> proto::StateUpdateKind {
582 error!("unexpected clone of catalog data");
583 self.clone().into_proto_owned()
584 }
585
586 fn into_proto_owned(self) -> proto::StateUpdateKind {
587 proto::StateUpdateKind {
588 kind: Some(match self {
589 StateUpdateKind::AuditLog(key, _value) => {
590 proto::state_update_kind::Kind::AuditLog(proto::state_update_kind::AuditLog {
591 key: Some(key),
592 })
593 }
594 StateUpdateKind::Cluster(key, value) => {
595 proto::state_update_kind::Kind::Cluster(proto::state_update_kind::Cluster {
596 key: Some(key),
597 value: Some(value),
598 })
599 }
600 StateUpdateKind::ClusterReplica(key, value) => {
601 proto::state_update_kind::Kind::ClusterReplica(
602 proto::state_update_kind::ClusterReplica {
603 key: Some(key),
604 value: Some(value),
605 },
606 )
607 }
608 StateUpdateKind::Comment(key, value) => {
609 proto::state_update_kind::Kind::Comment(proto::state_update_kind::Comment {
610 key: Some(key),
611 value: Some(value),
612 })
613 }
614 StateUpdateKind::Config(key, value) => {
615 proto::state_update_kind::Kind::Config(proto::state_update_kind::Config {
616 key: Some(key),
617 value: Some(value),
618 })
619 }
620 StateUpdateKind::Database(key, value) => {
621 proto::state_update_kind::Kind::Database(proto::state_update_kind::Database {
622 key: Some(key),
623 value: Some(value),
624 })
625 }
626 StateUpdateKind::DefaultPrivilege(key, value) => {
627 proto::state_update_kind::Kind::DefaultPrivileges(
628 proto::state_update_kind::DefaultPrivileges {
629 key: Some(key),
630 value: Some(value),
631 },
632 )
633 }
634 StateUpdateKind::FenceToken(fence_token) => {
635 proto::state_update_kind::Kind::FenceToken(
636 proto::state_update_kind::FenceToken {
637 deploy_generation: fence_token.deploy_generation,
638 epoch: fence_token.epoch.get(),
639 },
640 )
641 }
642 StateUpdateKind::IdAllocator(key, value) => {
643 proto::state_update_kind::Kind::IdAlloc(proto::state_update_kind::IdAlloc {
644 key: Some(key),
645 value: Some(value),
646 })
647 }
648 StateUpdateKind::IntrospectionSourceIndex(key, value) => {
649 proto::state_update_kind::Kind::ClusterIntrospectionSourceIndex(
650 proto::state_update_kind::ClusterIntrospectionSourceIndex {
651 key: Some(key),
652 value: Some(value),
653 },
654 )
655 }
656 StateUpdateKind::Item(key, value) => {
657 proto::state_update_kind::Kind::Item(proto::state_update_kind::Item {
658 key: Some(key),
659 value: Some(value),
660 })
661 }
662 StateUpdateKind::NetworkPolicy(key, value) => {
663 proto::state_update_kind::Kind::NetworkPolicy(
664 proto::state_update_kind::NetworkPolicy {
665 key: Some(key),
666 value: Some(value),
667 },
668 )
669 }
670 StateUpdateKind::Role(key, value) => {
671 proto::state_update_kind::Kind::Role(proto::state_update_kind::Role {
672 key: Some(key),
673 value: Some(value),
674 })
675 }
676 StateUpdateKind::RoleAuth(key, value) => {
677 proto::state_update_kind::Kind::RoleAuth(proto::state_update_kind::RoleAuth {
678 key: Some(key),
679 value: Some(value),
680 })
681 }
682 StateUpdateKind::Schema(key, value) => {
683 proto::state_update_kind::Kind::Schema(proto::state_update_kind::Schema {
684 key: Some(key),
685 value: Some(value),
686 })
687 }
688 StateUpdateKind::Setting(key, value) => {
689 proto::state_update_kind::Kind::Setting(proto::state_update_kind::Setting {
690 key: Some(key),
691 value: Some(value),
692 })
693 }
694 StateUpdateKind::SourceReferences(key, value) => {
695 proto::state_update_kind::Kind::SourceReferences(
696 proto::state_update_kind::SourceReferences {
697 key: Some(key),
698 value: Some(value),
699 },
700 )
701 }
702 StateUpdateKind::SystemConfiguration(key, value) => {
703 proto::state_update_kind::Kind::ServerConfiguration(
704 proto::state_update_kind::ServerConfiguration {
705 key: Some(key),
706 value: Some(value),
707 },
708 )
709 }
710 StateUpdateKind::SystemObjectMapping(key, value) => {
711 proto::state_update_kind::Kind::GidMapping(
712 proto::state_update_kind::GidMapping {
713 key: Some(key),
714 value: Some(value),
715 },
716 )
717 }
718 StateUpdateKind::SystemPrivilege(key, value) => {
719 proto::state_update_kind::Kind::SystemPrivileges(
720 proto::state_update_kind::SystemPrivileges {
721 key: Some(key),
722 value: Some(value),
723 },
724 )
725 }
726 StateUpdateKind::StorageCollectionMetadata(key, value) => {
727 proto::state_update_kind::Kind::StorageCollectionMetadata(
728 proto::state_update_kind::StorageCollectionMetadata {
729 key: Some(key),
730 value: Some(value),
731 },
732 )
733 }
734 StateUpdateKind::UnfinalizedShard(key, ()) => {
735 proto::state_update_kind::Kind::UnfinalizedShard(
736 proto::state_update_kind::UnfinalizedShard { key: Some(key) },
737 )
738 }
739 StateUpdateKind::TxnWalShard((), value) => {
740 proto::state_update_kind::Kind::TxnWalShard(
741 proto::state_update_kind::TxnWalShard { value: Some(value) },
742 )
743 }
744 }),
745 }
746 }
747
748 fn from_proto(proto: proto::StateUpdateKind) -> Result<StateUpdateKind, TryFromProtoError> {
749 Ok(
750 match proto
751 .kind
752 .ok_or_else(|| TryFromProtoError::missing_field("StateUpdateKind::kind"))?
753 {
754 proto::state_update_kind::Kind::AuditLog(proto::state_update_kind::AuditLog {
755 key,
756 }) => StateUpdateKind::AuditLog(
757 key.ok_or_else(|| {
758 TryFromProtoError::missing_field("state_update_kind::AuditLog::key")
759 })?,
760 (),
761 ),
762 proto::state_update_kind::Kind::Cluster(proto::state_update_kind::Cluster {
763 key,
764 value,
765 }) => StateUpdateKind::Cluster(
766 key.ok_or_else(|| {
767 TryFromProtoError::missing_field("state_update_kind::Cluster::key")
768 })?,
769 value.ok_or_else(|| {
770 TryFromProtoError::missing_field("state_update_kind::Cluster::value")
771 })?,
772 ),
773 proto::state_update_kind::Kind::ClusterReplica(
774 proto::state_update_kind::ClusterReplica { key, value },
775 ) => StateUpdateKind::ClusterReplica(
776 key.ok_or_else(|| {
777 TryFromProtoError::missing_field("state_update_kind::ClusterReplica::key")
778 })?,
779 value.ok_or_else(|| {
780 TryFromProtoError::missing_field("state_update_kind::ClusterReplica::value")
781 })?,
782 ),
783 proto::state_update_kind::Kind::Comment(proto::state_update_kind::Comment {
784 key,
785 value,
786 }) => StateUpdateKind::Comment(
787 key.ok_or_else(|| {
788 TryFromProtoError::missing_field("state_update_kind::Comment::key")
789 })?,
790 value.ok_or_else(|| {
791 TryFromProtoError::missing_field("state_update_kind::Comment::value")
792 })?,
793 ),
794 proto::state_update_kind::Kind::Config(proto::state_update_kind::Config {
795 key,
796 value,
797 }) => StateUpdateKind::Config(
798 key.ok_or_else(|| {
799 TryFromProtoError::missing_field("state_update_kind::Config::key")
800 })?,
801 value.ok_or_else(|| {
802 TryFromProtoError::missing_field("state_update_kind::Config::value")
803 })?,
804 ),
805 proto::state_update_kind::Kind::Database(proto::state_update_kind::Database {
806 key,
807 value,
808 }) => StateUpdateKind::Database(
809 key.ok_or_else(|| {
810 TryFromProtoError::missing_field("state_update_kind::Database::key")
811 })?,
812 value.ok_or_else(|| {
813 TryFromProtoError::missing_field("state_update_kind::Database::value")
814 })?,
815 ),
816 proto::state_update_kind::Kind::DefaultPrivileges(
817 proto::state_update_kind::DefaultPrivileges { key, value },
818 ) => StateUpdateKind::DefaultPrivilege(
819 key.ok_or_else(|| {
820 TryFromProtoError::missing_field(
821 "state_update_kind::DefaultPrivileges::key",
822 )
823 })?,
824 value.ok_or_else(|| {
825 TryFromProtoError::missing_field(
826 "state_update_kind::DefaultPrivileges::value",
827 )
828 })?,
829 ),
830 proto::state_update_kind::Kind::FenceToken(
831 proto::state_update_kind::FenceToken {
832 deploy_generation,
833 epoch,
834 },
835 ) => StateUpdateKind::FenceToken(FenceToken {
836 deploy_generation,
837 epoch: Epoch::new(epoch).ok_or_else(|| {
838 TryFromProtoError::missing_field("state_update_kind::Epoch::epoch")
839 })?,
840 }),
841 proto::state_update_kind::Kind::IdAlloc(proto::state_update_kind::IdAlloc {
842 key,
843 value,
844 }) => StateUpdateKind::IdAllocator(
845 key.ok_or_else(|| {
846 TryFromProtoError::missing_field("state_update_kind::IdAlloc::key")
847 })?,
848 value.ok_or_else(|| {
849 TryFromProtoError::missing_field("state_update_kind::IdAlloc::value")
850 })?,
851 ),
852 proto::state_update_kind::Kind::ClusterIntrospectionSourceIndex(
853 proto::state_update_kind::ClusterIntrospectionSourceIndex { key, value },
854 ) => StateUpdateKind::IntrospectionSourceIndex(
855 key.ok_or_else(|| {
856 TryFromProtoError::missing_field(
857 "state_update_kind::ClusterIntrospectionSourceIndex::key",
858 )
859 })?,
860 value.ok_or_else(|| {
861 TryFromProtoError::missing_field(
862 "state_update_kind::ClusterIntrospectionSourceIndex::value",
863 )
864 })?,
865 ),
866 proto::state_update_kind::Kind::Item(proto::state_update_kind::Item {
867 key,
868 value,
869 }) => StateUpdateKind::Item(
870 key.ok_or_else(|| {
871 TryFromProtoError::missing_field("state_update_kind::Item::key")
872 })?,
873 value.ok_or_else(|| {
874 TryFromProtoError::missing_field("state_update_kind::Item::value")
875 })?,
876 ),
877 proto::state_update_kind::Kind::Role(proto::state_update_kind::Role {
878 key,
879 value,
880 }) => StateUpdateKind::Role(
881 key.ok_or_else(|| {
882 TryFromProtoError::missing_field("state_update_kind::Role::key")
883 })?,
884 value.ok_or_else(|| {
885 TryFromProtoError::missing_field("state_update_kind::Role::value")
886 })?,
887 ),
888 proto::state_update_kind::Kind::RoleAuth(proto::state_update_kind::RoleAuth {
889 key,
890 value,
891 }) => StateUpdateKind::RoleAuth(
892 key.ok_or_else(|| {
893 TryFromProtoError::missing_field("state_update_kind::RoleAuth::key")
894 })?,
895 value.ok_or_else(|| {
896 TryFromProtoError::missing_field("state_update_kind::RoleAuth::value")
897 })?,
898 ),
899 proto::state_update_kind::Kind::Schema(proto::state_update_kind::Schema {
900 key,
901 value,
902 }) => StateUpdateKind::Schema(
903 key.ok_or_else(|| {
904 TryFromProtoError::missing_field("state_update_kind::Schema::key")
905 })?,
906 value.ok_or_else(|| {
907 TryFromProtoError::missing_field("state_update_kind::Schema::value")
908 })?,
909 ),
910 proto::state_update_kind::Kind::Setting(proto::state_update_kind::Setting {
911 key,
912 value,
913 }) => StateUpdateKind::Setting(
914 key.ok_or_else(|| {
915 TryFromProtoError::missing_field("state_update_kind::Setting::key")
916 })?,
917 value.ok_or_else(|| {
918 TryFromProtoError::missing_field("state_update_kind::Setting::value")
919 })?,
920 ),
921 proto::state_update_kind::Kind::ServerConfiguration(
922 proto::state_update_kind::ServerConfiguration { key, value },
923 ) => StateUpdateKind::SystemConfiguration(
924 key.ok_or_else(|| {
925 TryFromProtoError::missing_field(
926 "state_update_kind::ServerConfiguration::key",
927 )
928 })?,
929 value.ok_or_else(|| {
930 TryFromProtoError::missing_field(
931 "state_update_kind::ServerConfiguration::value",
932 )
933 })?,
934 ),
935 proto::state_update_kind::Kind::GidMapping(
936 proto::state_update_kind::GidMapping { key, value },
937 ) => StateUpdateKind::SystemObjectMapping(
938 key.ok_or_else(|| {
939 TryFromProtoError::missing_field("state_update_kind::GidMapping::key")
940 })?,
941 value.ok_or_else(|| {
942 TryFromProtoError::missing_field("state_update_kind::GidMapping::value")
943 })?,
944 ),
945 proto::state_update_kind::Kind::SystemPrivileges(
946 proto::state_update_kind::SystemPrivileges { key, value },
947 ) => StateUpdateKind::SystemPrivilege(
948 key.ok_or_else(|| {
949 TryFromProtoError::missing_field("state_update_kind::SystemPrivileges::key")
950 })?,
951 value.ok_or_else(|| {
952 TryFromProtoError::missing_field(
953 "state_update_kind::SystemPrivileges::value",
954 )
955 })?,
956 ),
957 proto::state_update_kind::Kind::StorageCollectionMetadata(
958 proto::state_update_kind::StorageCollectionMetadata { key, value },
959 ) => StateUpdateKind::StorageCollectionMetadata(
960 key.ok_or_else(|| {
961 TryFromProtoError::missing_field(
962 "state_update_kind::StorageCollectionMetadata::key",
963 )
964 })?,
965 value.ok_or_else(|| {
966 TryFromProtoError::missing_field(
967 "state_update_kind::StorageCollectionMetadata::value",
968 )
969 })?,
970 ),
971 proto::state_update_kind::Kind::UnfinalizedShard(
972 proto::state_update_kind::UnfinalizedShard { key },
973 ) => StateUpdateKind::UnfinalizedShard(
974 key.ok_or_else(|| {
975 TryFromProtoError::missing_field(
976 "state_update_kind::StorageCollectionMetadata::key",
977 )
978 })?,
979 (),
980 ),
981 proto::state_update_kind::Kind::TxnWalShard(
982 proto::state_update_kind::TxnWalShard { value },
983 ) => StateUpdateKind::TxnWalShard(
984 (),
985 value.ok_or_else(|| {
986 TryFromProtoError::missing_field("state_update_kind::TxnWalShard::value")
987 })?,
988 ),
989 proto::state_update_kind::Kind::SourceReferences(
990 proto::state_update_kind::SourceReferences { key, value },
991 ) => StateUpdateKind::SourceReferences(
992 key.ok_or_else(|| {
993 TryFromProtoError::missing_field("state_update_kind::SourceReferences::key")
994 })?,
995 value.ok_or_else(|| {
996 TryFromProtoError::missing_field(
997 "state_update_kind::SourceReferences::value",
998 )
999 })?,
1000 ),
1001 proto::state_update_kind::Kind::NetworkPolicy(
1002 proto::state_update_kind::NetworkPolicy { key, value },
1003 ) => StateUpdateKind::NetworkPolicy(
1004 key.ok_or_else(|| {
1005 TryFromProtoError::missing_field("state_update_kind::NetworkPolicy::key")
1006 })?,
1007 value.ok_or_else(|| {
1008 TryFromProtoError::missing_field("state_update_kind::NetworkPolicy::value")
1009 })?,
1010 ),
1011 },
1012 )
1013 }
1014}
1015
1016impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
1019 fn from(kvtd: PersistStateUpdate) -> Self {
1020 let ((key, val), ts, diff) = kvtd;
1021 let (key, ()) = (
1022 key.expect("persist decoding error"),
1023 val.expect("persist decoding error"),
1024 );
1025 StateUpdate {
1026 kind: StateUpdateKindJson::from(key),
1027 ts,
1028 diff: diff.into(),
1029 }
1030 }
1031}
1032
1033impl From<StateUpdateKindJson> for SourceData {
1034 fn from(value: StateUpdateKindJson) -> SourceData {
1035 let row = value.0.into_row();
1036 SourceData(Ok(row))
1037 }
1038}
1039
1040impl From<SourceData> for StateUpdateKindJson {
1041 fn from(value: SourceData) -> Self {
1042 let row = value.0.expect("only Ok values stored in catalog shard");
1043 StateUpdateKindJson(Jsonb::from_row(row))
1044 }
1045}
1046
#[cfg(test)]
mod tests {
    use mz_persist_types::Codec;
    use mz_repr::{RelationDesc, ScalarType};
    use mz_storage_types::sources::SourceData;
    use proptest::prelude::*;

    use crate::durable::Epoch;
    use crate::durable::objects::FenceToken;
    use crate::durable::objects::serialization::proto;
    use crate::durable::objects::state_update::{StateUpdateKind, StateUpdateKindJson};

    // Checks that `StateUpdateKindJson::kind` reports the expected name for a sample of
    // `StateUpdateKind` variants after they are converted to JSON.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn kind_test() {
        let fence_token = StateUpdateKind::FenceToken(FenceToken {
            deploy_generation: 1,
            epoch: Epoch::new(1).expect("non-zero"),
        });
        let config = StateUpdateKind::Config(
            proto::ConfigKey { key: String::new() },
            proto::ConfigValue { value: 1 },
        );
        let setting = StateUpdateKind::Setting(
            proto::SettingKey {
                name: String::new(),
            },
            proto::SettingValue {
                value: String::new(),
            },
        );
        let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ());

        let cases = [
            (fence_token, "FenceToken"),
            (config, "Config"),
            (setting, "Setting"),
            (audit_log, "AuditLog"),
        ];

        for (update, expected_name) in cases {
            let actual_name = StateUpdateKindJson::from(update).kind().to_string();
            assert_eq!(expected_name, actual_name);
        }
    }

    // Checks that `StateUpdateKindJson::audit_log_id` returns the `id` field of the V1 audit
    // log event the update was built from.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn audit_log_id_test() {
        // Builds an audit log update around a V1 event with the given numeric fields; all
        // optional fields are left unset.
        let v1_audit_log = |id, event_type, object_type| {
            StateUpdateKind::AuditLog(
                proto::AuditLogKey {
                    event: Some(proto::audit_log_key::Event::V1(proto::AuditLogEventV1 {
                        id,
                        event_type,
                        object_type,
                        user: None,
                        occurred_at: None,
                        details: None,
                    })),
                },
                (),
            )
        };

        let cases = [(v1_audit_log(1, 2, 3), 1), (v1_audit_log(4, 5, 6), 4)];

        for (update, expected_id) in cases {
            let actual_id = StateUpdateKindJson::from(update).audit_log_id();
            assert_eq!(expected_id, actual_id);
        }
    }

    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn proptest_state_update_kind_roundtrip(kind: StateUpdateKind) {
            // Step 1: convert the typed update into its JSON form.
            let json = StateUpdateKindJson::from(kind.clone());

            // Step 2: the JSON form must survive an encode/decode cycle as `SourceData`
            // against a single non-nullable jsonb column.
            let desc = RelationDesc::builder()
                .with_column("a", ScalarType::Jsonb.nullable(false))
                .finish();
            let original = SourceData::from(json.clone());
            let mut bytes = Vec::new();
            original.encode(&mut bytes);
            let roundtripped =
                SourceData::decode(&bytes, &desc).expect("should be valid SourceData");
            prop_assert_eq!(&original, &roundtripped);
            let json_again = StateUpdateKindJson::from(roundtripped);
            prop_assert_eq!(&json, &json_again);

            // Step 3: the decoded JSON must convert back into the typed update we started with.
            let kind_again =
                StateUpdateKind::try_from(json_again).expect("should be valid StateUpdateKind");
            prop_assert_eq!(&kind, &kind_again);
        }
    }
}