Skip to main content

mz_catalog/durable/upgrade/
v86_to_v87.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use crate::durable::upgrade::MigrationAction;
13use crate::durable::upgrade::json_compatible::JsonCompatible;
14use crate::durable::upgrade::objects_v86 as v86;
15use crate::durable::upgrade::objects_v87 as v87;
16
17// Every sub-structure carried across the version unchanged. The only shapes
18// that actually gained fields between v86 and v87 are the managed `Cluster`
19// variant and the managed `ReplicaLocation`; everything else is JSON-identical
20// and is moved across via `JsonCompatible::convert`. The macro also generates
21// proptest round-trips asserting the two encodings stay byte-compatible.
22crate::json_compatible!(v86::ClusterKey with v87::ClusterKey);
23crate::json_compatible!(v86::RoleId with v87::RoleId);
24crate::json_compatible!(v86::MzAclItem with v87::MzAclItem);
25crate::json_compatible!(v86::ReplicaLogging with v87::ReplicaLogging);
26crate::json_compatible!(v86::OptimizerFeatureOverride with v87::OptimizerFeatureOverride);
27crate::json_compatible!(v86::ClusterSchedule with v87::ClusterSchedule);
28crate::json_compatible!(v86::ClusterReplicaKey with v87::ClusterReplicaKey);
29crate::json_compatible!(v86::ClusterId with v87::ClusterId);
30crate::json_compatible!(v86::UnmanagedLocation with v87::UnmanagedLocation);
31
32/// Adds the additive, behaviorally-inert durable cluster-controller state and
33/// reshapes the managed replica location's availability-zone field:
34///
35///   - `ManagedCluster` gains `auto_scaling_strategy`, `reconfiguration`, and
36///     `burst`, all defaulted to `None`.
37///   - The managed `ReplicaLocation`'s single `availability_zone` user-pin
38///     becomes an `availability_zones` list recording the zones the replica was
39///     provisioned under: for a replica of a managed cluster the owning
40///     *cluster*'s current `availability_zones` (the placement pool); for a
41///     replica of an unmanaged cluster its prior single pin, carried across as
42///     a zero- or one-element list.
43///
44/// Only the managed `Cluster` variant and managed `ReplicaLocation` changed
45/// shape, so only those records are rewritten. Unmanaged clusters and unmanaged
46/// replica locations are JSON-identical in v87 and are left untouched —
47/// emitting an update for them would just retract and re-add the same bytes.
48pub fn upgrade(
49    snapshot: Vec<v86::StateUpdateKind>,
50) -> Vec<MigrationAction<v86::StateUpdateKind, v87::StateUpdateKind>> {
51    // The per-replica provisioned-AZ list is backfilled from the owning
52    // cluster's `availability_zones`, so collect every managed cluster's list
53    // before rewriting replicas. Unmanaged clusters contribute no entry, so a
54    // replica of one backfills the empty list.
55    let mut cluster_azs: BTreeMap<v86::ClusterId, Vec<String>> = BTreeMap::new();
56    for update in &snapshot {
57        if let v86::StateUpdateKind::Cluster(cluster) = update {
58            if let v86::ClusterVariant::Managed(managed) = &cluster.value.config.variant {
59                cluster_azs.insert(cluster.key.id.clone(), managed.availability_zones.clone());
60            }
61        }
62    }
63
64    let mut migrations = Vec::new();
65    for update in snapshot {
66        match update {
67            v86::StateUpdateKind::Cluster(old_cluster)
68                if matches!(
69                    old_cluster.value.config.variant,
70                    v86::ClusterVariant::Managed(_)
71                ) =>
72            {
73                let new_cluster = migrate_managed_cluster(old_cluster.clone());
74                migrations.push(MigrationAction::Update(
75                    v86::StateUpdateKind::Cluster(old_cluster),
76                    v87::StateUpdateKind::Cluster(new_cluster),
77                ));
78            }
79            v86::StateUpdateKind::ClusterReplica(old_replica)
80                if matches!(
81                    old_replica.value.config.location,
82                    v86::ReplicaLocation::Managed(_)
83                ) =>
84            {
85                // `Some(pool)` for a replica of a managed cluster (the cluster's
86                // AZ list, possibly empty); `None` for a replica of an unmanaged
87                // cluster, which contributes no entry and keeps its own pin.
88                let cluster_pool = cluster_azs.get(&old_replica.value.cluster_id).cloned();
89                let new_replica = migrate_managed_replica(old_replica.clone(), cluster_pool);
90                migrations.push(MigrationAction::Update(
91                    v86::StateUpdateKind::ClusterReplica(old_replica),
92                    v87::StateUpdateKind::ClusterReplica(new_replica),
93                ));
94            }
95            // Unmanaged clusters and unmanaged locations are JSON-identical
96            // across the version; nothing else changed shape.
97            _ => {}
98        }
99    }
100    migrations
101}
102
103/// Rewrites a managed `Cluster`, reconstructing only the `ManagedCluster` that
104/// gained fields and carrying every other sub-structure across unchanged.
105fn migrate_managed_cluster(old: v86::Cluster) -> v87::Cluster {
106    let v86::Cluster { key, value } = old;
107    let v86::ClusterVariant::Managed(m) = value.config.variant else {
108        unreachable!("caller guards on the managed variant");
109    };
110    v87::Cluster {
111        key: JsonCompatible::convert(&key),
112        value: v87::ClusterValue {
113            name: value.name,
114            owner_id: JsonCompatible::convert(&value.owner_id),
115            privileges: value
116                .privileges
117                .iter()
118                .map(JsonCompatible::convert)
119                .collect(),
120            config: v87::ClusterConfig {
121                workload_class: value.config.workload_class,
122                variant: v87::ClusterVariant::Managed(v87::ManagedCluster {
123                    size: m.size,
124                    replication_factor: m.replication_factor,
125                    availability_zones: m.availability_zones,
126                    logging: JsonCompatible::convert(&m.logging),
127                    optimizer_feature_overrides: m
128                        .optimizer_feature_overrides
129                        .iter()
130                        .map(JsonCompatible::convert)
131                        .collect(),
132                    schedule: JsonCompatible::convert(&m.schedule),
133                    // Additive, defaulted: no policy or in-flight state for
134                    // existing clusters.
135                    auto_scaling_strategy: None,
136                    reconfiguration: None,
137                    burst: None,
138                }),
139            },
140        },
141    }
142}
143
144/// Rewrites a `ClusterReplica` with a managed location, reconstructing only the
145/// `ManagedLocation` whose `availability_zone` user-pin became an
146/// `availability_zones` list and carrying every other sub-structure across
147/// unchanged. `cluster_pool` is `Some` (the owning managed cluster's AZ list)
148/// for a replica of a managed cluster and `None` for a replica of an unmanaged
149/// cluster.
150fn migrate_managed_replica(
151    old: v86::ClusterReplica,
152    cluster_pool: Option<Vec<String>>,
153) -> v87::ClusterReplica {
154    let v86::ClusterReplica { key, value } = old;
155    let v86::ReplicaLocation::Managed(m) = value.config.location else {
156        unreachable!("caller guards on the managed location");
157    };
158    let availability_zones = match cluster_pool {
159        // Managed cluster: provisioned under the cluster's `AVAILABILITY ZONES`
160        // pool. (A managed cluster's replica never carries its own single-AZ
161        // pin, so `m.availability_zone` is always `None` here.)
162        Some(pool) => pool,
163        // Unmanaged cluster: carry the replica's user-pinned `AVAILABILITY ZONE`
164        // across as a zero- or one-element list.
165        None => m.availability_zone.into_iter().collect(),
166    };
167    v87::ClusterReplica {
168        key: JsonCompatible::convert(&key),
169        value: v87::ClusterReplicaValue {
170            cluster_id: JsonCompatible::convert(&value.cluster_id),
171            name: value.name,
172            config: v87::ReplicaConfig {
173                logging: JsonCompatible::convert(&value.config.logging),
174                location: v87::ReplicaLocation::Managed(v87::ManagedLocation {
175                    size: m.size,
176                    availability_zones,
177                    internal: m.internal,
178                    billed_as: m.billed_as,
179                    pending: m.pending,
180                }),
181            },
182            owner_id: JsonCompatible::convert(&value.owner_id),
183        },
184    }
185}
186
#[cfg(test)]
mod tests {
    use super::upgrade;
    use crate::durable::upgrade::MigrationAction;
    use crate::durable::upgrade::objects_v86 as v86;
    use crate::durable::upgrade::objects_v87 as v87;

