mz_catalog/durable/upgrade/
v67_to_v68.rs1use crate::durable::traits::{UpgradeFrom, UpgradeInto};
11use crate::durable::upgrade::MigrationAction;
12use crate::durable::upgrade::wire_compatible::{WireCompatible, wire_compatible};
13use crate::durable::upgrade::{objects_v67 as v67, objects_v68 as v68};
14
15wire_compatible!(v67::GlobalId with v68::GlobalId);
16wire_compatible!(v67::CatalogItem with v68::CatalogItem);
17wire_compatible!(v67::SchemaId with v68::SchemaId);
18wire_compatible!(v67::CommentValue with v68::CommentValue);
19wire_compatible!(v67::RoleId with v68::RoleId);
20wire_compatible!(v67::MzAclItem with v68::MzAclItem);
21wire_compatible!(v67::DatabaseId with v68::DatabaseId);
22wire_compatible!(v67::ResolvedSchema with v68::ResolvedSchema);
23wire_compatible!(v67::ClusterId with v68::ClusterId);
24wire_compatible!(v67::ClusterReplicaId with v68::ClusterReplicaId);
25wire_compatible!(v67::SourceReferencesValue with v68::SourceReferencesValue);
26wire_compatible!(v67::GidMappingKey with v68::GidMappingKey);
27wire_compatible!(v67::ClusterIntrospectionSourceIndexKey with v68::ClusterIntrospectionSourceIndexKey);
28
29pub fn upgrade(
40    snapshot: Vec<v67::StateUpdateKind>,
41) -> Vec<MigrationAction<v67::StateUpdateKind, v68::StateUpdateKind>> {
42    snapshot
43        .iter()
44        .filter_map(|update| match &update.kind {
45            Some(v67::state_update_kind::Kind::Item(old_item)) => {
46                let new_item = v68::state_update_kind::Item::upgrade_from(old_item.clone());
48
49                let old_item = v67::StateUpdateKind {
50                    kind: Some(v67::state_update_kind::Kind::Item(old_item.clone())),
51                };
52                let new_item = v68::StateUpdateKind {
53                    kind: Some(v68::state_update_kind::Kind::Item(new_item)),
54                };
55
56                Some(MigrationAction::Update(old_item, new_item))
57            }
58            Some(v67::state_update_kind::Kind::Comment(old_comment)) => {
59                let new_comment =
61                    v68::state_update_kind::Comment::upgrade_from(old_comment.clone());
62
63                let old_comment = v67::StateUpdateKind {
64                    kind: Some(v67::state_update_kind::Kind::Comment(old_comment.clone())),
65                };
66                let new_comment = v68::StateUpdateKind {
67                    kind: Some(v68::state_update_kind::Kind::Comment(new_comment.clone())),
68                };
69
70                Some(MigrationAction::Update(old_comment, new_comment))
71            }
72            Some(v67::state_update_kind::Kind::SourceReferences(old_reference)) => {
73                let new_reference =
75                    v68::state_update_kind::SourceReferences::upgrade_from(old_reference.clone());
76
77                let old_reference = v67::StateUpdateKind {
78                    kind: Some(v67::state_update_kind::Kind::SourceReferences(
79                        old_reference.clone(),
80                    )),
81                };
82                let new_reference = v68::StateUpdateKind {
83                    kind: Some(v68::state_update_kind::Kind::SourceReferences(
84                        new_reference,
85                    )),
86                };
87
88                Some(MigrationAction::Update(old_reference, new_reference))
89            }
90            Some(v67::state_update_kind::Kind::GidMapping(old_mapping)) => {
91                let new_mapping = v68::state_update_kind::GidMapping {
92                    key: old_mapping.key.as_ref().map(v68::GidMappingKey::convert),
93                    value: old_mapping
95                        .value
96                        .as_ref()
97                        .map(|old| v68::GidMappingValue::upgrade_from(old.clone())),
98                };
99
100                let old_mapping = v67::StateUpdateKind {
101                    kind: Some(v67::state_update_kind::Kind::GidMapping(
102                        old_mapping.clone(),
103                    )),
104                };
105                let new_mapping = v68::StateUpdateKind {
106                    kind: Some(v68::state_update_kind::Kind::GidMapping(new_mapping)),
107                };
108
109                Some(MigrationAction::Update(old_mapping, new_mapping))
110            }
111            Some(v67::state_update_kind::Kind::ClusterIntrospectionSourceIndex(old_index)) => {
112                let new_index = v68::state_update_kind::ClusterIntrospectionSourceIndex {
113                    key: old_index
114                        .key
115                        .as_ref()
116                        .map(v68::ClusterIntrospectionSourceIndexKey::convert),
117                    value: old_index.value.as_ref().map(|old| {
118                        v68::ClusterIntrospectionSourceIndexValue::upgrade_from(old.clone())
119                    }),
120                };
121
122                let old_index = v67::StateUpdateKind {
123                    kind: Some(
124                        v67::state_update_kind::Kind::ClusterIntrospectionSourceIndex(
125                            old_index.clone(),
126                        ),
127                    ),
128                };
129                let new_index = v68::StateUpdateKind {
130                    kind: Some(
131                        v68::state_update_kind::Kind::ClusterIntrospectionSourceIndex(new_index),
132                    ),
133                };
134
135                Some(MigrationAction::Update(old_index, new_index))
136            }
137            _ => None,
138        })
139        .collect()
140}
141
142impl UpgradeFrom<v67::state_update_kind::Item> for v68::state_update_kind::Item {
143    fn upgrade_from(value: v67::state_update_kind::Item) -> Self {
144        let new_key = value.key.map(|k| v68::ItemKey {
145            gid: k.gid.map(v68::CatalogItemId::upgrade_from),
146        });
147        let new_val = value.value.map(|val| v68::ItemValue {
148            global_id: value
149                .key
150                .as_ref()
151                .and_then(|k| k.gid)
152                .map(|gid| v68::GlobalId::convert(&gid)),
153            schema_id: val.schema_id.map(|id| v68::SchemaId::convert(&id)),
154            definition: val.definition.map(|def| v68::CatalogItem::convert(&def)),
155            name: val.name,
156            owner_id: val.owner_id.map(|id| v68::RoleId::convert(&id)),
157            privileges: val
158                .privileges
159                .into_iter()
160                .map(|item| v68::MzAclItem::convert(&item))
161                .collect(),
162            oid: val.oid,
163            extra_versions: Vec::new(),
165        });
166
167        v68::state_update_kind::Item {
168            key: new_key,
169            value: new_val,
170        }
171    }
172}
173
174impl UpgradeFrom<v67::state_update_kind::Comment> for v68::state_update_kind::Comment {
175    fn upgrade_from(value: v67::state_update_kind::Comment) -> Self {
176        let new_key = value.key.map(|k| v68::CommentKey {
177            object: k.object.map(v68::comment_key::Object::upgrade_from),
178            sub_component: k
179                .sub_component
180                .map(v68::comment_key::SubComponent::upgrade_from),
181        });
182        let new_val = value.value.map(|v| v68::CommentValue::convert(&v));
183
184        v68::state_update_kind::Comment {
185            key: new_key,
186            value: new_val,
187        }
188    }
189}
190
191impl UpgradeFrom<v67::comment_key::Object> for v68::comment_key::Object {
192    fn upgrade_from(value: v67::comment_key::Object) -> Self {
193        match value {
194            v67::comment_key::Object::Table(global_id) => {
195                v68::comment_key::Object::Table(v68::CatalogItemId::upgrade_from(global_id))
196            }
197            v67::comment_key::Object::View(global_id) => {
198                v68::comment_key::Object::View(v68::CatalogItemId::upgrade_from(global_id))
199            }
200            v67::comment_key::Object::MaterializedView(global_id) => {
201                v68::comment_key::Object::MaterializedView(v68::CatalogItemId::upgrade_from(
202                    global_id,
203                ))
204            }
205            v67::comment_key::Object::Source(global_id) => {
206                v68::comment_key::Object::Source(v68::CatalogItemId::upgrade_from(global_id))
207            }
208            v67::comment_key::Object::Sink(global_id) => {
209                v68::comment_key::Object::Sink(v68::CatalogItemId::upgrade_from(global_id))
210            }
211            v67::comment_key::Object::Index(global_id) => {
212                v68::comment_key::Object::Index(v68::CatalogItemId::upgrade_from(global_id))
213            }
214            v67::comment_key::Object::Func(global_id) => {
215                v68::comment_key::Object::Func(v68::CatalogItemId::upgrade_from(global_id))
216            }
217            v67::comment_key::Object::Connection(global_id) => {
218                v68::comment_key::Object::Connection(v68::CatalogItemId::upgrade_from(global_id))
219            }
220            v67::comment_key::Object::Type(global_id) => {
221                v68::comment_key::Object::Type(v68::CatalogItemId::upgrade_from(global_id))
222            }
223            v67::comment_key::Object::Secret(global_id) => {
224                v68::comment_key::Object::Secret(v68::CatalogItemId::upgrade_from(global_id))
225            }
226            v67::comment_key::Object::ContinualTask(global_id) => {
227                v68::comment_key::Object::ContinualTask(v68::CatalogItemId::upgrade_from(global_id))
228            }
229            v67::comment_key::Object::Role(role_id) => {
230                v68::comment_key::Object::Role(v68::RoleId::convert(&role_id))
231            }
232            v67::comment_key::Object::Database(database_id) => {
233                v68::comment_key::Object::Database(v68::DatabaseId::convert(&database_id))
234            }
235            v67::comment_key::Object::Schema(resolved_schema) => {
236                v68::comment_key::Object::Schema(v68::ResolvedSchema::convert(&resolved_schema))
237            }
238            v67::comment_key::Object::Cluster(cluster_id) => {
239                v68::comment_key::Object::Cluster(v68::ClusterId::convert(&cluster_id))
240            }
241            v67::comment_key::Object::ClusterReplica(cluster_replica_id) => {
242                v68::comment_key::Object::ClusterReplica(v68::ClusterReplicaId::convert(
243                    &cluster_replica_id,
244                ))
245            }
246        }
247    }
248}
249
250impl UpgradeFrom<v67::comment_key::SubComponent> for v68::comment_key::SubComponent {
251    fn upgrade_from(value: v67::comment_key::SubComponent) -> Self {
252        match value {
253            v67::comment_key::SubComponent::ColumnPos(x) => {
254                v68::comment_key::SubComponent::ColumnPos(x)
255            }
256        }
257    }
258}
259
260impl UpgradeFrom<v67::GlobalId> for v68::CatalogItemId {
261    fn upgrade_from(id: v67::GlobalId) -> Self {
262        let value = match id.value {
263            Some(v67::global_id::Value::System(x)) => Some(v68::catalog_item_id::Value::System(x)),
264            Some(v67::global_id::Value::User(x)) => Some(v68::catalog_item_id::Value::User(x)),
265            Some(v67::global_id::Value::Transient(x)) => {
266                Some(v68::catalog_item_id::Value::Transient(x))
267            }
268            None => None,
269            Some(v67::global_id::Value::Explain(_)) => unreachable!("shouldn't persist Explain"),
270        };
271        v68::CatalogItemId { value }
272    }
273}
274
275impl UpgradeFrom<v67::state_update_kind::SourceReferences>
276    for v68::state_update_kind::SourceReferences
277{
278    fn upgrade_from(old: v67::state_update_kind::SourceReferences) -> Self {
279        v68::state_update_kind::SourceReferences {
280            key: old.key.map(|old| old.upgrade_into()),
281            value: old.value.map(|old| WireCompatible::convert(&old)),
282        }
283    }
284}
285
286impl UpgradeFrom<v67::SourceReferencesKey> for v68::SourceReferencesKey {
287    fn upgrade_from(value: v67::SourceReferencesKey) -> Self {
288        let source = match value.source {
289            Some(gid) => Some(gid.upgrade_into()),
290            None => None,
291        };
292        v68::SourceReferencesKey { source }
293    }
294}
295
296impl UpgradeFrom<v67::GidMappingValue> for v68::GidMappingValue {
297    fn upgrade_from(value: v67::GidMappingValue) -> Self {
298        v68::GidMappingValue {
299            id: value.id,
300            global_id: Some(v68::SystemGlobalId { value: value.id }),
301            fingerprint: value.fingerprint,
302        }
303    }
304}
305
306impl UpgradeFrom<v67::ClusterIntrospectionSourceIndexValue>
307    for v68::ClusterIntrospectionSourceIndexValue
308{
309    fn upgrade_from(value: v67::ClusterIntrospectionSourceIndexValue) -> Self {
310        v68::ClusterIntrospectionSourceIndexValue {
311            index_id: value.index_id,
312            global_id: Some(v68::SystemGlobalId {
313                value: value.index_id,
314            }),
315            oid: value.oid,
316        }
317    }
318}