mz_catalog/durable/upgrade/
v75_to_v76.rs1use crate::durable::traits::{UpgradeFrom, UpgradeInto};
11use crate::durable::upgrade::MigrationAction;
12use crate::durable::upgrade::wire_compatible::{WireCompatible, wire_compatible};
13use crate::durable::upgrade::{objects_v75 as v75, objects_v76 as v76};
14
15wire_compatible!(v75::ClusterKey with v76::ClusterKey);
16wire_compatible!(v75::ClusterReplicaKey with v76::ClusterReplicaKey);
17wire_compatible!(v75::ClusterId with v76::ClusterId);
18wire_compatible!(v75::RoleId with v76::RoleId);
19wire_compatible!(v75::MzAclItem with v76::MzAclItem);
20wire_compatible!(v75::ReplicaLogging with v76::ReplicaLogging);
21wire_compatible!(v75::replica_config::UnmanagedLocation with v76::replica_config::UnmanagedLocation);
22wire_compatible!(v75::OptimizerFeatureOverride with v76::OptimizerFeatureOverride);
23wire_compatible!(v75::ClusterSchedule with v76::ClusterSchedule);
24
25pub fn upgrade(
27 snapshot: Vec<v75::StateUpdateKind>,
28) -> Vec<MigrationAction<v75::StateUpdateKind, v76::StateUpdateKind>> {
29 let mut migrations = Vec::new();
30 for update in snapshot {
31 match update.kind {
32 Some(v75::state_update_kind::Kind::Cluster(old_cluster)) => {
33 let new_cluster =
34 v76::state_update_kind::Cluster::upgrade_from(old_cluster.clone());
35 let old = v75::StateUpdateKind {
36 kind: Some(v75::state_update_kind::Kind::Cluster(old_cluster)),
37 };
38 let new = v76::StateUpdateKind {
39 kind: Some(v76::state_update_kind::Kind::Cluster(new_cluster)),
40 };
41
42 let migration = MigrationAction::Update(old, new);
43 migrations.push(migration);
44 }
45 Some(v75::state_update_kind::Kind::ClusterReplica(old_replica)) => {
46 let new_replica =
47 v76::state_update_kind::ClusterReplica::upgrade_from(old_replica.clone());
48 let old = v75::StateUpdateKind {
49 kind: Some(v75::state_update_kind::Kind::ClusterReplica(old_replica)),
50 };
51 let new = v76::StateUpdateKind {
52 kind: Some(v76::state_update_kind::Kind::ClusterReplica(new_replica)),
53 };
54
55 let migration = MigrationAction::Update(old, new);
56 migrations.push(migration);
57 }
58 _ => {}
59 }
60 }
61 migrations
62}
63
64impl UpgradeFrom<v75::state_update_kind::Cluster> for v76::state_update_kind::Cluster {
65 fn upgrade_from(old: v75::state_update_kind::Cluster) -> Self {
66 v76::state_update_kind::Cluster {
67 key: old.key.as_ref().map(WireCompatible::convert),
68 value: old.value.map(UpgradeFrom::upgrade_from),
69 }
70 }
71}
72
73impl UpgradeFrom<v75::ClusterValue> for v76::ClusterValue {
74 fn upgrade_from(old: v75::ClusterValue) -> Self {
75 v76::ClusterValue {
76 name: old.name,
77 owner_id: old.owner_id.as_ref().map(WireCompatible::convert),
78 privileges: old.privileges.iter().map(WireCompatible::convert).collect(),
79 config: old.config.map(UpgradeFrom::upgrade_from),
80 }
81 }
82}
83
84impl UpgradeFrom<v75::ClusterConfig> for v76::ClusterConfig {
85 fn upgrade_from(old: v75::ClusterConfig) -> Self {
86 v76::ClusterConfig {
87 workload_class: old.workload_class,
88 variant: old.variant.map(UpgradeFrom::upgrade_from),
89 }
90 }
91}
92
93impl UpgradeFrom<v75::cluster_config::Variant> for v76::cluster_config::Variant {
94 fn upgrade_from(old: v75::cluster_config::Variant) -> Self {
95 match old {
96 v75::cluster_config::Variant::Unmanaged(_empty) => {
97 v76::cluster_config::Variant::Unmanaged(v76::Empty {})
98 }
99 v75::cluster_config::Variant::Managed(managed) => {
100 v76::cluster_config::Variant::Managed(managed.upgrade_into())
101 }
102 }
103 }
104}
105
106impl UpgradeFrom<v75::cluster_config::ManagedCluster> for v76::cluster_config::ManagedCluster {
107 fn upgrade_from(loc: v75::cluster_config::ManagedCluster) -> Self {
108 v76::cluster_config::ManagedCluster {
109 size: loc.size,
110 replication_factor: loc.replication_factor,
111 availability_zones: loc.availability_zones,
112 logging: loc.logging.as_ref().map(WireCompatible::convert),
113 optimizer_feature_overrides: loc
114 .optimizer_feature_overrides
115 .iter()
116 .map(WireCompatible::convert)
117 .collect(),
118 schedule: loc.schedule.as_ref().map(WireCompatible::convert),
119 }
120 }
121}
122
123impl UpgradeFrom<v75::state_update_kind::ClusterReplica>
124 for v76::state_update_kind::ClusterReplica
125{
126 fn upgrade_from(old: v75::state_update_kind::ClusterReplica) -> Self {
127 v76::state_update_kind::ClusterReplica {
128 key: old.key.as_ref().map(WireCompatible::convert),
129 value: old.value.map(UpgradeFrom::upgrade_from),
130 }
131 }
132}
133
134impl UpgradeFrom<v75::ClusterReplicaValue> for v76::ClusterReplicaValue {
135 fn upgrade_from(old: v75::ClusterReplicaValue) -> Self {
136 v76::ClusterReplicaValue {
137 cluster_id: old.cluster_id.as_ref().map(WireCompatible::convert),
138 name: old.name,
139 config: old.config.map(UpgradeFrom::upgrade_from),
140 owner_id: old.owner_id.as_ref().map(WireCompatible::convert),
141 }
142 }
143}
144
145impl UpgradeFrom<v75::ReplicaConfig> for v76::ReplicaConfig {
146 fn upgrade_from(old: v75::ReplicaConfig) -> Self {
147 v76::ReplicaConfig {
148 logging: old.logging.as_ref().map(WireCompatible::convert),
149 location: old.location.map(UpgradeFrom::upgrade_from),
150 }
151 }
152}
153
154impl UpgradeFrom<v75::replica_config::Location> for v76::replica_config::Location {
155 fn upgrade_from(old: v75::replica_config::Location) -> Self {
156 match old {
157 v75::replica_config::Location::Unmanaged(loc) => {
158 v76::replica_config::Location::Unmanaged(WireCompatible::convert(&loc))
159 }
160 v75::replica_config::Location::Managed(loc) => {
161 v76::replica_config::Location::Managed(loc.upgrade_into())
162 }
163 }
164 }
165}
166
167impl UpgradeFrom<v75::replica_config::ManagedLocation> for v76::replica_config::ManagedLocation {
168 fn upgrade_from(loc: v75::replica_config::ManagedLocation) -> Self {
169 v76::replica_config::ManagedLocation {
170 size: loc.size,
171 availability_zone: loc.availability_zone,
172 internal: loc.internal,
173 billed_as: loc.billed_as,
174 pending: loc.pending,
175 }
176 }
177}