    /// The action type produced by `upgrade`.
    type Migration = MigrationAction<v86::StateUpdateKind, v87::StateUpdateKind>;

    /// Logging configuration shared by every fixture.
    fn logging_off() -> v86::ReplicaLogging {
        v86::ReplicaLogging {
            log_logging: false,
            interval: None,
        }
    }

    /// Builds a user cluster update named `cluster_{id}` with the given variant.
    fn cluster(id: u64, variant: v86::ClusterVariant) -> v86::StateUpdateKind {
        v86::StateUpdateKind::Cluster(v86::Cluster {
            key: v86::ClusterKey {
                id: v86::ClusterId::User(id),
            },
            value: v86::ClusterValue {
                name: format!("cluster_{id}"),
                owner_id: v86::RoleId::User(1),
                privileges: vec![],
                config: v86::ClusterConfig {
                    workload_class: None,
                    variant,
                },
            },
        })
    }

    /// Builds a managed user cluster with the given availability zones.
    fn managed_cluster(id: u64, availability_zones: Vec<String>) -> v86::StateUpdateKind {
        cluster(
            id,
            v86::ClusterVariant::Managed(v86::ManagedCluster {
                size: "100cc".to_string(),
                replication_factor: 1,
                availability_zones,
                logging: logging_off(),
                optimizer_feature_overrides: vec![],
                schedule: v86::ClusterSchedule::Manual,
            }),
        )
    }

