mz_catalog/durable/upgrade/
v75_to_v76.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use crate::durable::traits::{UpgradeFrom, UpgradeInto};
11use crate::durable::upgrade::MigrationAction;
12use crate::durable::upgrade::wire_compatible::{WireCompatible, wire_compatible};
13use crate::durable::upgrade::{objects_v75 as v75, objects_v76 as v76};
14
// Type pairs that the `UpgradeFrom` impls in this file hand to `WireCompatible::convert`
// rather than rebuilding field by field.
// NOTE(review): presumably `wire_compatible!` asserts that both types of a pair share the
// same serialized encoding — confirm against the macro's definition before adding a pair.
wire_compatible!(v75::ClusterKey with v76::ClusterKey);
wire_compatible!(v75::ClusterReplicaKey with v76::ClusterReplicaKey);
wire_compatible!(v75::ClusterId with v76::ClusterId);
wire_compatible!(v75::RoleId with v76::RoleId);
wire_compatible!(v75::MzAclItem with v76::MzAclItem);
wire_compatible!(v75::ReplicaLogging with v76::ReplicaLogging);
wire_compatible!(v75::replica_config::UnmanagedLocation with v76::replica_config::UnmanagedLocation);
wire_compatible!(v75::OptimizerFeatureOverride with v76::OptimizerFeatureOverride);
wire_compatible!(v75::ClusterSchedule with v76::ClusterSchedule);
24
25/// Removes the `disk` flag from managed cluster and replica configs.
26pub fn upgrade(
27    snapshot: Vec<v75::StateUpdateKind>,
28) -> Vec<MigrationAction<v75::StateUpdateKind, v76::StateUpdateKind>> {
29    let mut migrations = Vec::new();
30    for update in snapshot {
31        match update.kind {
32            Some(v75::state_update_kind::Kind::Cluster(old_cluster)) => {
33                let new_cluster =
34                    v76::state_update_kind::Cluster::upgrade_from(old_cluster.clone());
35                let old = v75::StateUpdateKind {
36                    kind: Some(v75::state_update_kind::Kind::Cluster(old_cluster)),
37                };
38                let new = v76::StateUpdateKind {
39                    kind: Some(v76::state_update_kind::Kind::Cluster(new_cluster)),
40                };
41
42                let migration = MigrationAction::Update(old, new);
43                migrations.push(migration);
44            }
45            Some(v75::state_update_kind::Kind::ClusterReplica(old_replica)) => {
46                let new_replica =
47                    v76::state_update_kind::ClusterReplica::upgrade_from(old_replica.clone());
48                let old = v75::StateUpdateKind {
49                    kind: Some(v75::state_update_kind::Kind::ClusterReplica(old_replica)),
50                };
51                let new = v76::StateUpdateKind {
52                    kind: Some(v76::state_update_kind::Kind::ClusterReplica(new_replica)),
53                };
54
55                let migration = MigrationAction::Update(old, new);
56                migrations.push(migration);
57            }
58            _ => {}
59        }
60    }
61    migrations
62}
63
64impl UpgradeFrom<v75::state_update_kind::Cluster> for v76::state_update_kind::Cluster {
65    fn upgrade_from(old: v75::state_update_kind::Cluster) -> Self {
66        v76::state_update_kind::Cluster {
67            key: old.key.as_ref().map(WireCompatible::convert),
68            value: old.value.map(UpgradeFrom::upgrade_from),
69        }
70    }
71}
72
73impl UpgradeFrom<v75::ClusterValue> for v76::ClusterValue {
74    fn upgrade_from(old: v75::ClusterValue) -> Self {
75        v76::ClusterValue {
76            name: old.name,
77            owner_id: old.owner_id.as_ref().map(WireCompatible::convert),
78            privileges: old.privileges.iter().map(WireCompatible::convert).collect(),
79            config: old.config.map(UpgradeFrom::upgrade_from),
80        }
81    }
82}
83
84impl UpgradeFrom<v75::ClusterConfig> for v76::ClusterConfig {
85    fn upgrade_from(old: v75::ClusterConfig) -> Self {
86        v76::ClusterConfig {
87            workload_class: old.workload_class,
88            variant: old.variant.map(UpgradeFrom::upgrade_from),
89        }
90    }
91}
92
93impl UpgradeFrom<v75::cluster_config::Variant> for v76::cluster_config::Variant {
94    fn upgrade_from(old: v75::cluster_config::Variant) -> Self {
95        match old {
96            v75::cluster_config::Variant::Unmanaged(_empty) => {
97                v76::cluster_config::Variant::Unmanaged(v76::Empty {})
98            }
99            v75::cluster_config::Variant::Managed(managed) => {
100                v76::cluster_config::Variant::Managed(managed.upgrade_into())
101            }
102        }
103    }
104}
105
106impl UpgradeFrom<v75::cluster_config::ManagedCluster> for v76::cluster_config::ManagedCluster {
107    fn upgrade_from(loc: v75::cluster_config::ManagedCluster) -> Self {
108        v76::cluster_config::ManagedCluster {
109            size: loc.size,
110            replication_factor: loc.replication_factor,
111            availability_zones: loc.availability_zones,
112            logging: loc.logging.as_ref().map(WireCompatible::convert),
113            optimizer_feature_overrides: loc
114                .optimizer_feature_overrides
115                .iter()
116                .map(WireCompatible::convert)
117                .collect(),
118            schedule: loc.schedule.as_ref().map(WireCompatible::convert),
119        }
120    }
121}
122
123impl UpgradeFrom<v75::state_update_kind::ClusterReplica>
124    for v76::state_update_kind::ClusterReplica
125{
126    fn upgrade_from(old: v75::state_update_kind::ClusterReplica) -> Self {
127        v76::state_update_kind::ClusterReplica {
128            key: old.key.as_ref().map(WireCompatible::convert),
129            value: old.value.map(UpgradeFrom::upgrade_from),
130        }
131    }
132}
133
134impl UpgradeFrom<v75::ClusterReplicaValue> for v76::ClusterReplicaValue {
135    fn upgrade_from(old: v75::ClusterReplicaValue) -> Self {
136        v76::ClusterReplicaValue {
137            cluster_id: old.cluster_id.as_ref().map(WireCompatible::convert),
138            name: old.name,
139            config: old.config.map(UpgradeFrom::upgrade_from),
140            owner_id: old.owner_id.as_ref().map(WireCompatible::convert),
141        }
142    }
143}
144
145impl UpgradeFrom<v75::ReplicaConfig> for v76::ReplicaConfig {
146    fn upgrade_from(old: v75::ReplicaConfig) -> Self {
147        v76::ReplicaConfig {
148            logging: old.logging.as_ref().map(WireCompatible::convert),
149            location: old.location.map(UpgradeFrom::upgrade_from),
150        }
151    }
152}
153
154impl UpgradeFrom<v75::replica_config::Location> for v76::replica_config::Location {
155    fn upgrade_from(old: v75::replica_config::Location) -> Self {
156        match old {
157            v75::replica_config::Location::Unmanaged(loc) => {
158                v76::replica_config::Location::Unmanaged(WireCompatible::convert(&loc))
159            }
160            v75::replica_config::Location::Managed(loc) => {
161                v76::replica_config::Location::Managed(loc.upgrade_into())
162            }
163        }
164    }
165}
166
167impl UpgradeFrom<v75::replica_config::ManagedLocation> for v76::replica_config::ManagedLocation {
168    fn upgrade_from(loc: v75::replica_config::ManagedLocation) -> Self {
169        v76::replica_config::ManagedLocation {
170            size: loc.size,
171            availability_zone: loc.availability_zone,
172            internal: loc.internal,
173            billed_as: loc.billed_as,
174            pending: loc.pending,
175        }
176    }
177}