mz_catalog/durable/upgrade/
v81_to_v82.rs1use crate::durable::upgrade::MigrationAction;
11use crate::durable::upgrade::objects_v81 as v81;
12use crate::durable::upgrade::objects_v82 as v82;
13
14pub fn upgrade(
21 snapshot: Vec<v81::StateUpdateKind>,
22) -> Vec<MigrationAction<v81::StateUpdateKind, v82::StateUpdateKind>> {
23 for update in &snapshot {
24 match update {
25 v81::StateUpdateKind::Item(item) => {
26 let is_ct = match &item.value.definition {
27 v81::CatalogItem::V1(v1) => v1.create_sql.starts_with("CREATE CONTINUAL TASK"),
28 };
29 if is_ct {
30 panic!(
31 "found continual task {:?} in catalog; \
32 drop all continual tasks before upgrading",
33 item.value.name
34 );
35 }
36 }
37 v81::StateUpdateKind::Comment(comment) => {
38 if matches!(comment.key.object, v81::CommentObject::ContinualTask(_)) {
39 panic!(
40 "found comment on continual task in catalog; \
41 drop all continual tasks before upgrading"
42 );
43 }
44 }
45 _ => {}
46 }
47 }
48
49 Vec::new()
50}
51
#[cfg(test)]
mod tests {
    use super::upgrade;
    use crate::durable::upgrade::objects_v81 as v81;

    /// Builds a v81 item update for user item `gid` whose definition holds `create_sql`.
    fn make_item(gid: u64, create_sql: &str) -> v81::StateUpdateKind {
        let key = v81::ItemKey {
            gid: v81::CatalogItemId::User(gid),
        };
        let definition = v81::CatalogItem::V1(v81::CatalogItemV1 {
            create_sql: create_sql.to_string(),
        });
        let value = v81::ItemValue {
            schema_id: v81::SchemaId::User(1),
            name: "test".to_string(),
            definition,
            owner_id: v81::RoleId::User(1),
            privileges: vec![],
            oid: 20000,
            global_id: v81::GlobalId::User(gid),
            extra_versions: vec![],
        };
        v81::StateUpdateKind::Item(v81::Item { key, value })
    }

    /// Builds a v81 comment update attached to `object` with the given `text`
    /// and no sub-component.
    fn make_comment(object: v81::CommentObject, text: &str) -> v81::StateUpdateKind {
        let key = v81::CommentKey {
            object,
            sub_component: None,
        };
        let value = v81::CommentValue {
            comment: text.to_string(),
        };
        v81::StateUpdateKind::Comment(v81::Comment { key, value })
    }

    /// Comment update whose target is the continual task with user id `item_id`.
    fn make_comment_on_ct(item_id: u64) -> v81::StateUpdateKind {
        let target = v81::CommentObject::ContinualTask(v81::CatalogItemId::User(item_id));
        make_comment(target, "test comment")
    }

    /// Comment update whose target is the table with user id `item_id`.
    fn make_comment_on_table(item_id: u64) -> v81::StateUpdateKind {
        let target = v81::CommentObject::Table(v81::CatalogItemId::User(item_id));
        make_comment(target, "table comment")
    }

    // A continual task item anywhere in the snapshot must abort the upgrade.
    #[mz_ore::test]
    #[should_panic(expected = "found continual task")]
    fn test_panics_on_continual_task_item() {
        let table = make_item(1, "CREATE TABLE foo (a INT)");
        let continual_task = make_item(2, "CREATE CONTINUAL TASK ct1 ...");
        let view = make_item(3, "CREATE VIEW v AS SELECT 1");
        upgrade(vec![table, continual_task, view]);
    }

    // A comment on a continual task must abort the upgrade as well.
    #[mz_ore::test]
    #[should_panic(expected = "found comment on continual task")]
    fn test_panics_on_continual_task_comment() {
        upgrade(vec![make_comment_on_ct(1), make_comment_on_table(2)]);
    }

    // Without any continual task the upgrade succeeds and yields no actions.
    #[mz_ore::test]
    fn test_no_ct_returns_empty() {
        let table = make_item(1, "CREATE TABLE foo (a INT)");
        let table_comment = make_comment_on_table(1);
        assert!(upgrade(vec![table, table_comment]).is_empty());
    }
}