1mod base_facts;
75mod datalog;
76mod diff;
77mod logging;
78mod types;
79
80pub(crate) use types::ChangeSet;
81
82use crate::client::DeploymentKind;
83use crate::project::SchemaQualifier;
84use crate::project::analysis::deployment_snapshot::DeploymentSnapshot;
85use crate::project::ir::graph::Project;
86use std::collections::BTreeSet;
87
88use base_facts::extract_base_facts;
89use datalog::compute_dirty_datalog;
90use diff::find_changed_objects;
91
92impl ChangeSet {
93 pub(crate) fn from_deployment_snapshot_comparison(
109 old_snapshot: &DeploymentSnapshot,
110 new_snapshot: &DeploymentSnapshot,
111 project: &Project,
112 forced_dirty_schemas: &BTreeSet<SchemaQualifier>,
113 ) -> Self {
114 let changed_objects = find_changed_objects(old_snapshot, new_snapshot);
116
117 let base_facts = extract_base_facts(project);
119
120 let (dirty_stmts, dirty_clusters, dirty_schemas) =
122 compute_dirty_datalog(&changed_objects, &base_facts, forced_dirty_schemas);
123
124 let (new_replacement_objects, changed_replacement_objects) = dirty_stmts
130 .iter()
131 .filter(|obj| base_facts.is_replacement.contains(*obj))
132 .cloned()
133 .partition(|obj| {
134 !old_snapshot.objects.contains_key(obj)
135 || old_snapshot
136 .schemas
137 .get(&SchemaQualifier::new(
138 obj.expect_database().to_string(),
139 obj.schema().to_string(),
140 ))
141 .copied()
142 != Some(DeploymentKind::Replacement)
143 });
144
145 ChangeSet {
146 changed_objects: changed_objects.into_iter().collect(),
147 dirty_schemas: dirty_schemas.into_iter().collect(),
148 dirty_clusters: dirty_clusters.into_iter().collect(),
149 objects_to_deploy: dirty_stmts.into_iter().collect(),
150 new_replacement_objects,
151 changed_replacement_objects,
152 }
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use crate::client::DeploymentKind;
160 use crate::project::SchemaQualifier;
161 use crate::project::analysis::deployment_snapshot::DeploymentSnapshot;
162 use crate::project::ast::Statement;
163 use crate::project::ir::compiled;
164 use crate::project::ir::graph::{Database, DatabaseObject, Project, Schema, SchemaType};
165 use crate::project::ir::object_id::ObjectId;
166 use base_facts::BaseFacts;
167 use datalog::compute_dirty_datalog;
168 use std::collections::{BTreeMap, BTreeSet};
169
170 #[mz_ore::test]
171 fn test_parse_object_file_path() {
172 let path = "materialize/public/users.sql";
173 let parts: Vec<&str> = path.split('/').collect();
174
175 match parts.as_slice() {
176 [db, schema, file] if file.ends_with(".sql") => {
177 assert_eq!(*db, "materialize");
178 assert_eq!(*schema, "public");
179 assert_eq!(file.strip_suffix(".sql").unwrap(), "users");
180 }
181 _ => panic!("Path didn't match expected pattern"),
182 }
183 }
184
185 #[mz_ore::test]
186 fn test_parse_schema_mod_file_path() {
187 let path = "materialize/public.sql";
188 let parts: Vec<&str> = path.split('/').collect();
189
190 match parts.as_slice() {
191 [db, schema_file] if schema_file.ends_with(".sql") => {
192 assert_eq!(*db, "materialize");
193 assert_eq!(schema_file.strip_suffix(".sql").unwrap(), "public");
194 }
195 _ => panic!("Path didn't match expected pattern"),
196 }
197 }
198
199 #[mz_ore::test]
200 fn test_parse_database_mod_file_path() {
201 let path = "materialize.sql";
202 let parts: Vec<&str> = path.split('/').collect();
203
204 match parts.as_slice() {
205 [db_file] if db_file.ends_with(".sql") => {
206 assert_eq!(db_file.strip_suffix(".sql").unwrap(), "materialize");
207 }
208 _ => panic!("Path didn't match expected pattern"),
209 }
210 }
211
212 #[mz_ore::test]
213 fn test_schema_propagation_all_objects_in_dirty_schema_are_dirty() {
214 let obj1 = ObjectId::new("db".to_string(), "schema".to_string(), "table1".to_string());
219 let obj2 = ObjectId::new("db".to_string(), "schema".to_string(), "table2".to_string());
220 let obj3 = ObjectId::new("db".to_string(), "schema".to_string(), "view1".to_string());
221
222 let base_facts = BaseFacts {
223 object_in_schema: vec![
224 (obj1.clone(), "db".to_string(), "schema".to_string()),
225 (obj2.clone(), "db".to_string(), "schema".to_string()),
226 (obj3.clone(), "db".to_string(), "schema".to_string()),
227 ],
228 depends_on: vec![],
229 stmt_uses_cluster: vec![],
230 index_uses_cluster: vec![],
231 is_sink: BTreeSet::new(),
232 is_replacement: BTreeSet::new(),
233 };
234
235 let mut changed_stmts = BTreeSet::new();
237 changed_stmts.insert(obj1.clone());
238
239 let (dirty_stmts, _dirty_clusters, dirty_schemas) =
241 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
242
243 assert!(
245 dirty_schemas.contains(&SchemaQualifier::new(
246 "db".to_string(),
247 "schema".to_string()
248 )),
249 "Schema should be marked dirty"
250 );
251
252 assert!(
254 dirty_stmts.contains(&obj1),
255 "obj1 (changed) should be dirty"
256 );
257 assert!(
258 dirty_stmts.contains(&obj2),
259 "obj2 (same schema as changed obj1) should be dirty"
260 );
261 assert!(
262 dirty_stmts.contains(&obj3),
263 "obj3 (same schema as changed obj1) should be dirty"
264 );
265 }
266
267 #[mz_ore::test]
268 fn test_forced_dirty_schema_redeploys_schema_and_dependents() {
269 let a1 = ObjectId::new("db".to_string(), "a".to_string(), "a1".to_string());
273 let a2 = ObjectId::new("db".to_string(), "a".to_string(), "a2".to_string());
274 let b1 = ObjectId::new("db".to_string(), "b".to_string(), "b1".to_string());
275 let c1 = ObjectId::new("db".to_string(), "c".to_string(), "c1".to_string());
276
277 let base_facts = BaseFacts {
278 object_in_schema: vec![
279 (a1.clone(), "db".to_string(), "a".to_string()),
280 (a2.clone(), "db".to_string(), "a".to_string()),
281 (b1.clone(), "db".to_string(), "b".to_string()),
282 (c1.clone(), "db".to_string(), "c".to_string()),
283 ],
284 depends_on: vec![(b1.clone(), a1.clone())],
286 stmt_uses_cluster: vec![],
287 index_uses_cluster: vec![],
288 is_sink: BTreeSet::new(),
289 is_replacement: BTreeSet::new(),
290 };
291
292 let changed_stmts = BTreeSet::new();
294 let mut forced = BTreeSet::new();
295 forced.insert(SchemaQualifier::new("db".to_string(), "a".to_string()));
296
297 let (dirty_stmts, _clusters, dirty_schemas) =
298 compute_dirty_datalog(&changed_stmts, &base_facts, &forced);
299
300 assert!(
301 dirty_stmts.contains(&a1),
302 "a1 in forced schema should be dirty"
303 );
304 assert!(
305 dirty_stmts.contains(&a2),
306 "a2 in forced schema should be dirty"
307 );
308 assert!(
309 dirty_stmts.contains(&b1),
310 "b1 (depends on a1) should be dirty"
311 );
312 assert!(
313 !dirty_stmts.contains(&c1),
314 "c1 (unrelated schema) should not be dirty"
315 );
316
317 assert!(dirty_schemas.contains(&SchemaQualifier::new("db".to_string(), "a".to_string())));
318 assert!(dirty_schemas.contains(&SchemaQualifier::new("db".to_string(), "b".to_string())));
319 assert!(!dirty_schemas.contains(&SchemaQualifier::new("db".to_string(), "c".to_string())));
320 }
321
322 #[mz_ore::test]
323 fn test_forced_dirty_schema_includes_replacement_objects() {
324 let mv = ObjectId::new("db".to_string(), "core".to_string(), "summary".to_string());
327
328 let base_facts = BaseFacts {
329 object_in_schema: vec![(mv.clone(), "db".to_string(), "core".to_string())],
330 depends_on: vec![],
331 stmt_uses_cluster: vec![],
332 index_uses_cluster: vec![],
333 is_sink: BTreeSet::new(),
334 is_replacement: BTreeSet::from([mv.clone()]),
335 };
336
337 let (unforced, _, _) =
339 compute_dirty_datalog(&BTreeSet::new(), &base_facts, &BTreeSet::new());
340 assert!(
341 !unforced.contains(&mv),
342 "replacement MV should not be dirty without a change or force"
343 );
344
345 let mut forced = BTreeSet::new();
347 forced.insert(SchemaQualifier::new("db".to_string(), "core".to_string()));
348 let (forced_dirty, _, _) = compute_dirty_datalog(&BTreeSet::new(), &base_facts, &forced);
349 assert!(
350 forced_dirty.contains(&mv),
351 "forcing the schema should redeploy its replacement MV"
352 );
353 }
354
355 #[mz_ore::test]
356 fn test_index_cluster_does_not_dirty_parent_object_cluster() {
357 let mv = ObjectId::new(
372 "db".to_string(),
373 "schema".to_string(),
374 "winning_bids".to_string(),
375 );
376 let other = ObjectId::new(
377 "db".to_string(),
378 "schema".to_string(),
379 "other_obj".to_string(),
380 );
381
382 let base_facts = BaseFacts {
383 object_in_schema: vec![
384 (mv.clone(), "db".to_string(), "schema".to_string()),
385 (other.clone(), "db".to_string(), "schema".to_string()),
386 ],
387 depends_on: vec![],
388 stmt_uses_cluster: vec![
389 (mv.clone(), "staging".into()),
390 (other.clone(), "quickstart".into()),
391 ],
392 index_uses_cluster: vec![(mv.clone(), "idx_item".to_string(), "quickstart".into())],
393 is_sink: BTreeSet::new(),
394 is_replacement: BTreeSet::new(),
395 };
396
397 let mut changed_stmts = BTreeSet::new();
399 changed_stmts.insert(other.clone());
400
401 let (dirty_stmts, dirty_clusters, _dirty_schemas) =
403 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
404
405 assert!(
407 dirty_clusters.iter().any(|c| c.name == "quickstart"),
408 "quickstart cluster should be dirty because other_obj changed"
409 );
410
411 assert!(
413 dirty_stmts.contains(&mv),
414 "winning_bids should be redeployed (its index uses dirty quickstart cluster)"
415 );
416
417 assert!(
421 !dirty_clusters.iter().any(|c| c.name == "staging"),
422 "staging cluster should NOT be dirty - winning_bids is only dirty due to its index, not its statement"
423 );
424 }
425
426 #[mz_ore::test]
427 fn test_index_cluster_does_not_dirty_schema() {
428 let table1 = ObjectId::new("db".to_string(), "schema".to_string(), "table1".to_string());
440 let table2 = ObjectId::new("db".to_string(), "schema".to_string(), "table2".to_string());
441 let other = ObjectId::new(
442 "db".to_string(),
443 "other_schema".to_string(),
444 "other_obj".to_string(),
445 );
446
447 let base_facts = BaseFacts {
448 object_in_schema: vec![
449 (table1.clone(), "db".to_string(), "schema".to_string()),
450 (table2.clone(), "db".to_string(), "schema".to_string()),
451 (other.clone(), "db".to_string(), "other_schema".to_string()),
452 ],
453 depends_on: vec![],
454 stmt_uses_cluster: vec![(other.clone(), "index_cluster".into())],
455 index_uses_cluster: vec![(table1.clone(), "idx1".to_string(), "index_cluster".into())],
456 is_sink: BTreeSet::new(),
457 is_replacement: BTreeSet::new(),
458 };
459
460 let mut changed_stmts = BTreeSet::new();
461 changed_stmts.insert(other.clone());
462
463 let (dirty_stmts, dirty_clusters, dirty_schemas) =
464 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
465
466 assert!(dirty_clusters.iter().any(|c| c.name == "index_cluster"));
468
469 assert!(
471 !dirty_stmts.contains(&table1),
472 "table1 should NOT be dirty - indexes don't cause redeployment"
473 );
474
475 assert!(
477 !dirty_schemas.contains(&SchemaQualifier::new(
478 "db".to_string(),
479 "schema".to_string()
480 )),
481 "schema should NOT be dirty"
482 );
483
484 assert!(!dirty_stmts.contains(&table2), "table2 should NOT be dirty");
486 }
487
488 #[mz_ore::test]
489 fn test_schema_propagation_does_not_dirty_index_clusters() {
490 let flippers = ObjectId::new(
501 "materialize".to_string(),
502 "public".to_string(),
503 "flippers".to_string(),
504 );
505 let flip_activities = ObjectId::new(
506 "materialize".to_string(),
507 "public".to_string(),
508 "flip_activities".to_string(),
509 );
510 let winning_bids = ObjectId::new(
511 "materialize".to_string(),
512 "internal".to_string(),
513 "winning_bids".to_string(),
514 );
515
516 let base_facts = BaseFacts {
517 object_in_schema: vec![
518 (
519 flippers.clone(),
520 "materialize".to_string(),
521 "public".to_string(),
522 ),
523 (
524 flip_activities.clone(),
525 "materialize".to_string(),
526 "public".to_string(),
527 ),
528 (
529 winning_bids.clone(),
530 "materialize".to_string(),
531 "internal".to_string(),
532 ),
533 ],
534 depends_on: vec![],
535 stmt_uses_cluster: vec![],
536 index_uses_cluster: vec![
537 (
538 flip_activities.clone(),
539 "idx_flipper".to_string(),
540 "quickstart".into(),
541 ),
542 (
543 winning_bids.clone(),
544 "idx_item".to_string(),
545 "quickstart".into(),
546 ),
547 ],
548 is_sink: BTreeSet::new(),
549 is_replacement: BTreeSet::new(),
550 };
551
552 let mut changed_stmts = BTreeSet::new();
553 changed_stmts.insert(flippers.clone());
554
555 let (dirty_stmts, dirty_clusters, dirty_schemas) =
556 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
557
558 assert!(dirty_schemas.contains(&SchemaQualifier::new(
560 "materialize".to_string(),
561 "public".to_string()
562 )));
563
564 assert!(
566 dirty_stmts.contains(&flip_activities),
567 "flip_activities should be dirty due to schema propagation"
568 );
569
570 assert!(
573 !dirty_clusters.iter().any(|c| c.name == "quickstart"),
574 "quickstart cluster should NOT be dirty - flip_activities is dirty due to schema propagation, not statement change"
575 );
576
577 assert!(
579 !dirty_stmts.contains(&winning_bids),
580 "winning_bids should NOT be dirty - quickstart cluster is not dirty"
581 );
582 }
583
584 #[mz_ore::test]
585 fn test_dependency_propagation_with_index_cluster_conflict() {
586 let winning_bids = ObjectId::new(
592 "materialize".to_string(),
593 "internal".to_string(),
594 "winning_bids".to_string(),
595 );
596 let flip_activities = ObjectId::new(
597 "materialize".to_string(),
598 "public".to_string(),
599 "flip_activities".to_string(),
600 );
601 let flippers = ObjectId::new(
602 "materialize".to_string(),
603 "public".to_string(),
604 "flippers".to_string(),
605 );
606
607 let base_facts = BaseFacts {
608 object_in_schema: vec![
609 (
610 winning_bids.clone(),
611 "materialize".to_string(),
612 "internal".to_string(),
613 ),
614 (
615 flip_activities.clone(),
616 "materialize".to_string(),
617 "public".to_string(),
618 ),
619 (
620 flippers.clone(),
621 "materialize".to_string(),
622 "public".to_string(),
623 ),
624 ],
625 depends_on: vec![
626 (flip_activities.clone(), winning_bids.clone()), (flippers.clone(), flip_activities.clone()), ],
629 stmt_uses_cluster: vec![(winning_bids.clone(), "staging".into())],
630 index_uses_cluster: vec![
631 (
632 winning_bids.clone(),
633 "idx_item".to_string(),
634 "quickstart".into(),
635 ),
636 (
637 flip_activities.clone(),
638 "idx_flipper".to_string(),
639 "quickstart".into(),
640 ),
641 ],
642 is_sink: BTreeSet::new(),
643 is_replacement: BTreeSet::new(),
644 };
645
646 let mut changed_stmts = BTreeSet::new();
647 changed_stmts.insert(winning_bids.clone());
648
649 let (dirty_stmts, dirty_clusters, dirty_schemas) =
650 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
651
652 assert!(
654 dirty_stmts.contains(&winning_bids),
655 "winning_bids should be dirty"
656 );
657
658 assert!(
660 dirty_schemas.contains(&SchemaQualifier::new(
661 "materialize".to_string(),
662 "internal".to_string()
663 )),
664 "materialize.internal schema should be dirty"
665 );
666
667 assert!(
669 dirty_clusters.iter().any(|c| c.name == "quickstart"),
670 "quickstart cluster should be dirty"
671 );
672
673 assert!(
675 dirty_stmts.contains(&flip_activities),
676 "flip_activities should be dirty - depends on winning_bids"
677 );
678
679 assert!(
681 dirty_schemas.contains(&SchemaQualifier::new(
682 "materialize".to_string(),
683 "public".to_string()
684 )),
685 "materialize.public schema should be dirty - flip_activities depends on winning_bids"
686 );
687
688 assert!(
690 dirty_stmts.contains(&flippers),
691 "flippers should be dirty - its schema (materialize.public) is dirty"
692 );
693 }
694
695 #[mz_ore::test]
696 fn test_index_cluster_does_not_cause_unnecessary_redeployment() {
697 let foo_b = ObjectId::new(
700 "materialize".to_string(),
701 "foo".to_string(),
702 "b".to_string(),
703 );
704 let winning_bids = ObjectId::new(
705 "materialize".to_string(),
706 "internal".to_string(),
707 "winning_bids".to_string(),
708 );
709 let flip_activities = ObjectId::new(
710 "materialize".to_string(),
711 "public".to_string(),
712 "flip_activities".to_string(),
713 );
714
715 let base_facts = BaseFacts {
716 object_in_schema: vec![
717 (foo_b.clone(), "materialize".to_string(), "foo".to_string()),
718 (
719 winning_bids.clone(),
720 "materialize".to_string(),
721 "internal".to_string(),
722 ),
723 (
724 flip_activities.clone(),
725 "materialize".to_string(),
726 "public".to_string(),
727 ),
728 ],
729 depends_on: vec![(flip_activities.clone(), winning_bids.clone())],
730 stmt_uses_cluster: vec![(winning_bids.clone(), "staging".into())],
733 index_uses_cluster: vec![
734 (
735 foo_b.clone(),
736 "default_idx".to_string(),
737 "quickstart".into(),
738 ),
739 (
740 winning_bids.clone(),
741 "idx1".to_string(),
742 "quickstart".into(),
743 ),
744 ],
745 is_sink: BTreeSet::new(),
746 is_replacement: BTreeSet::new(),
747 };
748
749 let mut changed_stmts = BTreeSet::new();
750 changed_stmts.insert(foo_b.clone());
751
752 let (dirty_stmts, dirty_clusters, dirty_schemas) =
753 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
754
755 assert!(dirty_stmts.contains(&foo_b), "foo.b should be dirty");
757 assert_eq!(
758 dirty_stmts.len(),
759 1,
760 "only foo.b should be dirty, got: {:?}",
761 dirty_stmts
762 );
763
764 assert!(
766 dirty_schemas.contains(&SchemaQualifier::new(
767 "materialize".to_string(),
768 "foo".to_string()
769 )),
770 "materialize.foo schema should be dirty"
771 );
772
773 assert!(
775 dirty_clusters.iter().any(|c| c.name == "quickstart"),
776 "quickstart cluster should be dirty"
777 );
778
779 assert!(
781 !dirty_clusters.iter().any(|c| c.name == "staging"),
782 "staging cluster should NOT be dirty"
783 );
784
785 assert!(
787 !dirty_schemas.contains(&SchemaQualifier::new(
788 "materialize".to_string(),
789 "internal".to_string()
790 )),
791 "materialize.internal schema should NOT be dirty"
792 );
793
794 assert!(
796 !dirty_stmts.contains(&winning_bids),
797 "winning_bids should NOT be dirty - index cluster doesn't cause redeployment"
798 );
799
800 assert!(
802 !dirty_stmts.contains(&flip_activities),
803 "flip_activities should NOT be dirty - winning_bids isn't dirty"
804 );
805 }
806
807 #[mz_ore::test]
808 fn test_replacement_mv_dirties_its_schema() {
809 let mv1 = ObjectId::new("db".to_string(), "analytics".to_string(), "mv1".to_string());
813 let mv2 = ObjectId::new("db".to_string(), "analytics".to_string(), "mv2".to_string());
814 let view1 = ObjectId::new(
815 "db".to_string(),
816 "analytics".to_string(),
817 "view1".to_string(),
818 );
819
820 let mut is_replacement = BTreeSet::new();
821 is_replacement.insert(mv1.clone());
822 is_replacement.insert(mv2.clone());
823 let base_facts = BaseFacts {
826 object_in_schema: vec![
827 (mv1.clone(), "db".to_string(), "analytics".to_string()),
828 (mv2.clone(), "db".to_string(), "analytics".to_string()),
829 (view1.clone(), "db".to_string(), "analytics".to_string()),
830 ],
831 depends_on: vec![],
832 stmt_uses_cluster: vec![],
833 index_uses_cluster: vec![],
834 is_sink: BTreeSet::new(),
835 is_replacement,
836 };
837
838 let mut changed_stmts = BTreeSet::new();
840 changed_stmts.insert(mv1.clone());
841
842 let (dirty_stmts, _dirty_clusters, dirty_schemas) =
843 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
844
845 assert!(dirty_stmts.contains(&mv1), "mv1 should be dirty");
847
848 assert!(
850 dirty_schemas.contains(&SchemaQualifier::new(
851 "db".to_string(),
852 "analytics".to_string()
853 )),
854 "analytics schema should be dirty - dirty replacement MV dirties its schema"
855 );
856
857 assert!(
859 dirty_stmts.contains(&mv2),
860 "mv2 should be dirty - schema redeploys atomically"
861 );
862
863 assert!(
865 dirty_stmts.contains(&view1),
866 "view1 should be dirty - schema redeploys atomically"
867 );
868 }
869
870 #[mz_ore::test]
871 fn test_replacement_mv_does_dirty_its_cluster() {
872 let mv1 = ObjectId::new("db".to_string(), "analytics".to_string(), "mv1".to_string());
875
876 let mut is_replacement = BTreeSet::new();
877 is_replacement.insert(mv1.clone());
878
879 let base_facts = BaseFacts {
880 object_in_schema: vec![(mv1.clone(), "db".to_string(), "analytics".to_string())],
881 depends_on: vec![],
882 stmt_uses_cluster: vec![(mv1.clone(), "analytics_cluster".into())],
883 index_uses_cluster: vec![],
884 is_sink: BTreeSet::new(),
885 is_replacement,
886 };
887
888 let mut changed_stmts = BTreeSet::new();
889 changed_stmts.insert(mv1.clone());
890
891 let (_dirty_stmts, dirty_clusters, _dirty_schemas) =
892 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
893
894 assert!(
895 dirty_clusters.iter().any(|c| c.name == "analytics_cluster"),
896 "analytics_cluster should be dirty - replacement MVs DO dirty clusters"
897 );
898 }
899
900 #[mz_ore::test]
901 fn test_replacement_mv_pulled_in_by_dirty_schema() {
902 let regular = ObjectId::new(
906 "db".to_string(),
907 "analytics".to_string(),
908 "regular".to_string(),
909 );
910 let other = ObjectId::new(
911 "db".to_string(),
912 "analytics".to_string(),
913 "other".to_string(),
914 );
915 let replacement_mv = ObjectId::new(
916 "db".to_string(),
917 "analytics".to_string(),
918 "my_mv".to_string(),
919 );
920
921 let mut is_replacement = BTreeSet::new();
922 is_replacement.insert(replacement_mv.clone());
923
924 let base_facts = BaseFacts {
925 object_in_schema: vec![
926 (regular.clone(), "db".to_string(), "analytics".to_string()),
927 (other.clone(), "db".to_string(), "analytics".to_string()),
928 (
929 replacement_mv.clone(),
930 "db".to_string(),
931 "analytics".to_string(),
932 ),
933 ],
934 depends_on: vec![],
935 stmt_uses_cluster: vec![],
936 index_uses_cluster: vec![],
937 is_sink: BTreeSet::new(),
938 is_replacement,
939 };
940
941 let mut changed_stmts = BTreeSet::new();
943 changed_stmts.insert(regular.clone());
944
945 let (dirty_stmts, _dirty_clusters, dirty_schemas) =
946 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
947
948 assert!(
950 dirty_schemas.contains(&SchemaQualifier::new(
951 "db".to_string(),
952 "analytics".to_string()
953 )),
954 "schema should be dirty from regular object change"
955 );
956
957 assert!(
959 dirty_stmts.contains(&other),
960 "other should be dirty via schema propagation"
961 );
962
963 assert!(
965 dirty_stmts.contains(&replacement_mv),
966 "replacement MV should be pulled in by dirty schema"
967 );
968 }
969
970 #[mz_ore::test]
971 fn test_replacement_mv_dirty_via_dependency() {
972 let upstream = ObjectId::new(
976 "db".to_string(),
977 "public".to_string(),
978 "source_view".to_string(),
979 );
980 let replacement_mv = ObjectId::new(
981 "db".to_string(),
982 "analytics".to_string(),
983 "my_mv".to_string(),
984 );
985
986 let mut is_replacement = BTreeSet::new();
987 is_replacement.insert(replacement_mv.clone());
988
989 let base_facts = BaseFacts {
990 object_in_schema: vec![
991 (upstream.clone(), "db".to_string(), "public".to_string()),
992 (
993 replacement_mv.clone(),
994 "db".to_string(),
995 "analytics".to_string(),
996 ),
997 ],
998 depends_on: vec![(replacement_mv.clone(), upstream.clone())],
999 stmt_uses_cluster: vec![],
1000 index_uses_cluster: vec![],
1001 is_sink: BTreeSet::new(),
1002 is_replacement,
1003 };
1004
1005 let mut changed_stmts = BTreeSet::new();
1006 changed_stmts.insert(upstream.clone());
1007
1008 let (dirty_stmts, _dirty_clusters, dirty_schemas) =
1009 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1010
1011 assert!(
1014 dirty_stmts.contains(&replacement_mv),
1015 "replacement MV should be dirty - depends on changed upstream"
1016 );
1017
1018 assert!(
1021 dirty_schemas.contains(&SchemaQualifier::new(
1022 "db".to_string(),
1023 "analytics".to_string()
1024 )),
1025 "analytics schema should be dirty - dirty replacement MV dirties its schema"
1026 );
1027 }
1028
1029 #[mz_ore::test]
1030 fn test_replacement_mv_dirty_via_cluster() {
1031 let regular = ObjectId::new(
1035 "db".to_string(),
1036 "public".to_string(),
1037 "regular_mv".to_string(),
1038 );
1039 let replacement_mv = ObjectId::new(
1040 "db".to_string(),
1041 "analytics".to_string(),
1042 "my_mv".to_string(),
1043 );
1044
1045 let mut is_replacement = BTreeSet::new();
1046 is_replacement.insert(replacement_mv.clone());
1047
1048 let base_facts = BaseFacts {
1049 object_in_schema: vec![
1050 (regular.clone(), "db".to_string(), "public".to_string()),
1051 (
1052 replacement_mv.clone(),
1053 "db".to_string(),
1054 "analytics".to_string(),
1055 ),
1056 ],
1057 depends_on: vec![],
1058 stmt_uses_cluster: vec![
1059 (regular.clone(), "shared_cluster".into()),
1060 (replacement_mv.clone(), "shared_cluster".into()),
1061 ],
1062 index_uses_cluster: vec![],
1063 is_sink: BTreeSet::new(),
1064 is_replacement,
1065 };
1066
1067 let mut changed_stmts = BTreeSet::new();
1069 changed_stmts.insert(regular.clone());
1070
1071 let (dirty_stmts, dirty_clusters, dirty_schemas) =
1072 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1073
1074 assert!(
1075 dirty_clusters.iter().any(|c| c.name == "shared_cluster"),
1076 "shared_cluster should be dirty"
1077 );
1078
1079 assert!(
1080 dirty_stmts.contains(&replacement_mv),
1081 "replacement MV should be dirty - its cluster is dirty"
1082 );
1083
1084 assert!(
1086 dirty_schemas.contains(&SchemaQualifier::new(
1087 "db".to_string(),
1088 "analytics".to_string()
1089 )),
1090 "analytics schema should be dirty - its replacement MV is dirty"
1091 );
1092 }
1093
1094 #[mz_ore::test]
1095 fn test_dependent_of_dirty_replacement_mv_not_dirtied() {
1096 let replacement_mv = ObjectId::new(
1101 "db".to_string(),
1102 "analytics".to_string(),
1103 "my_mv".to_string(),
1104 );
1105 let downstream = ObjectId::new(
1106 "db".to_string(),
1107 "consumer".to_string(),
1108 "report".to_string(),
1109 );
1110
1111 let mut is_replacement = BTreeSet::new();
1112 is_replacement.insert(replacement_mv.clone());
1113
1114 let base_facts = BaseFacts {
1115 object_in_schema: vec![
1116 (
1117 replacement_mv.clone(),
1118 "db".to_string(),
1119 "analytics".to_string(),
1120 ),
1121 (downstream.clone(), "db".to_string(), "consumer".to_string()),
1122 ],
1123 depends_on: vec![(downstream.clone(), replacement_mv.clone())],
1124 stmt_uses_cluster: vec![],
1125 index_uses_cluster: vec![],
1126 is_sink: BTreeSet::new(),
1127 is_replacement,
1128 };
1129
1130 let mut changed_stmts = BTreeSet::new();
1132 changed_stmts.insert(replacement_mv.clone());
1133
1134 let (dirty_stmts, _dirty_clusters, dirty_schemas) =
1135 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1136
1137 assert!(
1139 dirty_stmts.contains(&replacement_mv),
1140 "replacement MV is dirty"
1141 );
1142 assert!(
1143 dirty_schemas.contains(&SchemaQualifier::new(
1144 "db".to_string(),
1145 "analytics".to_string()
1146 )),
1147 "analytics schema should be dirty"
1148 );
1149
1150 assert!(
1152 !dirty_stmts.contains(&downstream),
1153 "downstream dependent should NOT be dirty - replacement MV does not fan out"
1154 );
1155 assert!(
1156 !dirty_schemas.contains(&SchemaQualifier::new(
1157 "db".to_string(),
1158 "consumer".to_string()
1159 )),
1160 "consumer schema should NOT be dirty"
1161 );
1162 }
1163
1164 #[mz_ore::test]
1165 fn test_mixed_replacement_and_regular_in_shared_cluster() {
1166 let regular_view = ObjectId::new(
1170 "db".to_string(),
1171 "public".to_string(),
1172 "my_view".to_string(),
1173 );
1174 let regular_view2 = ObjectId::new(
1175 "db".to_string(),
1176 "public".to_string(),
1177 "other_view".to_string(),
1178 );
1179 let replacement_mv1 = ObjectId::new(
1180 "db".to_string(),
1181 "analytics".to_string(),
1182 "mv_alpha".to_string(),
1183 );
1184 let replacement_mv2 = ObjectId::new(
1185 "db".to_string(),
1186 "analytics".to_string(),
1187 "mv_beta".to_string(),
1188 );
1189
1190 let mut is_replacement = BTreeSet::new();
1191 is_replacement.insert(replacement_mv1.clone());
1192 is_replacement.insert(replacement_mv2.clone());
1193
1194 let base_facts = BaseFacts {
1195 object_in_schema: vec![
1196 (regular_view.clone(), "db".to_string(), "public".to_string()),
1197 (
1198 regular_view2.clone(),
1199 "db".to_string(),
1200 "public".to_string(),
1201 ),
1202 (
1203 replacement_mv1.clone(),
1204 "db".to_string(),
1205 "analytics".to_string(),
1206 ),
1207 (
1208 replacement_mv2.clone(),
1209 "db".to_string(),
1210 "analytics".to_string(),
1211 ),
1212 ],
1213 depends_on: vec![],
1214 stmt_uses_cluster: vec![
1215 (regular_view.clone(), "shared".into()),
1216 (replacement_mv1.clone(), "shared".into()),
1217 (replacement_mv2.clone(), "shared".into()),
1218 ],
1219 index_uses_cluster: vec![],
1220 is_sink: BTreeSet::new(),
1221 is_replacement,
1222 };
1223
1224 let mut changed_stmts = BTreeSet::new();
1225 changed_stmts.insert(regular_view.clone());
1226
1227 let (dirty_stmts, dirty_clusters, dirty_schemas) =
1228 compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1229
1230 assert!(dirty_clusters.iter().any(|c| c.name == "shared"));
1232
1233 assert!(dirty_schemas.contains(&SchemaQualifier::new(
1235 "db".to_string(),
1236 "public".to_string()
1237 )));
1238
1239 assert!(dirty_stmts.contains(®ular_view2));
1241
1242 assert!(
1244 dirty_stmts.contains(&replacement_mv1),
1245 "replacement_mv1 should be dirty via cluster"
1246 );
1247 assert!(
1248 dirty_stmts.contains(&replacement_mv2),
1249 "replacement_mv2 should be dirty via cluster"
1250 );
1251
1252 assert!(
1255 dirty_schemas.contains(&SchemaQualifier::new(
1256 "db".to_string(),
1257 "analytics".to_string()
1258 )),
1259 "analytics schema should be dirty - its replacement MVs are dirty"
1260 );
1261 }
1262
1263 fn parse_materialized_view(sql: &str) -> Statement {
1265 let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1266 if let mz_sql_parser::ast::Statement::CreateMaterializedView(s) = &parsed[0].ast {
1267 Statement::CreateMaterializedView(s.clone())
1268 } else {
1269 panic!("Expected CreateMaterializedView");
1270 }
1271 }
1272
1273 fn build_project(
1275 db: &str,
1276 schema: &str,
1277 objects: Vec<(ObjectId, Statement)>,
1278 is_replacement: bool,
1279 ) -> Project {
1280 let db_objects: Vec<DatabaseObject> = objects
1281 .into_iter()
1282 .map(|(id, stmt)| DatabaseObject {
1283 id,
1284 typed_object: compiled::DatabaseObject {
1285 path: std::path::PathBuf::from("test.sql"),
1286 stmt,
1287 indexes: vec![],
1288 grants: vec![],
1289 comments: vec![],
1290 tests: vec![],
1291 },
1292 dependencies: BTreeSet::new(),
1293 })
1294 .collect();
1295
1296 let mut replacement_schemas = BTreeSet::new();
1297 if is_replacement {
1298 replacement_schemas.insert(SchemaQualifier::new(db.to_string(), schema.to_string()));
1299 }
1300
1301 Project {
1302 databases: vec![Database {
1303 name: db.to_string(),
1304 schemas: vec![Schema {
1305 name: schema.to_string(),
1306 objects: db_objects,
1307 mod_statements: None,
1308 schema_type: SchemaType::Compute,
1309 }],
1310 mod_statements: None,
1311 }],
1312 dependency_graph: BTreeMap::new(),
1313 external_dependencies: BTreeSet::new(),
1314 cluster_dependencies: BTreeSet::new(),
1315 tests: vec![],
1316 replacement_schemas,
1317 compile_dirty: BTreeSet::new(),
1318 }
1319 }
1320
1321 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1323 fn test_schema_transition_objects_to_replacement_routes_to_new_replacement() {
1324 let obj_id = ObjectId::new(
1329 "db".to_string(),
1330 "analytics".to_string(),
1331 "my_mv".to_string(),
1332 );
1333
1334 let old_snapshot = DeploymentSnapshot {
1336 objects: BTreeMap::from([(obj_id.clone(), "hash_old".to_string())]),
1337 schemas: BTreeMap::from([(
1338 SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1339 DeploymentKind::Objects,
1340 )]),
1341 };
1342
1343 let new_snapshot = DeploymentSnapshot {
1345 objects: BTreeMap::from([(obj_id.clone(), "hash_new".to_string())]),
1346 schemas: BTreeMap::from([(
1347 SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1348 DeploymentKind::Replacement,
1349 )]),
1350 };
1351
1352 let project = build_project(
1354 "db",
1355 "analytics",
1356 vec![(
1357 obj_id.clone(),
1358 parse_materialized_view("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c1 AS SELECT 1"),
1359 )],
1360 true, );
1362
1363 let changeset = ChangeSet::from_deployment_snapshot_comparison(
1364 &old_snapshot,
1365 &new_snapshot,
1366 &project,
1367 &BTreeSet::new(),
1368 );
1369
1370 assert!(
1372 changeset.new_replacement_objects.contains(&obj_id),
1373 "Object transitioning from Objects->Replacement schema should use blue/green swap"
1374 );
1375 assert!(
1376 !changeset.changed_replacement_objects.contains(&obj_id),
1377 "Object transitioning from Objects->Replacement schema should NOT use CREATE REPLACEMENT"
1378 );
1379 }
1380
1381 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1383 fn test_forced_replacement_schema_routes_to_changed_replacement() {
1384 let obj_id = ObjectId::new(
1388 "db".to_string(),
1389 "analytics".to_string(),
1390 "my_mv".to_string(),
1391 );
1392
1393 let snapshot = DeploymentSnapshot {
1395 objects: BTreeMap::from([(obj_id.clone(), "hash".to_string())]),
1396 schemas: BTreeMap::from([(
1397 SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1398 DeploymentKind::Replacement,
1399 )]),
1400 };
1401
1402 let project = build_project(
1403 "db",
1404 "analytics",
1405 vec![(
1406 obj_id.clone(),
1407 parse_materialized_view("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c1 AS SELECT 1"),
1408 )],
1409 true,
1410 );
1411
1412 let unforced = ChangeSet::from_deployment_snapshot_comparison(
1414 &snapshot,
1415 &snapshot,
1416 &project,
1417 &BTreeSet::new(),
1418 );
1419 assert!(
1420 unforced.is_empty(),
1421 "unchanged replacement schema should be a no-op"
1422 );
1423
1424 let forced = BTreeSet::from([SchemaQualifier::new(
1426 "db".to_string(),
1427 "analytics".to_string(),
1428 )]);
1429 let changeset =
1430 ChangeSet::from_deployment_snapshot_comparison(&snapshot, &snapshot, &project, &forced);
1431 assert!(changeset.objects_to_deploy.contains(&obj_id));
1432 assert!(
1433 changeset.changed_replacement_objects.contains(&obj_id),
1434 "forced unchanged replacement MV should redeploy via CREATE REPLACEMENT"
1435 );
1436 assert!(!changeset.new_replacement_objects.contains(&obj_id));
1437 }
1438
1439 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1441 fn test_steady_state_replacement_routes_to_changed_replacement() {
1442 let obj_id = ObjectId::new(
1446 "db".to_string(),
1447 "analytics".to_string(),
1448 "my_mv".to_string(),
1449 );
1450
1451 let old_snapshot = DeploymentSnapshot {
1453 objects: BTreeMap::from([(obj_id.clone(), "hash_old".to_string())]),
1454 schemas: BTreeMap::from([(
1455 SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1456 DeploymentKind::Replacement,
1457 )]),
1458 };
1459
1460 let new_snapshot = DeploymentSnapshot {
1462 objects: BTreeMap::from([(obj_id.clone(), "hash_new".to_string())]),
1463 schemas: BTreeMap::from([(
1464 SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1465 DeploymentKind::Replacement,
1466 )]),
1467 };
1468
1469 let project = build_project(
1470 "db",
1471 "analytics",
1472 vec![(
1473 obj_id.clone(),
1474 parse_materialized_view("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c1 AS SELECT 1"),
1475 )],
1476 true,
1477 );
1478
1479 let changeset = ChangeSet::from_deployment_snapshot_comparison(
1480 &old_snapshot,
1481 &new_snapshot,
1482 &project,
1483 &BTreeSet::new(),
1484 );
1485
1486 assert!(
1488 changeset.changed_replacement_objects.contains(&obj_id),
1489 "Object in steady-state Replacement schema should use CREATE REPLACEMENT"
1490 );
1491 assert!(
1492 !changeset.new_replacement_objects.contains(&obj_id),
1493 "Object in steady-state Replacement schema should NOT use blue/green swap"
1494 );
1495 }
1496}