1use std::collections::BTreeMap;
64
65use mz_repr::Diff;
66
67use crate::durable::objects::state_update::{StateUpdate, StateUpdateKindJson};
68use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
69use crate::durable::upgrade::objects_v83 as v83;
70use crate::durable::{CatalogError, initialize::USER_VERSION_KEY};
71
/// Catalog version this upgrade starts from.
const FROM_VERSION: u64 = 82;
/// Catalog version this upgrade leaves the catalog at.
const TO_VERSION: u64 = 83;
74
/// Summary counters produced by [`compute_repairs`] and logged by [`upgrade`].
#[derive(Debug, Default, PartialEq, Eq)]
pub(crate) struct RepairStats {
    /// Dangling Role `-1` rows that were repaired by re-inserting their exact bytes.
    pub repaired: usize,
    /// Stale `+1` byte forms retracted as part of those repairs.
    pub stale_retracted: usize,
    /// Role rows with a non-`+1` diff that did not fit the drift signature and were
    /// left untouched.
    pub skipped_role: usize,
    /// Rows with a non-`+1` diff that did not decode as a Role (either another kind or
    /// a decode failure); these are never repaired.
    pub skipped_non_role: usize,
}
91
92pub async fn upgrade(
93 unopened_catalog_state: &mut UnopenedPersistCatalogState,
94 mut commit_ts: Timestamp,
95) -> Result<(u64, Timestamp), CatalogError> {
96 tracing::info!(
97 from_version = FROM_VERSION,
98 to_version = TO_VERSION,
99 "running versioned Catalog upgrade (repair Role row drift)",
100 );
101
102 let (repairs, stats) = compute_repairs(&unopened_catalog_state.snapshot);
103
104 if !repairs.is_empty() {
105 tracing::info!(
106 repaired = stats.repaired,
107 stale_retracted = stats.stale_retracted,
108 "repairing Role rows left inconsistent by the v80->v81 migration's non-cloud no-op",
109 );
110 }
111 if stats.skipped_role > 0 || stats.skipped_non_role > 0 {
112 tracing::warn!(
113 skipped_role = stats.skipped_role,
114 skipped_non_role = stats.skipped_non_role,
115 "left dangling diffs that did not fit the v80-form-drift signature; review the WARN events emitted above",
116 );
117 }
118
119 let mut updates: Vec<(StateUpdateKindJson, Diff)> = repairs;
120 updates.push((version_update_kind(FROM_VERSION), Diff::MINUS_ONE));
121 updates.push((version_update_kind(TO_VERSION), Diff::ONE));
122
123 if matches!(unopened_catalog_state.mode, Mode::Writable) {
124 commit_ts = unopened_catalog_state
125 .compare_and_append(updates, commit_ts)
126 .await
127 .map_err(|e| e.unwrap_fence_error())?;
128 } else {
129 let ts = commit_ts;
130 let updates = updates
131 .into_iter()
132 .map(|(kind, diff)| StateUpdate { kind, ts, diff });
133 commit_ts = commit_ts.step_forward();
134 unopened_catalog_state.apply_updates_and_consolidate(updates)?;
135 }
136
137 unopened_catalog_state.consolidate();
138 Ok((TO_VERSION, commit_ts))
139}
140
141pub(crate) fn compute_repairs(
156 snapshot: &[(StateUpdateKindJson, Timestamp, Diff)],
157) -> (Vec<(StateUpdateKindJson, Diff)>, RepairStats) {
158 let mut role_plus_ones: BTreeMap<v83::RoleKey, Vec<RolePlusOne<'_>>> = BTreeMap::new();
162 for (kind_json, _, diff) in snapshot {
163 if *diff != Diff::ONE {
164 continue;
165 }
166 let Some(role) = try_as_role(kind_json) else {
167 continue;
168 };
169 role_plus_ones
170 .entry(role.key.clone())
171 .or_default()
172 .push(RolePlusOne {
173 bytes: kind_json,
174 parsed: role,
175 });
176 }
177
178 let mut repairs = Vec::new();
179 let mut stats = RepairStats::default();
180 for (kind_json, _, diff) in snapshot {
181 if *diff == Diff::ONE {
182 continue;
183 }
184
185 let Some(dangling) = try_as_role(kind_json) else {
188 tracing::warn!(
189 ?kind_json,
190 %diff,
191 "non-Role dangling diff; not repaired by the v80-form-drift migration",
192 );
193 stats.skipped_non_role += 1;
194 continue;
195 };
196
197 if *diff != Diff::MINUS_ONE {
200 tracing::warn!(
201 role_name = %dangling.value.name,
202 %diff,
203 "Role row with unexpected diff magnitude; not repaired",
204 );
205 stats.skipped_role += 1;
206 continue;
207 }
208
209 let siblings = role_plus_ones
220 .get(&dangling.key)
221 .map(Vec::as_slice)
222 .unwrap_or(&[]);
223 let mut stale: Vec<&RolePlusOne<'_>> = Vec::new();
224 let mut live: Option<&RolePlusOne<'_>> = None;
225 let mut ambiguous_live = false;
226 for sib in siblings {
227 if sib.parsed.value == dangling.value {
228 stale.push(sib);
229 } else if live.replace(sib).is_some() {
230 ambiguous_live = true;
231 }
232 }
233
234 if stale.is_empty() {
235 tracing::warn!(
236 role_name = %dangling.value.name,
237 num_siblings = siblings.len(),
238 "dangling Role -1 has no parsed-equal +1 sibling; not the v80-form-drift signature",
239 );
240 stats.skipped_role += 1;
241 continue;
242 }
243 if ambiguous_live {
244 tracing::warn!(
245 role_name = %dangling.value.name,
246 num_siblings = siblings.len(),
247 "Role key has multiple distinct live +1 rows; refusing to auto-repair",
248 );
249 stats.skipped_role += 1;
250 continue;
251 }
252
253 tracing::info!(
254 role_name = %dangling.value.name,
255 stale_byte_forms = stale.len(),
256 has_live = live.is_some(),
257 "repairing v80-form-drift phantom retraction",
258 );
259 repairs.push((kind_json.clone(), Diff::ONE));
261 for s in stale {
265 if s.bytes == kind_json {
268 continue;
269 }
270 repairs.push((s.bytes.clone(), Diff::MINUS_ONE));
271 stats.stale_retracted += 1;
272 }
273 stats.repaired += 1;
274 }
275
276 (repairs, stats)
277}
278
/// A Role row with diff `+1` from the snapshot, kept alongside its decoded form.
struct RolePlusOne<'a> {
    /// The stored row exactly as it appears in the snapshot. It is borrowed so it can be
    /// compared by bytes and cloned into a retraction.
    bytes: &'a StateUpdateKindJson,
    /// `bytes` decoded as a [`v83::Role`]; used for parsed-value comparisons.
    parsed: v83::Role,
}
286
287fn try_as_role(kind_json: &StateUpdateKindJson) -> Option<v83::Role> {
292 let kind: v83::StateUpdateKind = kind_json.try_to_serde().ok()?;
293 match kind {
294 v83::StateUpdateKind::Role(role) => Some(role),
295 _ => None,
296 }
297}
298
299fn version_update_kind(version: u64) -> StateUpdateKindJson {
303 use crate::durable::objects::serialization::proto;
304 use crate::durable::objects::state_update::StateUpdateKind;
305 StateUpdateKind::Config(
306 proto::ConfigKey {
307 key: USER_VERSION_KEY.to_string(),
308 },
309 proto::ConfigValue { value: version },
310 )
311 .into()
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable::upgrade::objects_v83 as v83;
    use mz_repr::Diff;

    /// Builds a `Role` row for `RoleId::User(user_id)` by converting a typed
    /// `v83::StateUpdateKind` into its stored JSON form. `inherit` is always `true` and
    /// membership/vars are empty.
    fn role_kind(
        user_id: u64,
        name: &str,
        oid: u32,
        login: Option<bool>,
        superuser: Option<bool>,
        auto_provision_source: Option<v83::AutoProvisionSource>,
    ) -> StateUpdateKindJson {
        let role = v83::Role {
            key: v83::RoleKey {
                id: v83::RoleId::User(user_id),
            },
            value: v83::RoleValue {
                name: name.to_string(),
                oid,
                attributes: v83::RoleAttributes {
                    inherit: true,
                    superuser,
                    login,
                    auto_provision_source,
                },
                membership: v83::RoleMembership { map: vec![] },
                vars: v83::RoleVars { entries: vec![] },
            },
        };
        v83::StateUpdateKind::Role(role).into()
    }

    /// Builds a Role row from hand-written JSON that omits `auto_provision_source` from
    /// `attributes`, so its stored bytes differ from `role_kind`'s.
    /// `production_shape_alter_login_is_repaired` checks that it still parses to the
    /// same value as `role_kind(user_id, name, oid, None, None, None)`.
    fn stale_role_kind_with_dropped_field(
        user_id: u64,
        name: &str,
        oid: u32,
    ) -> StateUpdateKindJson {
        use serde_json::json;
        let v = json!({
            "kind": "Role",
            "key": { "id": { "User": user_id } },
            "value": {
                "name": name,
                "oid": oid,
                "attributes": {
                    "inherit": true,
                    "superuser": null,
                    "login": null,
                },
                "membership": { "map": [] },
                "vars": { "entries": [] },
            }
        });
        StateUpdateKindJson::from_serde(&v)
    }

    /// Builds a non-Role (Database) row, owned by `RoleId::System(1)`.
    fn database_kind(id: u64, name: &str) -> StateUpdateKindJson {
        let db = v83::Database {
            key: v83::DatabaseKey {
                id: v83::DatabaseId::User(id),
            },
            value: v83::DatabaseValue {
                name: name.to_string(),
                owner_id: v83::RoleId::System(1),
                privileges: vec![],
                oid: 0,
            },
        };
        v83::StateUpdateKind::Database(db).into()
    }

    /// Turns `(row, diff)` pairs into snapshot entries, all at `Timestamp::new(0)`.
    fn snapshot(
        rows: Vec<(StateUpdateKindJson, Diff)>,
    ) -> Vec<(StateUpdateKindJson, Timestamp, Diff)> {
        rows.into_iter()
            .map(|(kind, diff)| (kind, Timestamp::new(0), diff))
            .collect()
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn healthy_snapshot_is_a_noop() {
        // Only +1 rows: nothing is dangling, so there are no repairs and no skips.
        let snap = snapshot(vec![
            (
                role_kind(1, "alice@example.com", 100, Some(true), None, None),
                Diff::ONE,
            ),
            (role_kind(2, "bob", 101, None, None, None), Diff::ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        assert!(repairs.is_empty());
        assert_eq!(stats, RepairStats::default());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn production_shape_alter_login_is_repaired() {
        // `live` is the role's current row (login = Some(true)). `dangling` retracts the
        // earlier value in `role_kind`'s byte form, while that earlier value is stored as
        // `stale` in the dropped-field form, so the -1 and the +1 do not cancel.
        let live = role_kind(8, "alice@example.com", 20030, Some(true), None, None);
        let dangling = role_kind(8, "alice@example.com", 20030, None, None, None);
        let stale = stale_role_kind_with_dropped_field(8, "alice@example.com", 20030);

        // Guard the fixture: the two forms must parse equal but differ in bytes.
        assert_eq!(
            try_as_role(&stale).expect("parses as Role").value,
            try_as_role(&dangling).expect("parses as Role").value,
            "test fixture broken: stale and dangling must be parsed-equal",
        );
        assert_ne!(
            stale, dangling,
            "test fixture broken: stale and dangling must have distinct stored forms",
        );

        let snap = snapshot(vec![
            (stale.clone(), Diff::ONE),
            (dangling.clone(), Diff::MINUS_ONE),
            (live, Diff::ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        // The dangling row is re-inserted and the stale form retracted; `live` is untouched.
        assert_eq!(
            repairs,
            vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
        );
        assert_eq!(
            stats,
            RepairStats {
                repaired: 1,
                stale_retracted: 1,
                ..Default::default()
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn alter_changing_superuser_is_repaired() {
        // Same shape as above, but the live row differs from the retracted value by
        // `superuser` instead of `login`.
        let live = role_kind(11, "ops@materialize.com", 20040, None, Some(true), None);
        let dangling = role_kind(11, "ops@materialize.com", 20040, None, None, None);
        let stale = stale_role_kind_with_dropped_field(11, "ops@materialize.com", 20040);

        let snap = snapshot(vec![
            (stale.clone(), Diff::ONE),
            (dangling.clone(), Diff::MINUS_ONE),
            (live, Diff::ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        assert_eq!(
            repairs,
            vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
        );
        assert_eq!(stats.repaired, 1);
        assert_eq!(stats.stale_retracted, 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn alter_changing_name_is_repaired() {
        // The live row carries a different name. Siblings are grouped by RoleKey, not by
        // name, so it is still treated as this key's live row.
        let live = role_kind(12, "renamed@materialize.com", 20050, None, None, None);
        let dangling = role_kind(12, "original@materialize.com", 20050, None, None, None);
        let stale = stale_role_kind_with_dropped_field(12, "original@materialize.com", 20050);

        let snap = snapshot(vec![
            (stale.clone(), Diff::ONE),
            (dangling.clone(), Diff::MINUS_ONE),
            (live, Diff::ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        assert_eq!(
            repairs,
            vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
        );
        assert_eq!(stats.repaired, 1);
        assert_eq!(stats.stale_retracted, 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn drop_role_shape_is_repaired() {
        // No live row remains for the key. The phantom retraction is still repaired and
        // the stale row retracted.
        let dangling = role_kind(13, "dropped@materialize.com", 20060, None, None, None);
        let stale = stale_role_kind_with_dropped_field(13, "dropped@materialize.com", 20060);

        let snap = snapshot(vec![
            (stale.clone(), Diff::ONE),
            (dangling.clone(), Diff::MINUS_ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        assert_eq!(
            repairs,
            vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
        );
        assert_eq!(
            stats,
            RepairStats {
                repaired: 1,
                stale_retracted: 1,
                ..Default::default()
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn dangling_minus_one_with_no_parsed_equal_plus_one_is_skipped() {
        // A -1 with no +1 at all for its key has no stale form to pair with.
        let dangling = role_kind(20, "ghost", 200, None, None, None);
        let snap = snapshot(vec![(dangling, Diff::MINUS_ONE)]);
        let (repairs, stats) = compute_repairs(&snap);
        assert!(repairs.is_empty());
        assert_eq!(stats.skipped_role, 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn dangling_minus_one_with_only_a_different_parsed_live_is_skipped() {
        // The only +1 for the key differs in parsed value, so there is no stale form.
        let dangling = role_kind(21, "alice@materialize.com", 210, None, None, None);
        let live = role_kind(21, "alice@materialize.com", 210, Some(true), None, None);
        let snap = snapshot(vec![(dangling, Diff::MINUS_ONE), (live, Diff::ONE)]);
        let (repairs, stats) = compute_repairs(&snap);
        assert!(repairs.is_empty());
        assert_eq!(stats.skipped_role, 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn ambiguous_two_distinct_live_rows_is_skipped() {
        // Two distinct live rows for one key: nothing is repaired, even though a stale
        // form exists.
        let live_a = role_kind(22, "alice", 220, Some(true), None, None);
        let live_b = role_kind(22, "alice", 220, Some(false), Some(true), None);
        let dangling = role_kind(22, "alice", 220, None, None, None);
        let stale = stale_role_kind_with_dropped_field(22, "alice", 220);
        let snap = snapshot(vec![
            (stale, Diff::ONE),
            (live_a, Diff::ONE),
            (live_b, Diff::ONE),
            (dangling, Diff::MINUS_ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        assert!(repairs.is_empty());
        assert_eq!(stats.skipped_role, 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn dangling_non_role_is_skipped() {
        // Only Role rows are in scope; a dangling Database row counts as skipped_non_role.
        let dangling = database_kind(1, "ghostdb");
        let snap = snapshot(vec![(dangling, Diff::MINUS_ONE)]);
        let (repairs, stats) = compute_repairs(&snap);
        assert!(repairs.is_empty());
        assert_eq!(stats.skipped_non_role, 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn dangling_diff_other_than_minus_one_is_skipped() {
        // A -2 retraction does not match the single-retraction signature and is skipped,
        // even though stale and live rows exist for the key.
        let dangling = role_kind(30, "arjun", 20032, None, None, None);
        let stale = stale_role_kind_with_dropped_field(30, "arjun", 20032);
        let live = role_kind(30, "arjun", 20032, Some(true), None, None);
        let snap = snapshot(vec![
            (dangling, Diff::MINUS_ONE + Diff::MINUS_ONE),
            (stale, Diff::ONE),
            (live, Diff::ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        assert!(repairs.is_empty());
        assert_eq!(stats.skipped_role, 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn repair_with_multiple_stale_byte_forms_retracts_all() {
        // Two distinct stale byte forms of the same value are both retracted.
        let live = role_kind(40, "alice@materialize.com", 20070, Some(true), None, None);
        let dangling = role_kind(40, "alice@materialize.com", 20070, None, None, None);
        let stale_a = stale_role_kind_with_dropped_field(40, "alice@materialize.com", 20070);
        let stale_b = stale_role_kind_with_extra_whitespace(40, "alice@materialize.com", 20070);
        assert_ne!(stale_a, stale_b);

        let snap = snapshot(vec![
            (stale_a.clone(), Diff::ONE),
            (stale_b.clone(), Diff::ONE),
            (dangling.clone(), Diff::MINUS_ONE),
            (live, Diff::ONE),
        ]);
        let (repairs, stats) = compute_repairs(&snap);
        let plus = repairs
            .iter()
            .filter(|(_, d)| *d == Diff::ONE)
            .map(|(k, _)| k.clone())
            .collect::<Vec<_>>();
        // Retractions are compared as a set, so the test does not depend on the order in
        // which the stale forms are emitted.
        let minus = repairs
            .iter()
            .filter(|(_, d)| *d == Diff::MINUS_ONE)
            .map(|(k, _)| k.clone())
            .collect::<std::collections::BTreeSet<_>>();
        assert_eq!(plus, vec![dangling]);
        let expected_minus: std::collections::BTreeSet<_> =
            [stale_a, stale_b].into_iter().collect();
        assert_eq!(minus, expected_minus);
        assert_eq!(stats.repaired, 1);
        assert_eq!(stats.stale_retracted, 2);
    }

    /// Builds a second hand-written stale form of the same Role value.
    ///
    /// NOTE(review): despite its name, this helper does not vary whitespace. Its JSON
    /// differs from `stale_role_kind_with_dropped_field`'s by carrying an explicit
    /// `auto_provision_source: null` and an extra `password: null` key. `password` is
    /// not one of the `v83::RoleAttributes` fields that `role_kind` constructs.
    /// `repair_with_multiple_stale_byte_forms_retracts_all` relies on this row still
    /// decoding to the same Role value, presumably because unknown keys are ignored on
    /// deserialization. Confirm that, and consider renaming the helper.
    fn stale_role_kind_with_extra_whitespace(
        user_id: u64,
        name: &str,
        oid: u32,
    ) -> StateUpdateKindJson {
        use serde_json::json;
        let v = json!({
            "kind": "Role",
            "key": { "id": { "User": user_id } },
            "value": {
                "name": name,
                "oid": oid,
                "attributes": {
                    "inherit": true,
                    "superuser": null,
                    "login": null,
                    "auto_provision_source": null,
                    "password": null,
                },
                "membership": { "map": [] },
                "vars": { "entries": [] },
            }
        });
        StateUpdateKindJson::from_serde(&v)
    }
}