1use std::collections::{BTreeMap, BTreeSet};
79
80use mz_repr::Diff;
81use mz_repr::adt::regex::Regex;
82
83use crate::durable::objects::state_update::{StateUpdate, StateUpdateKindJson};
84use crate::durable::persist::{Mode, Timestamp, UnopenedPersistCatalogState};
85use crate::durable::upgrade::objects_v83 as v83;
86use crate::durable::{CatalogError, initialize::USER_VERSION_KEY};
87
88const FROM_VERSION: u64 = 83;
89const TO_VERSION: u64 = 84;
90
91#[derive(Debug, Default, PartialEq, Eq)]
94pub(crate) struct RepairStats {
95 pub repaired: usize,
98 pub stale_retracted: usize,
101 pub normalized: usize,
105 pub skipped_role: usize,
107 pub skipped_non_role: usize,
110}
111
112pub async fn upgrade(
113 unopened_catalog_state: &mut UnopenedPersistCatalogState,
114 mut commit_ts: Timestamp,
115) -> Result<(u64, Timestamp), CatalogError> {
116 tracing::info!(
117 from_version = FROM_VERSION,
118 to_version = TO_VERSION,
119 "running versioned Catalog upgrade (repair Role row drift)",
120 );
121
122 let (repairs, stats) = compute_repairs(&unopened_catalog_state.snapshot);
123
124 if !repairs.is_empty() {
125 tracing::info!(
126 repaired = stats.repaired,
127 stale_retracted = stats.stale_retracted,
128 normalized = stats.normalized,
129 "repairing Role rows left inconsistent by the v80->v81 migration's non-cloud no-op (and replaying its auto_provision_source backfill)",
130 );
131 }
132 if stats.skipped_role > 0 || stats.skipped_non_role > 0 {
133 tracing::warn!(
134 skipped_role = stats.skipped_role,
135 skipped_non_role = stats.skipped_non_role,
136 "left dangling diffs that did not fit the v80-form-drift signature; review the WARN events emitted above",
137 );
138 }
139
140 let mut updates: Vec<(StateUpdateKindJson, Diff)> = repairs;
141 updates.push((version_update_kind(FROM_VERSION), Diff::MINUS_ONE));
142 updates.push((version_update_kind(TO_VERSION), Diff::ONE));
143
144 tracing::info!(
146 "catalog upgrade v{FROM_VERSION}_to_v{TO_VERSION}: about to apply updates: {updates:?}"
147 );
148
149 if matches!(unopened_catalog_state.mode, Mode::Writable) {
150 commit_ts = unopened_catalog_state
151 .compare_and_append(updates, commit_ts)
152 .await
153 .map_err(|e| e.unwrap_fence_error())?;
154 } else {
155 let ts = commit_ts;
156 let updates = updates
157 .into_iter()
158 .map(|(kind, diff)| StateUpdate { kind, ts, diff });
159 commit_ts = commit_ts.step_forward();
160 unopened_catalog_state.apply_updates_and_consolidate(updates)?;
161 }
162
163 unopened_catalog_state.consolidate();
164 Ok((TO_VERSION, commit_ts))
165}
166
167pub(crate) fn compute_repairs(
188 snapshot: &[(StateUpdateKindJson, Timestamp, Diff)],
189) -> (Vec<(StateUpdateKindJson, Diff)>, RepairStats) {
190 let mut role_plus_ones: BTreeMap<v83::RoleKey, Vec<RolePlusOne<'_>>> = BTreeMap::new();
194 for (kind_json, _, diff) in snapshot {
195 if *diff != Diff::ONE {
196 continue;
197 }
198 let Some(role) = try_as_role(kind_json) else {
199 continue;
200 };
201 role_plus_ones
202 .entry(role.key.clone())
203 .or_default()
204 .push(RolePlusOne {
205 bytes: kind_json,
206 parsed: role,
207 });
208 }
209
210 let mut repairs = Vec::new();
211 let mut stats = RepairStats::default();
212 let mut retracted_in_pass_1: BTreeSet<&StateUpdateKindJson> = BTreeSet::new();
214 let mut unrepaired_keys: BTreeSet<v83::RoleKey> = BTreeSet::new();
219 for (kind_json, _, diff) in snapshot {
220 if *diff == Diff::ONE {
221 continue;
222 }
223
224 let Some(dangling) = try_as_role(kind_json) else {
227 tracing::warn!(
228 ?kind_json,
229 %diff,
230 "non-Role dangling diff; not repaired by the v80-form-drift migration",
231 );
232 stats.skipped_non_role += 1;
233 continue;
234 };
235
236 if *diff != Diff::MINUS_ONE {
239 tracing::warn!(
240 role_name = %dangling.value.name,
241 %diff,
242 "Role row with unexpected diff magnitude; not repaired",
243 );
244 stats.skipped_role += 1;
245 unrepaired_keys.insert(dangling.key);
246 continue;
247 }
248
249 let siblings = role_plus_ones
260 .get(&dangling.key)
261 .map(Vec::as_slice)
262 .unwrap_or(&[]);
263 let mut stale: Vec<&RolePlusOne<'_>> = Vec::new();
264 let mut live: Option<&RolePlusOne<'_>> = None;
265 let mut ambiguous_live = false;
266 for sib in siblings {
267 if sib.parsed.value == dangling.value {
268 stale.push(sib);
269 } else if live.replace(sib).is_some() {
270 ambiguous_live = true;
271 }
272 }
273
274 if stale.is_empty() {
275 tracing::warn!(
276 role_name = %dangling.value.name,
277 num_siblings = siblings.len(),
278 "dangling Role -1 has no parsed-equal +1 sibling; not the v80-form-drift signature",
279 );
280 stats.skipped_role += 1;
281 unrepaired_keys.insert(dangling.key);
282 continue;
283 }
284 if ambiguous_live {
285 tracing::warn!(
286 role_name = %dangling.value.name,
287 num_siblings = siblings.len(),
288 "Role key has multiple distinct live +1 rows; refusing to auto-repair",
289 );
290 stats.skipped_role += 1;
291 unrepaired_keys.insert(dangling.key);
292 continue;
293 }
294
295 tracing::info!(
296 role_name = %dangling.value.name,
297 stale_byte_forms = stale.len(),
298 has_live = live.is_some(),
299 "repairing v80-form-drift phantom retraction",
300 );
301 repairs.push((kind_json.clone(), Diff::ONE));
303 for s in stale {
307 if s.bytes == kind_json {
310 continue;
311 }
312 repairs.push((s.bytes.clone(), Diff::MINUS_ONE));
313 retracted_in_pass_1.insert(s.bytes);
314 stats.stale_retracted += 1;
315 }
316 stats.repaired += 1;
317 }
318
319 let is_cloud = is_cloud_env(snapshot);
328 let mut inserted_canonical: BTreeSet<StateUpdateKindJson> = BTreeSet::new();
329 for (kind_json, _, diff) in snapshot {
330 if *diff != Diff::ONE {
331 continue;
332 }
333 let Some(role) = try_as_role(kind_json) else {
334 continue;
335 };
336 if retracted_in_pass_1.contains(kind_json) {
337 continue;
338 }
339 if unrepaired_keys.contains(&role.key) {
340 continue;
341 }
342 let backfilled = apply_v80_to_v81_backfill(role.clone(), is_cloud);
343 let canonical: StateUpdateKindJson = v83::StateUpdateKind::Role(backfilled).into();
344 if &canonical == kind_json {
345 continue;
346 }
347 tracing::info!(
348 role_name = %role.value.name,
349 "normalizing Role row to canonical byte form",
350 );
351 repairs.push((kind_json.clone(), Diff::MINUS_ONE));
352 if inserted_canonical.insert(canonical.clone()) {
353 repairs.push((canonical, Diff::ONE));
354 }
355 stats.normalized += 1;
356 }
357
358 (repairs, stats)
359}
360
361fn is_cloud_env(snapshot: &[(StateUpdateKindJson, Timestamp, Diff)]) -> bool {
381 let mut has_password_auth = false;
382 let mut mz_system: Option<(v83::ClusterId, v83::ClusterVariant)> = None;
383 let mut replica_counts_by_cluster: BTreeMap<v83::ClusterId, usize> = BTreeMap::new();
384 for (kind_json, _, diff) in snapshot {
385 if *diff != Diff::ONE {
386 continue;
387 }
388 let Ok(kind) = kind_json.try_to_serde::<v83::StateUpdateKind>() else {
389 continue;
390 };
391 match kind {
392 v83::StateUpdateKind::ServerConfiguration(config) => {
393 if config.key.name == "enable_password_auth" && config.value.value == "on" {
394 has_password_auth = true;
395 }
396 }
397 v83::StateUpdateKind::Cluster(cluster) if cluster.value.name == "mz_system" => {
398 mz_system = Some((cluster.key.id, cluster.value.config.variant));
399 }
400 v83::StateUpdateKind::ClusterReplica(replica) => {
401 *replica_counts_by_cluster
402 .entry(replica.value.cluster_id)
403 .or_default() += 1;
404 }
405 _ => {}
406 }
407 }
408
409 if has_password_auth {
410 return false;
411 }
412 match mz_system {
413 Some((
414 _,
415 v83::ClusterVariant::Managed(v83::ManagedCluster {
416 replication_factor, ..
417 }),
418 )) => replication_factor > 0,
419 Some((id, v83::ClusterVariant::Unmanaged)) => {
420 replica_counts_by_cluster.get(&id).copied().unwrap_or(0) > 0
421 }
422 None => false,
423 }
424}
425
426fn auto_provision_source_for_name(name: &str) -> Option<v83::AutoProvisionSource> {
431 let email_like = Regex::new(r".+@.+\..+", true).expect("valid regex");
432 if email_like.is_match(name) {
433 Some(v83::AutoProvisionSource::Frontegg)
434 } else {
435 None
436 }
437}
438
439fn apply_v80_to_v81_backfill(mut role: v83::Role, is_cloud: bool) -> v83::Role {
444 if !is_cloud {
445 return role;
446 }
447 if role.value.attributes.auto_provision_source.is_some() {
448 return role;
449 }
450 role.value.attributes.auto_provision_source = auto_provision_source_for_name(&role.value.name);
451 role
452}
453
454struct RolePlusOne<'a> {
458 bytes: &'a StateUpdateKindJson,
459 parsed: v83::Role,
460}
461
462fn try_as_role(kind_json: &StateUpdateKindJson) -> Option<v83::Role> {
467 let kind: v83::StateUpdateKind = kind_json.try_to_serde().ok()?;
468 match kind {
469 v83::StateUpdateKind::Role(role) => Some(role),
470 _ => None,
471 }
472}
473
474fn version_update_kind(version: u64) -> StateUpdateKindJson {
478 use crate::durable::objects::serialization::proto;
479 use crate::durable::objects::state_update::StateUpdateKind;
480 StateUpdateKind::Config(
481 proto::ConfigKey {
482 key: USER_VERSION_KEY.to_string(),
483 },
484 proto::ConfigValue { value: version },
485 )
486 .into()
487}
488
489#[cfg(test)]
490mod tests {
491 use super::*;
492 use crate::durable::upgrade::objects_v83 as v83;
493 use mz_repr::Diff;
494
495 fn role_kind(
496 user_id: u64,
497 name: &str,
498 oid: u32,
499 login: Option<bool>,
500 superuser: Option<bool>,
501 auto_provision_source: Option<v83::AutoProvisionSource>,
502 ) -> StateUpdateKindJson {
503 let role = v83::Role {
504 key: v83::RoleKey {
505 id: v83::RoleId::User(user_id),
506 },
507 value: v83::RoleValue {
508 name: name.to_string(),
509 oid,
510 attributes: v83::RoleAttributes {
511 inherit: true,
512 superuser,
513 login,
514 auto_provision_source,
515 },
516 membership: v83::RoleMembership { map: vec![] },
517 vars: v83::RoleVars { entries: vec![] },
518 },
519 };
520 v83::StateUpdateKind::Role(role).into()
521 }
522
523 fn stale_role_kind_with_dropped_field(
530 user_id: u64,
531 name: &str,
532 oid: u32,
533 ) -> StateUpdateKindJson {
534 use serde_json::json;
535 let v = json!({
536 "kind": "Role",
537 "key": { "id": { "User": user_id } },
538 "value": {
539 "name": name,
540 "oid": oid,
541 "attributes": {
542 "inherit": true,
543 "superuser": null,
544 "login": null,
545 },
547 "membership": { "map": [] },
548 "vars": { "entries": [] },
549 }
550 });
551 StateUpdateKindJson::from_serde(&v)
552 }
553
554 fn database_kind(id: u64, name: &str) -> StateUpdateKindJson {
555 let db = v83::Database {
558 key: v83::DatabaseKey {
559 id: v83::DatabaseId::User(id),
560 },
561 value: v83::DatabaseValue {
562 name: name.to_string(),
563 owner_id: v83::RoleId::System(1),
564 privileges: vec![],
565 oid: 0,
566 },
567 };
568 v83::StateUpdateKind::Database(db).into()
569 }
570
571 fn snapshot(
572 rows: Vec<(StateUpdateKindJson, Diff)>,
573 ) -> Vec<(StateUpdateKindJson, Timestamp, Diff)> {
574 rows.into_iter()
575 .map(|(kind, diff)| (kind, Timestamp::new(0), diff))
576 .collect()
577 }
578
579 #[mz_ore::test]
580 #[cfg_attr(miri, ignore)] fn healthy_snapshot_is_a_noop() {
582 let snap = snapshot(vec![
583 (
584 role_kind(1, "alice@example.com", 100, Some(true), None, None),
585 Diff::ONE,
586 ),
587 (role_kind(2, "bob", 101, None, None, None), Diff::ONE),
588 ]);
589 let (repairs, stats) = compute_repairs(&snap);
590 assert!(repairs.is_empty());
591 assert_eq!(stats, RepairStats::default());
592 }
593
594 #[mz_ore::test]
595 #[cfg_attr(miri, ignore)] fn production_shape_alter_login_is_repaired() {
597 let live = role_kind(8, "alice@example.com", 20030, Some(true), None, None);
601 let dangling = role_kind(8, "alice@example.com", 20030, None, None, None);
602 let stale = stale_role_kind_with_dropped_field(8, "alice@example.com", 20030);
603
604 assert_eq!(
605 try_as_role(&stale).expect("parses as Role").value,
606 try_as_role(&dangling).expect("parses as Role").value,
607 "test fixture broken: stale and dangling must be parsed-equal",
608 );
609 assert_ne!(
610 stale, dangling,
611 "test fixture broken: stale and dangling must have distinct stored forms",
612 );
613
614 let snap = snapshot(vec![
615 (stale.clone(), Diff::ONE),
616 (dangling.clone(), Diff::MINUS_ONE),
617 (live, Diff::ONE),
618 ]);
619 let (repairs, stats) = compute_repairs(&snap);
620 assert_eq!(
621 repairs,
622 vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
623 );
624 assert_eq!(
625 stats,
626 RepairStats {
627 repaired: 1,
628 stale_retracted: 1,
629 ..Default::default()
630 }
631 );
632 }
633
634 #[mz_ore::test]
635 #[cfg_attr(miri, ignore)] fn alter_changing_superuser_is_repaired() {
637 let live = role_kind(11, "ops@materialize.com", 20040, None, Some(true), None);
641 let dangling = role_kind(11, "ops@materialize.com", 20040, None, None, None);
642 let stale = stale_role_kind_with_dropped_field(11, "ops@materialize.com", 20040);
643
644 let snap = snapshot(vec![
645 (stale.clone(), Diff::ONE),
646 (dangling.clone(), Diff::MINUS_ONE),
647 (live, Diff::ONE),
648 ]);
649 let (repairs, stats) = compute_repairs(&snap);
650 assert_eq!(
651 repairs,
652 vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
653 );
654 assert_eq!(stats.repaired, 1);
655 assert_eq!(stats.stale_retracted, 1);
656 }
657
658 #[mz_ore::test]
659 #[cfg_attr(miri, ignore)] fn alter_changing_name_is_repaired() {
661 let live = role_kind(12, "renamed@materialize.com", 20050, None, None, None);
663 let dangling = role_kind(12, "original@materialize.com", 20050, None, None, None);
664 let stale = stale_role_kind_with_dropped_field(12, "original@materialize.com", 20050);
665
666 let snap = snapshot(vec![
667 (stale.clone(), Diff::ONE),
668 (dangling.clone(), Diff::MINUS_ONE),
669 (live, Diff::ONE),
670 ]);
671 let (repairs, stats) = compute_repairs(&snap);
672 assert_eq!(
673 repairs,
674 vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
675 );
676 assert_eq!(stats.repaired, 1);
677 assert_eq!(stats.stale_retracted, 1);
678 }
679
680 #[mz_ore::test]
681 #[cfg_attr(miri, ignore)] fn drop_role_shape_is_repaired() {
683 let dangling = role_kind(13, "dropped@materialize.com", 20060, None, None, None);
687 let stale = stale_role_kind_with_dropped_field(13, "dropped@materialize.com", 20060);
688
689 let snap = snapshot(vec![
690 (stale.clone(), Diff::ONE),
691 (dangling.clone(), Diff::MINUS_ONE),
692 ]);
693 let (repairs, stats) = compute_repairs(&snap);
694 assert_eq!(
695 repairs,
696 vec![(dangling, Diff::ONE), (stale, Diff::MINUS_ONE)],
697 );
698 assert_eq!(
699 stats,
700 RepairStats {
701 repaired: 1,
702 stale_retracted: 1,
703 ..Default::default()
704 }
705 );
706 }
707
708 #[mz_ore::test]
709 #[cfg_attr(miri, ignore)] fn dangling_minus_one_with_no_parsed_equal_plus_one_is_skipped() {
711 let dangling = role_kind(20, "ghost", 200, None, None, None);
714 let snap = snapshot(vec![(dangling, Diff::MINUS_ONE)]);
715 let (repairs, stats) = compute_repairs(&snap);
716 assert!(repairs.is_empty());
717 assert_eq!(stats.skipped_role, 1);
718 }
719
720 #[mz_ore::test]
721 #[cfg_attr(miri, ignore)] fn dangling_minus_one_with_only_a_different_parsed_live_is_skipped() {
723 let dangling = role_kind(21, "alice@materialize.com", 210, None, None, None);
728 let live = role_kind(21, "alice@materialize.com", 210, Some(true), None, None);
729 let snap = snapshot(vec![(dangling, Diff::MINUS_ONE), (live, Diff::ONE)]);
730 let (repairs, stats) = compute_repairs(&snap);
731 assert!(repairs.is_empty());
732 assert_eq!(stats.skipped_role, 1);
733 }
734
735 #[mz_ore::test]
736 #[cfg_attr(miri, ignore)] fn ambiguous_two_distinct_live_rows_is_skipped() {
738 let live_a = role_kind(22, "alice", 220, Some(true), None, None);
742 let live_b = role_kind(22, "alice", 220, Some(false), Some(true), None);
743 let dangling = role_kind(22, "alice", 220, None, None, None);
744 let stale = stale_role_kind_with_dropped_field(22, "alice", 220);
745 let snap = snapshot(vec![
746 (stale, Diff::ONE),
747 (live_a, Diff::ONE),
748 (live_b, Diff::ONE),
749 (dangling, Diff::MINUS_ONE),
750 ]);
751 let (repairs, stats) = compute_repairs(&snap);
752 assert!(repairs.is_empty());
753 assert_eq!(stats.skipped_role, 1);
754 }
755
756 #[mz_ore::test]
757 #[cfg_attr(miri, ignore)] fn dangling_non_role_is_skipped() {
759 let dangling = database_kind(1, "ghostdb");
760 let snap = snapshot(vec![(dangling, Diff::MINUS_ONE)]);
761 let (repairs, stats) = compute_repairs(&snap);
762 assert!(repairs.is_empty());
763 assert_eq!(stats.skipped_non_role, 1);
764 }
765
766 #[mz_ore::test]
767 #[cfg_attr(miri, ignore)] fn dangling_diff_other_than_minus_one_is_skipped() {
769 let dangling = role_kind(30, "arjun", 20032, None, None, None);
771 let stale = stale_role_kind_with_dropped_field(30, "arjun", 20032);
772 let live = role_kind(30, "arjun", 20032, Some(true), None, None);
773 let snap = snapshot(vec![
774 (dangling, Diff::MINUS_ONE + Diff::MINUS_ONE),
775 (stale, Diff::ONE),
776 (live, Diff::ONE),
777 ]);
778 let (repairs, stats) = compute_repairs(&snap);
779 assert!(repairs.is_empty());
780 assert_eq!(stats.skipped_role, 1);
781 }
782
783 #[mz_ore::test]
784 #[cfg_attr(miri, ignore)] fn repair_with_multiple_stale_byte_forms_retracts_all() {
786 let live = role_kind(40, "alice@materialize.com", 20070, Some(true), None, None);
790 let dangling = role_kind(40, "alice@materialize.com", 20070, None, None, None);
791 let stale_a = stale_role_kind_with_dropped_field(40, "alice@materialize.com", 20070);
792 let stale_b = stale_role_kind_with_extra_whitespace(40, "alice@materialize.com", 20070);
793 assert_ne!(stale_a, stale_b);
794
795 let snap = snapshot(vec![
796 (stale_a.clone(), Diff::ONE),
797 (stale_b.clone(), Diff::ONE),
798 (dangling.clone(), Diff::MINUS_ONE),
799 (live, Diff::ONE),
800 ]);
801 let (repairs, stats) = compute_repairs(&snap);
802 let plus = repairs
805 .iter()
806 .filter(|(_, d)| *d == Diff::ONE)
807 .map(|(k, _)| k.clone())
808 .collect::<Vec<_>>();
809 let minus = repairs
810 .iter()
811 .filter(|(_, d)| *d == Diff::MINUS_ONE)
812 .map(|(k, _)| k.clone())
813 .collect::<std::collections::BTreeSet<_>>();
814 assert_eq!(plus, vec![dangling]);
815 let expected_minus: std::collections::BTreeSet<_> =
816 [stale_a, stale_b].into_iter().collect();
817 assert_eq!(minus, expected_minus);
818 assert_eq!(stats.repaired, 1);
819 assert_eq!(stats.stale_retracted, 2);
820 }
821
822 #[mz_ore::test]
823 #[cfg_attr(miri, ignore)] fn untouched_pre_v81_role_is_normalized() {
825 let stale = stale_role_kind_with_dropped_field(50, "carol@materialize.com", 20100);
830 let canonical = role_kind(50, "carol@materialize.com", 20100, None, None, None);
831 assert_ne!(
832 stale, canonical,
833 "test fixture broken: stale and canonical must have distinct stored forms",
834 );
835
836 let snap = snapshot(vec![(stale.clone(), Diff::ONE)]);
837 let (repairs, stats) = compute_repairs(&snap);
838 assert_eq!(
839 repairs,
840 vec![(stale, Diff::MINUS_ONE), (canonical, Diff::ONE)],
841 );
842 assert_eq!(
843 stats,
844 RepairStats {
845 normalized: 1,
846 ..Default::default()
847 },
848 );
849 }
850
851 #[mz_ore::test]
852 #[cfg_attr(miri, ignore)] fn canonical_form_role_is_not_normalized() {
854 let canonical = role_kind(51, "dave@materialize.com", 20101, Some(true), None, None);
858 let snap = snapshot(vec![(canonical, Diff::ONE)]);
859 let (repairs, stats) = compute_repairs(&snap);
860 assert!(repairs.is_empty());
861 assert_eq!(stats.normalized, 0);
862 }
863
864 #[mz_ore::test]
865 #[cfg_attr(miri, ignore)] fn multiple_stale_byte_forms_no_dangling_collapse_to_one_canonical() {
867 let stale_a = stale_role_kind_with_dropped_field(60, "ed@materialize.com", 20110);
872 let stale_b = stale_role_kind_with_extra_whitespace(60, "ed@materialize.com", 20110);
873 let canonical = role_kind(60, "ed@materialize.com", 20110, None, None, None);
874 assert_ne!(stale_a, stale_b);
875 assert_ne!(stale_a, canonical);
876 assert_ne!(stale_b, canonical);
877
878 let snap = snapshot(vec![
879 (stale_a.clone(), Diff::ONE),
880 (stale_b.clone(), Diff::ONE),
881 ]);
882 let (repairs, stats) = compute_repairs(&snap);
883
884 let plus: Vec<_> = repairs
885 .iter()
886 .filter(|(_, d)| *d == Diff::ONE)
887 .cloned()
888 .collect();
889 let minus: std::collections::BTreeSet<_> = repairs
890 .iter()
891 .filter(|(_, d)| *d == Diff::MINUS_ONE)
892 .map(|(k, _)| k.clone())
893 .collect();
894 assert_eq!(plus, vec![(canonical, Diff::ONE)]);
895 let expected_minus: std::collections::BTreeSet<_> =
896 [stale_a, stale_b].into_iter().collect();
897 assert_eq!(minus, expected_minus);
898 assert_eq!(stats.normalized, 2);
899 }
900
901 #[mz_ore::test]
902 #[cfg_attr(miri, ignore)] fn unrepaired_dangling_skips_normalize_for_same_key() {
904 let live_a = role_kind(70, "frank", 20120, Some(true), None, None);
909 let live_b = role_kind(70, "frank", 20120, Some(false), Some(true), None);
910 let dangling = role_kind(70, "frank", 20120, None, None, None);
911 let stale = stale_role_kind_with_dropped_field(70, "frank", 20120);
912
913 let snap = snapshot(vec![
914 (stale.clone(), Diff::ONE),
915 (live_a, Diff::ONE),
916 (live_b, Diff::ONE),
917 (dangling, Diff::MINUS_ONE),
918 ]);
919 let (repairs, stats) = compute_repairs(&snap);
920 assert!(repairs.is_empty());
921 assert_eq!(stats.skipped_role, 1);
922 assert_eq!(stats.normalized, 0);
923 }
924
925 #[mz_ore::test]
926 #[cfg_attr(miri, ignore)] fn normalize_does_not_double_retract_pass_1_stale_rows() {
928 let live = role_kind(80, "grace@materialize.com", 20130, Some(true), None, None);
932 let dangling = role_kind(80, "grace@materialize.com", 20130, None, None, None);
933 let stale = stale_role_kind_with_dropped_field(80, "grace@materialize.com", 20130);
934
935 let snap = snapshot(vec![
936 (stale.clone(), Diff::ONE),
937 (dangling.clone(), Diff::MINUS_ONE),
938 (live, Diff::ONE),
939 ]);
940 let (repairs, stats) = compute_repairs(&snap);
941
942 let stale_retractions = repairs
944 .iter()
945 .filter(|(k, d)| k == &stale && *d == Diff::MINUS_ONE)
946 .count();
947 assert_eq!(stale_retractions, 1);
948 assert_eq!(stats.repaired, 1);
949 assert_eq!(stats.stale_retracted, 1);
950 assert_eq!(stats.normalized, 0);
951 }
952
953 fn stale_role_kind_with_extra_whitespace(
957 user_id: u64,
958 name: &str,
959 oid: u32,
960 ) -> StateUpdateKindJson {
961 use serde_json::json;
962 let v = json!({
966 "kind": "Role",
967 "key": { "id": { "User": user_id } },
968 "value": {
969 "name": name,
970 "oid": oid,
971 "attributes": {
972 "inherit": true,
973 "superuser": null,
974 "login": null,
975 "auto_provision_source": null,
976 "password": null,
977 },
978 "membership": { "map": [] },
979 "vars": { "entries": [] },
980 }
981 });
982 StateUpdateKindJson::from_serde(&v)
983 }
984
985 fn mz_system_cluster(replication_factor: u32) -> StateUpdateKindJson {
989 let cluster = v83::Cluster {
990 key: v83::ClusterKey {
991 id: v83::ClusterId::System(1),
992 },
993 value: v83::ClusterValue {
994 name: "mz_system".to_string(),
995 owner_id: v83::RoleId::System(1),
996 privileges: vec![],
997 config: v83::ClusterConfig {
998 workload_class: None,
999 variant: v83::ClusterVariant::Managed(v83::ManagedCluster {
1000 size: "1".to_string(),
1001 replication_factor,
1002 availability_zones: vec![],
1003 logging: v83::ReplicaLogging {
1004 log_logging: false,
1005 interval: None,
1006 },
1007 optimizer_feature_overrides: vec![],
1008 schedule: v83::ClusterSchedule::Manual,
1009 }),
1010 },
1011 },
1012 };
1013 v83::StateUpdateKind::Cluster(cluster).into()
1014 }
1015
1016 fn mz_system_cluster_unmanaged() -> StateUpdateKindJson {
1019 let cluster = v83::Cluster {
1020 key: v83::ClusterKey {
1021 id: v83::ClusterId::System(1),
1022 },
1023 value: v83::ClusterValue {
1024 name: "mz_system".to_string(),
1025 owner_id: v83::RoleId::System(1),
1026 privileges: vec![],
1027 config: v83::ClusterConfig {
1028 workload_class: None,
1029 variant: v83::ClusterVariant::Unmanaged,
1030 },
1031 },
1032 };
1033 v83::StateUpdateKind::Cluster(cluster).into()
1034 }
1035
1036 fn cluster_replica(replica_id: u64, cluster_id: v83::ClusterId) -> StateUpdateKindJson {
1039 let replica = v83::ClusterReplica {
1040 key: v83::ClusterReplicaKey {
1041 id: v83::ReplicaId::User(replica_id),
1042 },
1043 value: v83::ClusterReplicaValue {
1044 cluster_id,
1045 name: format!("r{replica_id}"),
1046 config: v83::ReplicaConfig {
1047 logging: v83::ReplicaLogging {
1048 log_logging: false,
1049 interval: None,
1050 },
1051 location: v83::ReplicaLocation::Unmanaged(v83::UnmanagedLocation {
1052 storagectl_addrs: vec![],
1053 computectl_addrs: vec![],
1054 }),
1055 },
1056 owner_id: v83::RoleId::System(1),
1057 },
1058 };
1059 v83::StateUpdateKind::ClusterReplica(replica).into()
1060 }
1061
1062 fn server_config(name: &str, value: &str) -> StateUpdateKindJson {
1066 let cfg = v83::ServerConfiguration {
1067 key: v83::ServerConfigurationKey {
1068 name: name.to_string(),
1069 },
1070 value: v83::ServerConfigurationValue {
1071 value: value.to_string(),
1072 },
1073 };
1074 v83::StateUpdateKind::ServerConfiguration(cfg).into()
1075 }
1076
1077 #[mz_ore::test]
1085 #[cfg_attr(miri, ignore)] fn cloud_env_backfills_auto_provision_source_for_email_name() {
1087 let original = stale_role_kind_with_dropped_field(100, "alice@materialize.com", 30100);
1092 let backfilled = role_kind(
1093 100,
1094 "alice@materialize.com",
1095 30100,
1096 None,
1097 None,
1098 Some(v83::AutoProvisionSource::Frontegg),
1099 );
1100
1101 let snap = snapshot(vec![
1102 (mz_system_cluster(1), Diff::ONE),
1103 (server_config("enable_password_auth", "off"), Diff::ONE),
1104 (original.clone(), Diff::ONE),
1105 ]);
1106 let (repairs, stats) = compute_repairs(&snap);
1107 assert_eq!(
1108 repairs,
1109 vec![(original, Diff::MINUS_ONE), (backfilled, Diff::ONE)],
1110 );
1111 assert_eq!(stats.normalized, 1);
1112 }
1113
1114 #[mz_ore::test]
1117 #[cfg_attr(miri, ignore)] fn cloud_env_migrates_non_stale_user_role() {
1119 let original = role_kind(100, "alice@materialize.com", 30100, None, None, None);
1121 let migrated = role_kind(
1123 100,
1124 "alice@materialize.com",
1125 30100,
1126 None,
1127 None,
1128 Some(v83::AutoProvisionSource::Frontegg),
1129 );
1130
1131 let snap = snapshot(vec![
1132 (mz_system_cluster(1), Diff::ONE),
1133 (server_config("enable_password_auth", "off"), Diff::ONE),
1134 (original.clone(), Diff::ONE),
1135 ]);
1136 let (repairs, stats) = compute_repairs(&snap);
1137 assert_eq!(
1138 repairs,
1139 vec![(original, Diff::MINUS_ONE), (migrated, Diff::ONE)],
1140 );
1141 assert_eq!(stats.normalized, 1);
1142 }
1143
1144 #[mz_ore::test]
1145 #[cfg_attr(miri, ignore)] fn cloud_env_non_user_roles() {
1147 let admin = stale_role_kind_with_dropped_field(101, "admin", 30101);
1151 let admin_backfilled = role_kind(101, "admin", 30101, None, None, None);
1152
1153 let snap = snapshot(vec![
1154 (mz_system_cluster(1), Diff::ONE),
1155 (server_config("enable_password_auth", "off"), Diff::ONE),
1156 (admin.clone(), Diff::ONE),
1157 ]);
1158 let (repairs, stats) = compute_repairs(&snap);
1159 assert_eq!(
1160 repairs,
1161 vec![(admin, Diff::MINUS_ONE), (admin_backfilled, Diff::ONE)],
1162 );
1163 assert_eq!(stats.normalized, 1);
1164 }
1165
1166 #[mz_ore::test]
1167 #[cfg_attr(miri, ignore)] fn cloud_env_preserves_existing_auto_provision_source_for_user_role() {
1169 let already_migrated_user_role = role_kind(
1173 102,
1174 "ops@materialize.com",
1175 30102,
1176 None,
1177 None,
1178 Some(v83::AutoProvisionSource::Frontegg),
1179 );
1180
1181 let snap = snapshot(vec![
1182 (mz_system_cluster(1), Diff::ONE),
1183 (server_config("enable_password_auth", "off"), Diff::ONE),
1184 (already_migrated_user_role, Diff::ONE),
1185 ]);
1186 let (repairs, stats) = compute_repairs(&snap);
1187 assert!(repairs.is_empty());
1188 assert_eq!(stats.normalized, 0);
1189 }
1190
1191 #[mz_ore::test]
1192 #[cfg_attr(miri, ignore)] fn cloud_env_preserves_existing_auto_provision_source_for_non_user_role() {
1194 let already_migrated_non_user_role = role_kind(
1198 102,
1199 "non_user_role",
1200 30102,
1201 None,
1202 None,
1203 None,
1205 );
1206
1207 let snap = snapshot(vec![
1208 (mz_system_cluster(1), Diff::ONE),
1209 (server_config("enable_password_auth", "off"), Diff::ONE),
1210 (already_migrated_non_user_role, Diff::ONE),
1211 ]);
1212 let (repairs, stats) = compute_repairs(&snap);
1213 assert!(repairs.is_empty());
1214 assert_eq!(stats.normalized, 0);
1215 }
1216
1217 #[mz_ore::test]
1218 #[cfg_attr(miri, ignore)] fn self_managed_env_skips_backfill_for_email_name() {
1220 let original = role_kind(103, "alice@materialize.com", 30103, Some(true), None, None);
1224
1225 let snap = snapshot(vec![
1226 (mz_system_cluster(0), Diff::ONE),
1227 (original, Diff::ONE),
1228 ]);
1229 let (repairs, stats) = compute_repairs(&snap);
1230 assert!(repairs.is_empty());
1231 assert_eq!(stats.normalized, 0);
1232 }
1233
1234 #[mz_ore::test]
1235 #[cfg_attr(miri, ignore)] fn self_managed_with_replication_factor_one_but_password_auth_skips_backfill() {
1237 let original = role_kind(104, "alice@materialize.com", 30104, Some(true), None, None);
1240
1241 let snap = snapshot(vec![
1242 (mz_system_cluster(1), Diff::ONE),
1243 (server_config("enable_password_auth", "on"), Diff::ONE),
1244 (original, Diff::ONE),
1245 ]);
1246 let (repairs, stats) = compute_repairs(&snap);
1247 assert!(repairs.is_empty());
1248 assert_eq!(stats.normalized, 0);
1249 }
1250
1251 #[mz_ore::test]
1252 #[cfg_attr(miri, ignore)] fn cloud_env_with_unmanaged_mz_system_and_replicas_backfills() {
1254 let original = stale_role_kind_with_dropped_field(106, "alice@materialize.com", 30106);
1258 let backfilled = role_kind(
1259 106,
1260 "alice@materialize.com",
1261 30106,
1262 None,
1263 None,
1264 Some(v83::AutoProvisionSource::Frontegg),
1265 );
1266
1267 let snap = snapshot(vec![
1268 (mz_system_cluster_unmanaged(), Diff::ONE),
1269 (cluster_replica(900, v83::ClusterId::System(1)), Diff::ONE),
1270 (original.clone(), Diff::ONE),
1271 ]);
1272 let (repairs, stats) = compute_repairs(&snap);
1273 assert_eq!(
1274 repairs,
1275 vec![(original, Diff::MINUS_ONE), (backfilled, Diff::ONE)],
1276 );
1277 assert_eq!(stats.normalized, 1);
1278 }
1279
1280 #[mz_ore::test]
1281 #[cfg_attr(miri, ignore)] fn unmanaged_mz_system_with_no_replicas_is_self_managed() {
1283 let original = stale_role_kind_with_dropped_field(107, "alice@materialize.com", 30107);
1287 let backfilled = role_kind(107, "alice@materialize.com", 30107, None, None, None);
1288
1289 let snap = snapshot(vec![
1290 (mz_system_cluster_unmanaged(), Diff::ONE),
1291 (original.clone(), Diff::ONE),
1292 ]);
1293 let (repairs, stats) = compute_repairs(&snap);
1294 assert_eq!(
1295 repairs,
1296 vec![(original, Diff::MINUS_ONE), (backfilled, Diff::ONE)],
1297 );
1298 assert_eq!(stats.normalized, 1);
1299 }
1300
1301 #[mz_ore::test]
1302 #[cfg_attr(miri, ignore)] fn cloud_env_mixed_email_and_non_email_roles() {
1304 let user = stale_role_kind_with_dropped_field(110, "user@example.com", 30110);
1308 let user_backfilled = role_kind(
1309 110,
1310 "user@example.com",
1311 30110,
1312 None,
1313 None,
1314 Some(v83::AutoProvisionSource::Frontegg),
1315 );
1316 let user_already_migrated = role_kind(
1317 111,
1318 "steve@example.com",
1319 30111,
1320 None,
1321 None,
1322 Some(v83::AutoProvisionSource::Frontegg),
1323 );
1324
1325 let non_user_role = stale_role_kind_with_dropped_field(112, "non_user_role", 30113);
1326 let non_user_role_backfilled = role_kind(112, "non_user_role", 30113, None, None, None);
1327
1328 let non_user_role_already_migrated = role_kind(
1329 113,
1330 "non_user_role_already_migrated",
1331 30114,
1332 None,
1333 None,
1334 None,
1335 );
1336
1337 let snap = snapshot(vec![
1338 (mz_system_cluster(1), Diff::ONE),
1339 (server_config("enable_password_auth", "off"), Diff::ONE),
1340 (user.clone(), Diff::ONE),
1341 (user_already_migrated, Diff::ONE),
1342 (non_user_role.clone(), Diff::ONE),
1343 (non_user_role_already_migrated.clone(), Diff::ONE),
1344 ]);
1345 let (repairs, stats) = compute_repairs(&snap);
1346 assert_eq!(
1347 repairs,
1348 vec![
1349 (user, Diff::MINUS_ONE),
1350 (user_backfilled, Diff::ONE),
1351 (non_user_role, Diff::MINUS_ONE),
1352 (non_user_role_backfilled, Diff::ONE),
1353 ],
1354 );
1355 assert_eq!(stats.normalized, 2);
1356 }
1357}