    /// Builds an unmanaged user cluster.
    fn unmanaged_cluster(id: u64) -> v86::StateUpdateKind {
        cluster(id, v86::ClusterVariant::Unmanaged)
    }

    /// Builds a user replica update named `r{replica_id}` at the given location.
    fn replica(
        replica_id: u64,
        cluster_id: u64,
        location: v86::ReplicaLocation,
    ) -> v86::StateUpdateKind {
        v86::StateUpdateKind::ClusterReplica(v86::ClusterReplica {
            key: v86::ClusterReplicaKey {
                id: v86::ReplicaId::User(replica_id),
            },
            value: v86::ClusterReplicaValue {
                cluster_id: v86::ClusterId::User(cluster_id),
                name: format!("r{replica_id}"),
                config: v86::ReplicaConfig {
                    logging: logging_off(),
                    location,
                },
                owner_id: v86::RoleId::User(1),
            },
        })
    }

    /// Builds a replica with a managed location and an optional single-AZ pin.
    fn managed_replica(
        replica_id: u64,
        cluster_id: u64,
        availability_zone: Option<String>,
    ) -> v86::StateUpdateKind {
        replica(
            replica_id,
            cluster_id,
            v86::ReplicaLocation::Managed(v86::ManagedLocation {
                size: "100cc".to_string(),
                availability_zone,
                internal: false,
                billed_as: None,
                pending: false,
            }),
        )
    }

    /// Extracts the migrated managed location from a replica update, panicking
    /// if the action is anything else.
    fn migrated_location(action: &Migration) -> &v87::ManagedLocation {
        let MigrationAction::Update(_, v87::StateUpdateKind::ClusterReplica(replica)) = action
        else {
            panic!("expected a replica update");
        };
        let v87::ReplicaLocation::Managed(loc) = &replica.value.config.location else {
            panic!("expected a managed location");
        };
        loc
    }

    #[mz_ore::test]
    fn test_cluster_new_fields_default_none() {
        let migrations = upgrade(vec![managed_cluster(1, vec!["az1".to_string()])]);
        assert_eq!(migrations.len(), 1);

        let MigrationAction::Update(_, v87::StateUpdateKind::Cluster(cluster)) = &migrations[0]
        else {
            panic!("expected a cluster update");
        };
        let v87::ClusterVariant::Managed(managed) = &cluster.value.config.variant else {
            panic!("expected a managed cluster");
        };

        // The fields added in v87 start out unset.
        assert_eq!(managed.auto_scaling_strategy, None);
        assert_eq!(managed.reconfiguration, None);
        assert_eq!(managed.burst, None);
        // Pre-existing fields survive the rewrite.
        assert_eq!(managed.availability_zones, vec!["az1".to_string()]);
    }

    #[mz_ore::test]
    fn test_replica_backfills_cluster_azs() {
        let azs = vec!["az1".to_string(), "az2".to_string()];
        let migrations = upgrade(vec![
            managed_cluster(1, azs.clone()),
            managed_replica(10, 1, None),
        ]);
        assert_eq!(migrations.len(), 2);

        // The replica inherits the owning managed cluster's AZ list.
        let loc = migrated_location(&migrations[1]);
        assert_eq!(loc.availability_zones, azs);
    }

    #[mz_ore::test]
    fn test_replica_of_unmanaged_cluster_carries_pin_as_list() {
        // Only a replica of an unmanaged cluster can carry its own single-AZ
        // pin on a managed location. The pin becomes a one-element
        // `availability_zones` list. The unmanaged cluster is JSON-identical
        // across versions and emits nothing, leaving the replica update as the
        // sole action.
        let migrations = upgrade(vec![
            unmanaged_cluster(1),
            managed_replica(10, 1, Some("az1".to_string())),
        ]);
        assert_eq!(migrations.len(), 1);

        let loc = migrated_location(&migrations[0]);
        assert_eq!(loc.availability_zones, vec!["az1".to_string()]);
    }

    #[mz_ore::test]
    fn test_replica_no_cluster_azs_backfills_empty() {
        let migrations = upgrade(vec![
            managed_cluster(1, vec![]),
            managed_replica(10, 1, None),
        ]);

        // A managed cluster with no AZs yields an empty list on its replica.
        let loc = migrated_location(&migrations[1]);
        assert!(loc.availability_zones.is_empty());
    }

    #[mz_ore::test]
    fn test_unmanaged_records_are_not_rewritten() {
        // Unmanaged clusters and unmanaged replica locations have the same
        // JSON in v86 and v87, so the migration must skip them instead of
        // emitting a no-op retract+add of identical bytes.
        let unmanaged_replica = replica(
            20,
            2,
            v86::ReplicaLocation::Unmanaged(v86::UnmanagedLocation {
                storagectl_addrs: vec!["s".to_string()],
                computectl_addrs: vec!["c".to_string()],
            }),
        );
        let migrations = upgrade(vec![unmanaged_cluster(2), unmanaged_replica]);
        assert!(migrations.is_empty());
    }
}