mz_catalog/durable/upgrade/
v86_to_v87.rs1use std::collections::BTreeMap;
11
12use crate::durable::upgrade::MigrationAction;
13use crate::durable::upgrade::json_compatible::JsonCompatible;
14use crate::durable::upgrade::objects_v86 as v86;
15use crate::durable::upgrade::objects_v87 as v87;
16
17crate::json_compatible!(v86::ClusterKey with v87::ClusterKey);
23crate::json_compatible!(v86::RoleId with v87::RoleId);
24crate::json_compatible!(v86::MzAclItem with v87::MzAclItem);
25crate::json_compatible!(v86::ReplicaLogging with v87::ReplicaLogging);
26crate::json_compatible!(v86::OptimizerFeatureOverride with v87::OptimizerFeatureOverride);
27crate::json_compatible!(v86::ClusterSchedule with v87::ClusterSchedule);
28crate::json_compatible!(v86::ClusterReplicaKey with v87::ClusterReplicaKey);
29crate::json_compatible!(v86::ClusterId with v87::ClusterId);
30crate::json_compatible!(v86::UnmanagedLocation with v87::UnmanagedLocation);
31
32pub fn upgrade(
49 snapshot: Vec<v86::StateUpdateKind>,
50) -> Vec<MigrationAction<v86::StateUpdateKind, v87::StateUpdateKind>> {
51 let mut cluster_azs: BTreeMap<v86::ClusterId, Vec<String>> = BTreeMap::new();
56 for update in &snapshot {
57 if let v86::StateUpdateKind::Cluster(cluster) = update {
58 if let v86::ClusterVariant::Managed(managed) = &cluster.value.config.variant {
59 cluster_azs.insert(cluster.key.id.clone(), managed.availability_zones.clone());
60 }
61 }
62 }
63
64 let mut migrations = Vec::new();
65 for update in snapshot {
66 match update {
67 v86::StateUpdateKind::Cluster(old_cluster)
68 if matches!(
69 old_cluster.value.config.variant,
70 v86::ClusterVariant::Managed(_)
71 ) =>
72 {
73 let new_cluster = migrate_managed_cluster(old_cluster.clone());
74 migrations.push(MigrationAction::Update(
75 v86::StateUpdateKind::Cluster(old_cluster),
76 v87::StateUpdateKind::Cluster(new_cluster),
77 ));
78 }
79 v86::StateUpdateKind::ClusterReplica(old_replica)
80 if matches!(
81 old_replica.value.config.location,
82 v86::ReplicaLocation::Managed(_)
83 ) =>
84 {
85 let cluster_pool = cluster_azs.get(&old_replica.value.cluster_id).cloned();
89 let new_replica = migrate_managed_replica(old_replica.clone(), cluster_pool);
90 migrations.push(MigrationAction::Update(
91 v86::StateUpdateKind::ClusterReplica(old_replica),
92 v87::StateUpdateKind::ClusterReplica(new_replica),
93 ));
94 }
95 _ => {}
98 }
99 }
100 migrations
101}
102
103fn migrate_managed_cluster(old: v86::Cluster) -> v87::Cluster {
106 let v86::Cluster { key, value } = old;
107 let v86::ClusterVariant::Managed(m) = value.config.variant else {
108 unreachable!("caller guards on the managed variant");
109 };
110 v87::Cluster {
111 key: JsonCompatible::convert(&key),
112 value: v87::ClusterValue {
113 name: value.name,
114 owner_id: JsonCompatible::convert(&value.owner_id),
115 privileges: value
116 .privileges
117 .iter()
118 .map(JsonCompatible::convert)
119 .collect(),
120 config: v87::ClusterConfig {
121 workload_class: value.config.workload_class,
122 variant: v87::ClusterVariant::Managed(v87::ManagedCluster {
123 size: m.size,
124 replication_factor: m.replication_factor,
125 availability_zones: m.availability_zones,
126 logging: JsonCompatible::convert(&m.logging),
127 optimizer_feature_overrides: m
128 .optimizer_feature_overrides
129 .iter()
130 .map(JsonCompatible::convert)
131 .collect(),
132 schedule: JsonCompatible::convert(&m.schedule),
133 auto_scaling_strategy: None,
136 reconfiguration: None,
137 burst: None,
138 }),
139 },
140 },
141 }
142}
143
144fn migrate_managed_replica(
151 old: v86::ClusterReplica,
152 cluster_pool: Option<Vec<String>>,
153) -> v87::ClusterReplica {
154 let v86::ClusterReplica { key, value } = old;
155 let v86::ReplicaLocation::Managed(m) = value.config.location else {
156 unreachable!("caller guards on the managed location");
157 };
158 let availability_zones = match cluster_pool {
159 Some(pool) => pool,
163 None => m.availability_zone.into_iter().collect(),
166 };
167 v87::ClusterReplica {
168 key: JsonCompatible::convert(&key),
169 value: v87::ClusterReplicaValue {
170 cluster_id: JsonCompatible::convert(&value.cluster_id),
171 name: value.name,
172 config: v87::ReplicaConfig {
173 logging: JsonCompatible::convert(&value.config.logging),
174 location: v87::ReplicaLocation::Managed(v87::ManagedLocation {
175 size: m.size,
176 availability_zones,
177 internal: m.internal,
178 billed_as: m.billed_as,
179 pending: m.pending,
180 }),
181 },
182 owner_id: JsonCompatible::convert(&value.owner_id),
183 },
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::upgrade;
190 use crate::durable::upgrade::MigrationAction;
191 use crate::durable::upgrade::objects_v86 as v86;
192 use crate::durable::upgrade::objects_v87 as v87;
193
194 fn managed_cluster(id: u64, availability_zones: Vec<String>) -> v86::StateUpdateKind {
195 v86::StateUpdateKind::Cluster(v86::Cluster {
196 key: v86::ClusterKey {
197 id: v86::ClusterId::User(id),
198 },
199 value: v86::ClusterValue {
200 name: format!("cluster_{id}"),
201 owner_id: v86::RoleId::User(1),
202 privileges: vec![],
203 config: v86::ClusterConfig {
204 workload_class: None,
205 variant: v86::ClusterVariant::Managed(v86::ManagedCluster {
206 size: "100cc".to_string(),
207 replication_factor: 1,
208 availability_zones,
209 logging: v86::ReplicaLogging {
210 log_logging: false,
211 interval: None,
212 },
213 optimizer_feature_overrides: vec![],
214 schedule: v86::ClusterSchedule::Manual,
215 }),
216 },
217 },
218 })
219 }
220
221 fn unmanaged_cluster(id: u64) -> v86::StateUpdateKind {
222 v86::StateUpdateKind::Cluster(v86::Cluster {
223 key: v86::ClusterKey {
224 id: v86::ClusterId::User(id),
225 },
226 value: v86::ClusterValue {
227 name: format!("cluster_{id}"),
228 owner_id: v86::RoleId::User(1),
229 privileges: vec![],
230 config: v86::ClusterConfig {
231 workload_class: None,
232 variant: v86::ClusterVariant::Unmanaged,
233 },
234 },
235 })
236 }
237
238 fn managed_replica(
239 replica_id: u64,
240 cluster_id: u64,
241 availability_zone: Option<String>,
242 ) -> v86::StateUpdateKind {
243 v86::StateUpdateKind::ClusterReplica(v86::ClusterReplica {
244 key: v86::ClusterReplicaKey {
245 id: v86::ReplicaId::User(replica_id),
246 },
247 value: v86::ClusterReplicaValue {
248 cluster_id: v86::ClusterId::User(cluster_id),
249 name: format!("r{replica_id}"),
250 config: v86::ReplicaConfig {
251 logging: v86::ReplicaLogging {
252 log_logging: false,
253 interval: None,
254 },
255 location: v86::ReplicaLocation::Managed(v86::ManagedLocation {
256 size: "100cc".to_string(),
257 availability_zone,
258 internal: false,
259 billed_as: None,
260 pending: false,
261 }),
262 },
263 owner_id: v86::RoleId::User(1),
264 },
265 })
266 }
267
268 #[mz_ore::test]
269 fn test_cluster_new_fields_default_none() {
270 let migrations = upgrade(vec![managed_cluster(1, vec!["az1".to_string()])]);
271 assert_eq!(migrations.len(), 1);
272 let MigrationAction::Update(_, v87::StateUpdateKind::Cluster(cluster)) = &migrations[0]
273 else {
274 panic!("expected a cluster update");
275 };
276 let v87::ClusterVariant::Managed(managed) = &cluster.value.config.variant else {
277 panic!("expected a managed cluster");
278 };
279 assert_eq!(managed.auto_scaling_strategy, None);
280 assert_eq!(managed.reconfiguration, None);
281 assert_eq!(managed.burst, None);
282 assert_eq!(managed.availability_zones, vec!["az1".to_string()]);
284 }
285
286 #[mz_ore::test]
287 fn test_replica_backfills_cluster_azs() {
288 let azs = vec!["az1".to_string(), "az2".to_string()];
289 let migrations = upgrade(vec![
290 managed_cluster(1, azs.clone()),
291 managed_replica(10, 1, None),
292 ]);
293 assert_eq!(migrations.len(), 2);
294
295 let MigrationAction::Update(_, v87::StateUpdateKind::ClusterReplica(replica)) =
296 &migrations[1]
297 else {
298 panic!("expected a replica update");
299 };
300 let v87::ReplicaLocation::Managed(loc) = &replica.value.config.location else {
301 panic!("expected a managed location");
302 };
303 assert_eq!(loc.availability_zones, azs);
304 }
305
306 #[mz_ore::test]
307 fn test_replica_of_unmanaged_cluster_carries_pin_as_list() {
308 let migrations = upgrade(vec![
314 unmanaged_cluster(1),
315 managed_replica(10, 1, Some("az1".to_string())),
316 ]);
317 assert_eq!(migrations.len(), 1);
318 let MigrationAction::Update(_, v87::StateUpdateKind::ClusterReplica(replica)) =
319 &migrations[0]
320 else {
321 panic!("expected a replica update");
322 };
323 let v87::ReplicaLocation::Managed(loc) = &replica.value.config.location else {
324 panic!("expected a managed location");
325 };
326 assert_eq!(loc.availability_zones, vec!["az1".to_string()]);
327 }
328
329 #[mz_ore::test]
330 fn test_replica_no_cluster_azs_backfills_empty() {
331 let migrations = upgrade(vec![
332 managed_cluster(1, vec![]),
333 managed_replica(10, 1, None),
334 ]);
335 let MigrationAction::Update(_, v87::StateUpdateKind::ClusterReplica(replica)) =
336 &migrations[1]
337 else {
338 panic!("expected a replica update");
339 };
340 let v87::ReplicaLocation::Managed(loc) = &replica.value.config.location else {
341 panic!("expected a managed location");
342 };
343 assert!(loc.availability_zones.is_empty());
344 }
345
346 #[mz_ore::test]
347 fn test_unmanaged_records_are_not_rewritten() {
348 let unmanaged_replica = v86::StateUpdateKind::ClusterReplica(v86::ClusterReplica {
352 key: v86::ClusterReplicaKey {
353 id: v86::ReplicaId::User(20),
354 },
355 value: v86::ClusterReplicaValue {
356 cluster_id: v86::ClusterId::User(2),
357 name: "r20".to_string(),
358 config: v86::ReplicaConfig {
359 logging: v86::ReplicaLogging {
360 log_logging: false,
361 interval: None,
362 },
363 location: v86::ReplicaLocation::Unmanaged(v86::UnmanagedLocation {
364 storagectl_addrs: vec!["s".to_string()],
365 computectl_addrs: vec!["c".to_string()],
366 }),
367 },
368 owner_id: v86::RoleId::User(1),
369 },
370 });
371 let migrations = upgrade(vec![unmanaged_cluster(2), unmanaged_replica]);
372 assert!(migrations.is_empty());
373 }
374}