Skip to main content

mz_deploy/project/analysis/
changeset.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Change detection for incremental deployment.
11//!
12//! This module implements a Dirty Propagation Algorithm to determine
13//! which database objects, schemas, and clusters need redeployment after changes.
14//!
15//! ## Algorithm Overview
16//!
17//! The algorithm computes three result sets via fixed-point iteration:
18//! - `DirtyStmt(object)` - All objects that must be reprocessed
19//! - `DirtyCluster(cluster)` - All clusters that must be refreshed
20//! - `DirtySchema(database, schema)` - All schemas containing dirty objects
21//!
22//! ## Seeds
23//!
24//! The fixed-point starts from two caller-supplied inputs:
25//! - `ChangedStmt(O)` — objects whose hashes differ between the old and new
26//!   snapshots.
27//! - `ForcedSchema(Db, Sch)` — schemas the caller marks dirty unconditionally
28//!   (`stage --redeploy-schema`), redeployed even when nothing in them changed.
29//!
30//! ## Propagation Rules
31//!
32//! ### Rule Category 1 — Statement Dirtiness
33//! ```datalog
34//! DirtyStmt(O) :- ChangedStmt(O)                             # Changed objects are dirty
35//! DirtyStmt(O) :- StmtUsesCluster(O, C), DirtyCluster(C)     # Objects on dirty statement clusters are dirty
36//! DirtyStmt(O) :- DependsOn(O, P), DirtyStmt(P), NOT IsReplacement(P)  # Downstream dependents are dirty, except through replacement MVs
37//! DirtyStmt(O) :- DirtySchema(Db, Sch), ObjectInSchema(O, Db, Sch)     # Every object in a dirty schema is dirty
38//! ```
39//!
40//! **Replacement MVs:** A replacement MV (in a stable-API schema, redeployed in
41//! place) has exactly one special property — its dirtiness does not propagate
42//! *downstream* to dependents in other schemas. Otherwise it behaves like any
43//! other compute object: a dirty replacement MV dirties its schema, a dirty
44//! stable schema redeploys all of its MVs atomically, and a dirty cluster
45//! propagates normally.
46//!
47//! **Key Insight:** Index clusters do NOT cause objects to be marked dirty. Indexes are physical
48//! optimizations that can be managed independently without redeploying the object's statement.
49//! If object A's index uses a dirty cluster, object A is NOT marked for redeployment.
50//!
51//! ### Rule Category 2 — Cluster Dirtiness
52//! ```datalog
53//! DirtyCluster(C) :- ChangedStmt(O), StmtUsesCluster(O, C), NOT IsSink(O), ClusterBoundary(C)   # Clusters of changed statements are dirty within the boundary
54//! DirtyCluster(C) :- ChangedStmt(O), IndexUsesCluster(O, _, C), NOT IsSink(O), ClusterBoundary(C) # Clusters of changed indexes are dirty within the boundary
55//! ```
56//!
57//! **Note:** Clusters are only marked dirty when the STATEMENT itself changes,
58//! not when the object is dirty for other reasons (dependencies, schema propagation, etc.).
59//! **Sinks are excluded** because they write to external systems and are created after the swap.
60//! `ClusterBoundary` is the set of clusters referenced by statements or
61//! indexes in the project. A cluster can become dirty only if it is both used
62//! by a changed object and present in that boundary.
63//!
64//! ### Rule Category 3 — Schema Dirtiness
65//! ```datalog
66//! DirtySchema(Db, Sch) :- ForcedSchema(Db, Sch)                                    # Forced schemas are dirty up front (seed)
67//! DirtySchema(Db, Sch) :- DirtyStmt(O), ObjectInSchema(O, Db, Sch), NOT IsSink(O)  # Dirty objects make their schemas dirty (excluding sinks)
68//! ```
69//!
70//! **Key Property:** All dirty objects (except sinks) contribute to schema dirtiness, which triggers
71//! schema-level atomic redeployment. Sinks are excluded because they are created after the swap
72//! during apply and shouldn't cause other objects to be redeployed.
73
74mod base_facts;
75mod datalog;
76mod diff;
77mod logging;
78mod types;
79
80pub(crate) use types::ChangeSet;
81
82use crate::client::DeploymentKind;
83use crate::project::SchemaQualifier;
84use crate::project::analysis::deployment_snapshot::DeploymentSnapshot;
85use crate::project::ir::graph::Project;
86use std::collections::BTreeSet;
87
88use base_facts::extract_base_facts;
89use datalog::compute_dirty_datalog;
90use diff::find_changed_objects;
91
92impl ChangeSet {
93    /// Create a ChangeSet by comparing old and new deployment snapshots using Datalog.
94    ///
95    /// This method uses Datalog fixed-point computation to determine the transitive
96    /// closure of all objects, clusters, and schemas affected by changes.
97    ///
98    /// # Arguments
99    /// * `old_snapshot` - Previous deployment snapshot
100    /// * `new_snapshot` - Current deployment snapshot
101    /// * `project` - MIR project with dependency information
102    /// * `forced_dirty_schemas` - Schemas to treat as dirty even when unchanged
103    ///   (e.g. `stage --redeploy-schema`); pass an empty set for pure
104    ///   change-driven dirtiness
105    ///
106    /// # Returns
107    /// A ChangeSet identifying all objects requiring redeployment
108    pub(crate) fn from_deployment_snapshot_comparison(
109        old_snapshot: &DeploymentSnapshot,
110        new_snapshot: &DeploymentSnapshot,
111        project: &Project,
112        forced_dirty_schemas: &BTreeSet<SchemaQualifier>,
113    ) -> Self {
114        // Step 1: Find changed objects by comparing hashes
115        let changed_objects = find_changed_objects(old_snapshot, new_snapshot);
116
117        // Step 2: Extract base facts from project
118        let base_facts = extract_base_facts(project);
119
120        // Step 3: Run Datalog fixed-point computation
121        let (dirty_stmts, dirty_clusters, dirty_schemas) =
122            compute_dirty_datalog(&changed_objects, &base_facts, forced_dirty_schemas);
123
124        // Step 4: Separate replacement objects into new vs changed
125        // - New: use blue-green swap. Includes objects not in old snapshot OR objects whose
126        //   old schema was not Replacement (e.g., transitioning from Objects to Replacement)
127        // - Changed: use CREATE REPLACEMENT. Only for objects that existed in old snapshot
128        //   AND whose old schema was already Replacement kind.
129        let (new_replacement_objects, changed_replacement_objects) = dirty_stmts
130            .iter()
131            .filter(|obj| base_facts.is_replacement.contains(*obj))
132            .cloned()
133            .partition(|obj| {
134                !old_snapshot.objects.contains_key(obj)
135                    || old_snapshot
136                        .schemas
137                        .get(&SchemaQualifier::new(
138                            obj.expect_database().to_string(),
139                            obj.schema().to_string(),
140                        ))
141                        .copied()
142                        != Some(DeploymentKind::Replacement)
143            });
144
145        ChangeSet {
146            changed_objects: changed_objects.into_iter().collect(),
147            dirty_schemas: dirty_schemas.into_iter().collect(),
148            dirty_clusters: dirty_clusters.into_iter().collect(),
149            objects_to_deploy: dirty_stmts.into_iter().collect(),
150            new_replacement_objects,
151            changed_replacement_objects,
152        }
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use crate::client::DeploymentKind;
160    use crate::project::SchemaQualifier;
161    use crate::project::analysis::deployment_snapshot::DeploymentSnapshot;
162    use crate::project::ast::Statement;
163    use crate::project::ir::compiled;
164    use crate::project::ir::graph::{Database, DatabaseObject, Project, Schema, SchemaType};
165    use crate::project::ir::object_id::ObjectId;
166    use base_facts::BaseFacts;
167    use datalog::compute_dirty_datalog;
168    use std::collections::{BTreeMap, BTreeSet};
169
170    #[mz_ore::test]
171    fn test_parse_object_file_path() {
172        let path = "materialize/public/users.sql";
173        let parts: Vec<&str> = path.split('/').collect();
174
175        match parts.as_slice() {
176            [db, schema, file] if file.ends_with(".sql") => {
177                assert_eq!(*db, "materialize");
178                assert_eq!(*schema, "public");
179                assert_eq!(file.strip_suffix(".sql").unwrap(), "users");
180            }
181            _ => panic!("Path didn't match expected pattern"),
182        }
183    }
184
185    #[mz_ore::test]
186    fn test_parse_schema_mod_file_path() {
187        let path = "materialize/public.sql";
188        let parts: Vec<&str> = path.split('/').collect();
189
190        match parts.as_slice() {
191            [db, schema_file] if schema_file.ends_with(".sql") => {
192                assert_eq!(*db, "materialize");
193                assert_eq!(schema_file.strip_suffix(".sql").unwrap(), "public");
194            }
195            _ => panic!("Path didn't match expected pattern"),
196        }
197    }
198
199    #[mz_ore::test]
200    fn test_parse_database_mod_file_path() {
201        let path = "materialize.sql";
202        let parts: Vec<&str> = path.split('/').collect();
203
204        match parts.as_slice() {
205            [db_file] if db_file.ends_with(".sql") => {
206                assert_eq!(db_file.strip_suffix(".sql").unwrap(), "materialize");
207            }
208            _ => panic!("Path didn't match expected pattern"),
209        }
210    }
211
212    #[mz_ore::test]
213    fn test_schema_propagation_all_objects_in_dirty_schema_are_dirty() {
214        // Test that when one object in a schema becomes dirty,
215        // ALL objects in that schema become dirty (schema-level atomicity)
216
217        // Create base facts for a schema with 3 objects
218        let obj1 = ObjectId::new("db".to_string(), "schema".to_string(), "table1".to_string());
219        let obj2 = ObjectId::new("db".to_string(), "schema".to_string(), "table2".to_string());
220        let obj3 = ObjectId::new("db".to_string(), "schema".to_string(), "view1".to_string());
221
222        let base_facts = BaseFacts {
223            object_in_schema: vec![
224                (obj1.clone(), "db".to_string(), "schema".to_string()),
225                (obj2.clone(), "db".to_string(), "schema".to_string()),
226                (obj3.clone(), "db".to_string(), "schema".to_string()),
227            ],
228            depends_on: vec![],
229            stmt_uses_cluster: vec![],
230            index_uses_cluster: vec![],
231            is_sink: BTreeSet::new(),
232            is_replacement: BTreeSet::new(),
233        };
234
235        // Only obj1 is changed
236        let mut changed_stmts = BTreeSet::new();
237        changed_stmts.insert(obj1.clone());
238
239        // Run Datalog computation
240        let (dirty_stmts, _dirty_clusters, dirty_schemas) =
241            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
242
243        // Verify schema is dirty
244        assert!(
245            dirty_schemas.contains(&SchemaQualifier::new(
246                "db".to_string(),
247                "schema".to_string()
248            )),
249            "Schema should be marked dirty"
250        );
251
252        // CRITICAL: All objects in the dirty schema should be dirty
253        assert!(
254            dirty_stmts.contains(&obj1),
255            "obj1 (changed) should be dirty"
256        );
257        assert!(
258            dirty_stmts.contains(&obj2),
259            "obj2 (same schema as changed obj1) should be dirty"
260        );
261        assert!(
262            dirty_stmts.contains(&obj3),
263            "obj3 (same schema as changed obj1) should be dirty"
264        );
265    }
266
267    #[mz_ore::test]
268    fn test_forced_dirty_schema_redeploys_schema_and_dependents() {
269        // Nothing changed, but schema "a" is forced dirty. Every object in "a"
270        // and its downstream dependents (even in other schemas) must be dirty,
271        // while an unrelated schema stays clean.
272        let a1 = ObjectId::new("db".to_string(), "a".to_string(), "a1".to_string());
273        let a2 = ObjectId::new("db".to_string(), "a".to_string(), "a2".to_string());
274        let b1 = ObjectId::new("db".to_string(), "b".to_string(), "b1".to_string());
275        let c1 = ObjectId::new("db".to_string(), "c".to_string(), "c1".to_string());
276
277        let base_facts = BaseFacts {
278            object_in_schema: vec![
279                (a1.clone(), "db".to_string(), "a".to_string()),
280                (a2.clone(), "db".to_string(), "a".to_string()),
281                (b1.clone(), "db".to_string(), "b".to_string()),
282                (c1.clone(), "db".to_string(), "c".to_string()),
283            ],
284            // b1 depends on a1 (child, parent).
285            depends_on: vec![(b1.clone(), a1.clone())],
286            stmt_uses_cluster: vec![],
287            index_uses_cluster: vec![],
288            is_sink: BTreeSet::new(),
289            is_replacement: BTreeSet::new(),
290        };
291
292        // Nothing changed; force schema "a" dirty.
293        let changed_stmts = BTreeSet::new();
294        let mut forced = BTreeSet::new();
295        forced.insert(SchemaQualifier::new("db".to_string(), "a".to_string()));
296
297        let (dirty_stmts, _clusters, dirty_schemas) =
298            compute_dirty_datalog(&changed_stmts, &base_facts, &forced);
299
300        assert!(
301            dirty_stmts.contains(&a1),
302            "a1 in forced schema should be dirty"
303        );
304        assert!(
305            dirty_stmts.contains(&a2),
306            "a2 in forced schema should be dirty"
307        );
308        assert!(
309            dirty_stmts.contains(&b1),
310            "b1 (depends on a1) should be dirty"
311        );
312        assert!(
313            !dirty_stmts.contains(&c1),
314            "c1 (unrelated schema) should not be dirty"
315        );
316
317        assert!(dirty_schemas.contains(&SchemaQualifier::new("db".to_string(), "a".to_string())));
318        assert!(dirty_schemas.contains(&SchemaQualifier::new("db".to_string(), "b".to_string())));
319        assert!(!dirty_schemas.contains(&SchemaQualifier::new("db".to_string(), "c".to_string())));
320    }
321
322    #[mz_ore::test]
323    fn test_forced_dirty_schema_includes_replacement_objects() {
324        // Normal schema atomicity skips replacement MVs, but a schema the caller
325        // explicitly forces must redeploy them too.
326        let mv = ObjectId::new("db".to_string(), "core".to_string(), "summary".to_string());
327
328        let base_facts = BaseFacts {
329            object_in_schema: vec![(mv.clone(), "db".to_string(), "core".to_string())],
330            depends_on: vec![],
331            stmt_uses_cluster: vec![],
332            index_uses_cluster: vec![],
333            is_sink: BTreeSet::new(),
334            is_replacement: BTreeSet::from([mv.clone()]),
335        };
336
337        // Not forced: the unchanged replacement MV stays clean.
338        let (unforced, _, _) =
339            compute_dirty_datalog(&BTreeSet::new(), &base_facts, &BTreeSet::new());
340        assert!(
341            !unforced.contains(&mv),
342            "replacement MV should not be dirty without a change or force"
343        );
344
345        // Forced: the replacement MV is redeployed.
346        let mut forced = BTreeSet::new();
347        forced.insert(SchemaQualifier::new("db".to_string(), "core".to_string()));
348        let (forced_dirty, _, _) = compute_dirty_datalog(&BTreeSet::new(), &base_facts, &forced);
349        assert!(
350            forced_dirty.contains(&mv),
351            "forcing the schema should redeploy its replacement MV"
352        );
353    }
354
355    #[mz_ore::test]
356    fn test_index_cluster_does_not_dirty_parent_object_cluster() {
357        // Critical test: If an index uses a dirty cluster, the index should be redeployed,
358        // but the parent object and its cluster should NOT be marked dirty.
359        //
360        // Scenario:
361        // - winning_bids MV on "staging" cluster
362        // - Index on winning_bids using "quickstart" cluster
363        // - some_other_obj on "quickstart" cluster changes
364        //
365        // Expected:
366        // - quickstart cluster becomes dirty
367        // - Index needs redeployment
368        // - winning_bids needs redeployment (to deploy its index)
369        // - BUT staging cluster should NOT be dirty
370
371        let mv = ObjectId::new(
372            "db".to_string(),
373            "schema".to_string(),
374            "winning_bids".to_string(),
375        );
376        let other = ObjectId::new(
377            "db".to_string(),
378            "schema".to_string(),
379            "other_obj".to_string(),
380        );
381
382        let base_facts = BaseFacts {
383            object_in_schema: vec![
384                (mv.clone(), "db".to_string(), "schema".to_string()),
385                (other.clone(), "db".to_string(), "schema".to_string()),
386            ],
387            depends_on: vec![],
388            stmt_uses_cluster: vec![
389                (mv.clone(), "staging".into()),
390                (other.clone(), "quickstart".into()),
391            ],
392            index_uses_cluster: vec![(mv.clone(), "idx_item".to_string(), "quickstart".into())],
393            is_sink: BTreeSet::new(),
394            is_replacement: BTreeSet::new(),
395        };
396
397        // Only other_obj is changed
398        let mut changed_stmts = BTreeSet::new();
399        changed_stmts.insert(other.clone());
400
401        // Run Datalog computation
402        let (dirty_stmts, dirty_clusters, _dirty_schemas) =
403            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
404
405        // Verify quickstart cluster is dirty
406        assert!(
407            dirty_clusters.iter().any(|c| c.name == "quickstart"),
408            "quickstart cluster should be dirty because other_obj changed"
409        );
410
411        // Verify winning_bids needs redeployment (because its index uses dirty cluster)
412        assert!(
413            dirty_stmts.contains(&mv),
414            "winning_bids should be redeployed (its index uses dirty quickstart cluster)"
415        );
416
417        // CRITICAL: staging cluster should NOT be dirty
418        // The MV's statement uses staging, but the MV is only dirty because of its index,
419        // not because its statement changed. Therefore staging should not be marked dirty.
420        assert!(
421            !dirty_clusters.iter().any(|c| c.name == "staging"),
422            "staging cluster should NOT be dirty - winning_bids is only dirty due to its index, not its statement"
423        );
424    }
425
426    #[mz_ore::test]
427    fn test_index_cluster_does_not_dirty_schema() {
428        // Scenario:
429        // - table1 and table2 in the same schema
430        // - table1 has index on cluster "index_cluster"
431        // - some_other_obj uses "index_cluster" and changes
432        //
433        // Expected (NEW BEHAVIOR):
434        // - index_cluster becomes dirty
435        // - table1 should NOT be dirty (indexes don't cause redeployment)
436        // - schema should NOT be dirty
437        // - table2 should NOT be dirty
438
439        let table1 = ObjectId::new("db".to_string(), "schema".to_string(), "table1".to_string());
440        let table2 = ObjectId::new("db".to_string(), "schema".to_string(), "table2".to_string());
441        let other = ObjectId::new(
442            "db".to_string(),
443            "other_schema".to_string(),
444            "other_obj".to_string(),
445        );
446
447        let base_facts = BaseFacts {
448            object_in_schema: vec![
449                (table1.clone(), "db".to_string(), "schema".to_string()),
450                (table2.clone(), "db".to_string(), "schema".to_string()),
451                (other.clone(), "db".to_string(), "other_schema".to_string()),
452            ],
453            depends_on: vec![],
454            stmt_uses_cluster: vec![(other.clone(), "index_cluster".into())],
455            index_uses_cluster: vec![(table1.clone(), "idx1".to_string(), "index_cluster".into())],
456            is_sink: BTreeSet::new(),
457            is_replacement: BTreeSet::new(),
458        };
459
460        let mut changed_stmts = BTreeSet::new();
461        changed_stmts.insert(other.clone());
462
463        let (dirty_stmts, dirty_clusters, dirty_schemas) =
464            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
465
466        // index_cluster should be dirty
467        assert!(dirty_clusters.iter().any(|c| c.name == "index_cluster"));
468
469        // table1 should NOT be dirty (indexes don't cause object redeployment)
470        assert!(
471            !dirty_stmts.contains(&table1),
472            "table1 should NOT be dirty - indexes don't cause redeployment"
473        );
474
475        // Schema should NOT be dirty
476        assert!(
477            !dirty_schemas.contains(&SchemaQualifier::new(
478                "db".to_string(),
479                "schema".to_string()
480            )),
481            "schema should NOT be dirty"
482        );
483
484        // And table2 should NOT be dirty
485        assert!(!dirty_stmts.contains(&table2), "table2 should NOT be dirty");
486    }
487
488    #[mz_ore::test]
489    fn test_schema_propagation_does_not_dirty_index_clusters() {
490        // Scenario from real deployment:
491        // - flip_activities and flippers in materialize.public schema
492        // - flip_activities has index on "quickstart" cluster
493        // - winning_bids in materialize.internal schema has index on "quickstart"
494        // - When flippers changes:
495        //   - materialize.public schema becomes dirty
496        //   - flip_activities becomes dirty (schema propagation)
497        //   - BUT quickstart cluster should NOT become dirty
498        //   - winning_bids should NOT be redeployed
499
500        let flippers = ObjectId::new(
501            "materialize".to_string(),
502            "public".to_string(),
503            "flippers".to_string(),
504        );
505        let flip_activities = ObjectId::new(
506            "materialize".to_string(),
507            "public".to_string(),
508            "flip_activities".to_string(),
509        );
510        let winning_bids = ObjectId::new(
511            "materialize".to_string(),
512            "internal".to_string(),
513            "winning_bids".to_string(),
514        );
515
516        let base_facts = BaseFacts {
517            object_in_schema: vec![
518                (
519                    flippers.clone(),
520                    "materialize".to_string(),
521                    "public".to_string(),
522                ),
523                (
524                    flip_activities.clone(),
525                    "materialize".to_string(),
526                    "public".to_string(),
527                ),
528                (
529                    winning_bids.clone(),
530                    "materialize".to_string(),
531                    "internal".to_string(),
532                ),
533            ],
534            depends_on: vec![],
535            stmt_uses_cluster: vec![],
536            index_uses_cluster: vec![
537                (
538                    flip_activities.clone(),
539                    "idx_flipper".to_string(),
540                    "quickstart".into(),
541                ),
542                (
543                    winning_bids.clone(),
544                    "idx_item".to_string(),
545                    "quickstart".into(),
546                ),
547            ],
548            is_sink: BTreeSet::new(),
549            is_replacement: BTreeSet::new(),
550        };
551
552        let mut changed_stmts = BTreeSet::new();
553        changed_stmts.insert(flippers.clone());
554
555        let (dirty_stmts, dirty_clusters, dirty_schemas) =
556            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
557
558        // materialize.public schema should be dirty
559        assert!(dirty_schemas.contains(&SchemaQualifier::new(
560            "materialize".to_string(),
561            "public".to_string()
562        )));
563
564        // flip_activities should be dirty due to schema propagation
565        assert!(
566            dirty_stmts.contains(&flip_activities),
567            "flip_activities should be dirty due to schema propagation"
568        );
569
570        // CRITICAL: quickstart cluster should NOT be dirty
571        // flip_activities is dirty due to schema propagation, not because its statement changed
572        assert!(
573            !dirty_clusters.iter().any(|c| c.name == "quickstart"),
574            "quickstart cluster should NOT be dirty - flip_activities is dirty due to schema propagation, not statement change"
575        );
576
577        // winning_bids should NOT be dirty
578        assert!(
579            !dirty_stmts.contains(&winning_bids),
580            "winning_bids should NOT be dirty - quickstart cluster is not dirty"
581        );
582    }
583
584    #[mz_ore::test]
585    fn test_dependency_propagation_with_index_cluster_conflict() {
586        // Real-world bug scenario:
587        // - winning_bids changes (has index on quickstart)
588        // - flip_activities depends on winning_bids (also has index on quickstart)
589        // - flippers depends on flip_activities
590
591        let winning_bids = ObjectId::new(
592            "materialize".to_string(),
593            "internal".to_string(),
594            "winning_bids".to_string(),
595        );
596        let flip_activities = ObjectId::new(
597            "materialize".to_string(),
598            "public".to_string(),
599            "flip_activities".to_string(),
600        );
601        let flippers = ObjectId::new(
602            "materialize".to_string(),
603            "public".to_string(),
604            "flippers".to_string(),
605        );
606
607        let base_facts = BaseFacts {
608            object_in_schema: vec![
609                (
610                    winning_bids.clone(),
611                    "materialize".to_string(),
612                    "internal".to_string(),
613                ),
614                (
615                    flip_activities.clone(),
616                    "materialize".to_string(),
617                    "public".to_string(),
618                ),
619                (
620                    flippers.clone(),
621                    "materialize".to_string(),
622                    "public".to_string(),
623                ),
624            ],
625            depends_on: vec![
626                (flip_activities.clone(), winning_bids.clone()), // flip_activities depends on winning_bids
627                (flippers.clone(), flip_activities.clone()), // flippers depends on flip_activities
628            ],
629            stmt_uses_cluster: vec![(winning_bids.clone(), "staging".into())],
630            index_uses_cluster: vec![
631                (
632                    winning_bids.clone(),
633                    "idx_item".to_string(),
634                    "quickstart".into(),
635                ),
636                (
637                    flip_activities.clone(),
638                    "idx_flipper".to_string(),
639                    "quickstart".into(),
640                ),
641            ],
642            is_sink: BTreeSet::new(),
643            is_replacement: BTreeSet::new(),
644        };
645
646        let mut changed_stmts = BTreeSet::new();
647        changed_stmts.insert(winning_bids.clone());
648
649        let (dirty_stmts, dirty_clusters, dirty_schemas) =
650            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
651
652        // winning_bids should be dirty (changed)
653        assert!(
654            dirty_stmts.contains(&winning_bids),
655            "winning_bids should be dirty"
656        );
657
658        // materialize.internal schema should be dirty
659        assert!(
660            dirty_schemas.contains(&SchemaQualifier::new(
661                "materialize".to_string(),
662                "internal".to_string()
663            )),
664            "materialize.internal schema should be dirty"
665        );
666
667        // quickstart cluster should be dirty (winning_bids has index on it)
668        assert!(
669            dirty_clusters.iter().any(|c| c.name == "quickstart"),
670            "quickstart cluster should be dirty"
671        );
672
673        // flip_activities should be dirty (depends on winning_bids)
674        assert!(
675            dirty_stmts.contains(&flip_activities),
676            "flip_activities should be dirty - depends on winning_bids"
677        );
678
679        // CRITICAL: materialize.public schema should be dirty
680        assert!(
681            dirty_schemas.contains(&SchemaQualifier::new(
682                "materialize".to_string(),
683                "public".to_string()
684            )),
685            "materialize.public schema should be dirty - flip_activities depends on winning_bids"
686        );
687
688        // flippers should be dirty (materialize.public schema is dirty)
689        assert!(
690            dirty_stmts.contains(&flippers),
691            "flippers should be dirty - its schema (materialize.public) is dirty"
692        );
693    }
694
695    #[mz_ore::test]
696    fn test_index_cluster_does_not_cause_unnecessary_redeployment() {
697        // Real-world scenario from auction_house project
698
699        let foo_b = ObjectId::new(
700            "materialize".to_string(),
701            "foo".to_string(),
702            "b".to_string(),
703        );
704        let winning_bids = ObjectId::new(
705            "materialize".to_string(),
706            "internal".to_string(),
707            "winning_bids".to_string(),
708        );
709        let flip_activities = ObjectId::new(
710            "materialize".to_string(),
711            "public".to_string(),
712            "flip_activities".to_string(),
713        );
714
715        let base_facts = BaseFacts {
716            object_in_schema: vec![
717                (foo_b.clone(), "materialize".to_string(), "foo".to_string()),
718                (
719                    winning_bids.clone(),
720                    "materialize".to_string(),
721                    "internal".to_string(),
722                ),
723                (
724                    flip_activities.clone(),
725                    "materialize".to_string(),
726                    "public".to_string(),
727                ),
728            ],
729            depends_on: vec![(flip_activities.clone(), winning_bids.clone())],
730            // foo.b has default index in quickstart
731            // winning_bids has MV in staging, index in quickstart
732            stmt_uses_cluster: vec![(winning_bids.clone(), "staging".into())],
733            index_uses_cluster: vec![
734                (
735                    foo_b.clone(),
736                    "default_idx".to_string(),
737                    "quickstart".into(),
738                ),
739                (
740                    winning_bids.clone(),
741                    "idx1".to_string(),
742                    "quickstart".into(),
743                ),
744            ],
745            is_sink: BTreeSet::new(),
746            is_replacement: BTreeSet::new(),
747        };
748
749        let mut changed_stmts = BTreeSet::new();
750        changed_stmts.insert(foo_b.clone());
751
752        let (dirty_stmts, dirty_clusters, dirty_schemas) =
753            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
754
755        // Only foo.b should be dirty
756        assert!(dirty_stmts.contains(&foo_b), "foo.b should be dirty");
757        assert_eq!(
758            dirty_stmts.len(),
759            1,
760            "only foo.b should be dirty, got: {:?}",
761            dirty_stmts
762        );
763
764        // materialize.foo schema should be dirty
765        assert!(
766            dirty_schemas.contains(&SchemaQualifier::new(
767                "materialize".to_string(),
768                "foo".to_string()
769            )),
770            "materialize.foo schema should be dirty"
771        );
772
773        // quickstart cluster should be dirty (foo.b has index on it)
774        assert!(
775            dirty_clusters.iter().any(|c| c.name == "quickstart"),
776            "quickstart cluster should be dirty"
777        );
778
779        // staging cluster should NOT be dirty (no changed objects use it)
780        assert!(
781            !dirty_clusters.iter().any(|c| c.name == "staging"),
782            "staging cluster should NOT be dirty"
783        );
784
785        // materialize.internal schema should NOT be dirty
786        assert!(
787            !dirty_schemas.contains(&SchemaQualifier::new(
788                "materialize".to_string(),
789                "internal".to_string()
790            )),
791            "materialize.internal schema should NOT be dirty"
792        );
793
794        // winning_bids should NOT be dirty (even though it has index in quickstart)
795        assert!(
796            !dirty_stmts.contains(&winning_bids),
797            "winning_bids should NOT be dirty - index cluster doesn't cause redeployment"
798        );
799
800        // flip_activities should NOT be dirty (winning_bids isn't dirty)
801        assert!(
802            !dirty_stmts.contains(&flip_activities),
803            "flip_activities should NOT be dirty - winning_bids isn't dirty"
804        );
805    }
806
807    #[mz_ore::test]
808    fn test_replacement_mv_dirties_its_schema() {
809        // A changed replacement MV dirties its schema, which redeploys
810        // atomically: every other object in the schema is pulled in too.
811
812        let mv1 = ObjectId::new("db".to_string(), "analytics".to_string(), "mv1".to_string());
813        let mv2 = ObjectId::new("db".to_string(), "analytics".to_string(), "mv2".to_string());
814        let view1 = ObjectId::new(
815            "db".to_string(),
816            "analytics".to_string(),
817            "view1".to_string(),
818        );
819
820        let mut is_replacement = BTreeSet::new();
821        is_replacement.insert(mv1.clone());
822        is_replacement.insert(mv2.clone());
823        // view1 is NOT a replacement (mixed schema scenario)
824
825        let base_facts = BaseFacts {
826            object_in_schema: vec![
827                (mv1.clone(), "db".to_string(), "analytics".to_string()),
828                (mv2.clone(), "db".to_string(), "analytics".to_string()),
829                (view1.clone(), "db".to_string(), "analytics".to_string()),
830            ],
831            depends_on: vec![],
832            stmt_uses_cluster: vec![],
833            index_uses_cluster: vec![],
834            is_sink: BTreeSet::new(),
835            is_replacement,
836        };
837
838        // Only mv1 changed
839        let mut changed_stmts = BTreeSet::new();
840        changed_stmts.insert(mv1.clone());
841
842        let (dirty_stmts, _dirty_clusters, dirty_schemas) =
843            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
844
845        // mv1 should be dirty (it changed)
846        assert!(dirty_stmts.contains(&mv1), "mv1 should be dirty");
847
848        // Schema should be dirty - a dirty replacement MV dirties its schema
849        assert!(
850            dirty_schemas.contains(&SchemaQualifier::new(
851                "db".to_string(),
852                "analytics".to_string()
853            )),
854            "analytics schema should be dirty - dirty replacement MV dirties its schema"
855        );
856
857        // mv2 should be dirty (schema atomicity pulls in the sibling replacement MV)
858        assert!(
859            dirty_stmts.contains(&mv2),
860            "mv2 should be dirty - schema redeploys atomically"
861        );
862
863        // view1 should be dirty too (schema atomicity)
864        assert!(
865            dirty_stmts.contains(&view1),
866            "view1 should be dirty - schema redeploys atomically"
867        );
868    }
869
870    #[mz_ore::test]
871    fn test_replacement_mv_does_dirty_its_cluster() {
872        // Unlike sinks, replacement MVs DO make their clusters dirty.
873
874        let mv1 = ObjectId::new("db".to_string(), "analytics".to_string(), "mv1".to_string());
875
876        let mut is_replacement = BTreeSet::new();
877        is_replacement.insert(mv1.clone());
878
879        let base_facts = BaseFacts {
880            object_in_schema: vec![(mv1.clone(), "db".to_string(), "analytics".to_string())],
881            depends_on: vec![],
882            stmt_uses_cluster: vec![(mv1.clone(), "analytics_cluster".into())],
883            index_uses_cluster: vec![],
884            is_sink: BTreeSet::new(),
885            is_replacement,
886        };
887
888        let mut changed_stmts = BTreeSet::new();
889        changed_stmts.insert(mv1.clone());
890
891        let (_dirty_stmts, dirty_clusters, _dirty_schemas) =
892            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
893
894        assert!(
895            dirty_clusters.iter().any(|c| c.name == "analytics_cluster"),
896            "analytics_cluster should be dirty - replacement MVs DO dirty clusters"
897        );
898    }
899
900    #[mz_ore::test]
901    fn test_replacement_mv_pulled_in_by_dirty_schema() {
902        // When a non-replacement object makes a schema dirty, replacement MVs in
903        // that schema are pulled in too: the schema redeploys atomically (Rule 6).
904
905        let regular = ObjectId::new(
906            "db".to_string(),
907            "analytics".to_string(),
908            "regular".to_string(),
909        );
910        let other = ObjectId::new(
911            "db".to_string(),
912            "analytics".to_string(),
913            "other".to_string(),
914        );
915        let replacement_mv = ObjectId::new(
916            "db".to_string(),
917            "analytics".to_string(),
918            "my_mv".to_string(),
919        );
920
921        let mut is_replacement = BTreeSet::new();
922        is_replacement.insert(replacement_mv.clone());
923
924        let base_facts = BaseFacts {
925            object_in_schema: vec![
926                (regular.clone(), "db".to_string(), "analytics".to_string()),
927                (other.clone(), "db".to_string(), "analytics".to_string()),
928                (
929                    replacement_mv.clone(),
930                    "db".to_string(),
931                    "analytics".to_string(),
932                ),
933            ],
934            depends_on: vec![],
935            stmt_uses_cluster: vec![],
936            index_uses_cluster: vec![],
937            is_sink: BTreeSet::new(),
938            is_replacement,
939        };
940
941        // regular object changed -> schema dirty -> other pulled in
942        let mut changed_stmts = BTreeSet::new();
943        changed_stmts.insert(regular.clone());
944
945        let (dirty_stmts, _dirty_clusters, dirty_schemas) =
946            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
947
948        // Schema should be dirty (regular object changed)
949        assert!(
950            dirty_schemas.contains(&SchemaQualifier::new(
951                "db".to_string(),
952                "analytics".to_string()
953            )),
954            "schema should be dirty from regular object change"
955        );
956
957        // other (non-replacement) should be pulled in via schema propagation
958        assert!(
959            dirty_stmts.contains(&other),
960            "other should be dirty via schema propagation"
961        );
962
963        // replacement_mv should be pulled in too - the schema redeploys atomically
964        assert!(
965            dirty_stmts.contains(&replacement_mv),
966            "replacement MV should be pulled in by dirty schema"
967        );
968    }
969
970    #[mz_ore::test]
971    fn test_replacement_mv_dirty_via_dependency() {
972        // Replacement MVs should still become dirty if they depend on
973        // another dirty object (Rule 4: DependsOn propagation).
974
975        let upstream = ObjectId::new(
976            "db".to_string(),
977            "public".to_string(),
978            "source_view".to_string(),
979        );
980        let replacement_mv = ObjectId::new(
981            "db".to_string(),
982            "analytics".to_string(),
983            "my_mv".to_string(),
984        );
985
986        let mut is_replacement = BTreeSet::new();
987        is_replacement.insert(replacement_mv.clone());
988
989        let base_facts = BaseFacts {
990            object_in_schema: vec![
991                (upstream.clone(), "db".to_string(), "public".to_string()),
992                (
993                    replacement_mv.clone(),
994                    "db".to_string(),
995                    "analytics".to_string(),
996                ),
997            ],
998            depends_on: vec![(replacement_mv.clone(), upstream.clone())],
999            stmt_uses_cluster: vec![],
1000            index_uses_cluster: vec![],
1001            is_sink: BTreeSet::new(),
1002            is_replacement,
1003        };
1004
1005        let mut changed_stmts = BTreeSet::new();
1006        changed_stmts.insert(upstream.clone());
1007
1008        let (dirty_stmts, _dirty_clusters, dirty_schemas) =
1009            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1010
1011        // replacement_mv should be dirty via dependency (the skip is on the
1012        // parent, not the child)
1013        assert!(
1014            dirty_stmts.contains(&replacement_mv),
1015            "replacement MV should be dirty - depends on changed upstream"
1016        );
1017
1018        // The now-dirty replacement MV dirties its own schema like any other
1019        // compute object.
1020        assert!(
1021            dirty_schemas.contains(&SchemaQualifier::new(
1022                "db".to_string(),
1023                "analytics".to_string()
1024            )),
1025            "analytics schema should be dirty - dirty replacement MV dirties its schema"
1026        );
1027    }
1028
1029    #[mz_ore::test]
1030    fn test_replacement_mv_dirty_via_cluster() {
1031        // When a cluster becomes dirty, replacement MVs on that cluster
1032        // should become dirty (Rule 3: StmtUsesCluster propagation).
1033
1034        let regular = ObjectId::new(
1035            "db".to_string(),
1036            "public".to_string(),
1037            "regular_mv".to_string(),
1038        );
1039        let replacement_mv = ObjectId::new(
1040            "db".to_string(),
1041            "analytics".to_string(),
1042            "my_mv".to_string(),
1043        );
1044
1045        let mut is_replacement = BTreeSet::new();
1046        is_replacement.insert(replacement_mv.clone());
1047
1048        let base_facts = BaseFacts {
1049            object_in_schema: vec![
1050                (regular.clone(), "db".to_string(), "public".to_string()),
1051                (
1052                    replacement_mv.clone(),
1053                    "db".to_string(),
1054                    "analytics".to_string(),
1055                ),
1056            ],
1057            depends_on: vec![],
1058            stmt_uses_cluster: vec![
1059                (regular.clone(), "shared_cluster".into()),
1060                (replacement_mv.clone(), "shared_cluster".into()),
1061            ],
1062            index_uses_cluster: vec![],
1063            is_sink: BTreeSet::new(),
1064            is_replacement,
1065        };
1066
1067        // regular object changes -> shared_cluster dirty -> replacement_mv dirty
1068        let mut changed_stmts = BTreeSet::new();
1069        changed_stmts.insert(regular.clone());
1070
1071        let (dirty_stmts, dirty_clusters, dirty_schemas) =
1072            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1073
1074        assert!(
1075            dirty_clusters.iter().any(|c| c.name == "shared_cluster"),
1076            "shared_cluster should be dirty"
1077        );
1078
1079        assert!(
1080            dirty_stmts.contains(&replacement_mv),
1081            "replacement MV should be dirty - its cluster is dirty"
1082        );
1083
1084        // analytics schema is dirty - the dirty replacement MV dirties its schema
1085        assert!(
1086            dirty_schemas.contains(&SchemaQualifier::new(
1087                "db".to_string(),
1088                "analytics".to_string()
1089            )),
1090            "analytics schema should be dirty - its replacement MV is dirty"
1091        );
1092    }
1093
1094    #[mz_ore::test]
1095    fn test_dependent_of_dirty_replacement_mv_not_dirtied() {
1096        // The one special property of a replacement MV: its dirtiness does NOT
1097        // propagate downstream to dependents in other schemas (Rule 4 skips a
1098        // replacement parent).
1099
1100        let replacement_mv = ObjectId::new(
1101            "db".to_string(),
1102            "analytics".to_string(),
1103            "my_mv".to_string(),
1104        );
1105        let downstream = ObjectId::new(
1106            "db".to_string(),
1107            "consumer".to_string(),
1108            "report".to_string(),
1109        );
1110
1111        let mut is_replacement = BTreeSet::new();
1112        is_replacement.insert(replacement_mv.clone());
1113
1114        let base_facts = BaseFacts {
1115            object_in_schema: vec![
1116                (
1117                    replacement_mv.clone(),
1118                    "db".to_string(),
1119                    "analytics".to_string(),
1120                ),
1121                (downstream.clone(), "db".to_string(), "consumer".to_string()),
1122            ],
1123            depends_on: vec![(downstream.clone(), replacement_mv.clone())],
1124            stmt_uses_cluster: vec![],
1125            index_uses_cluster: vec![],
1126            is_sink: BTreeSet::new(),
1127            is_replacement,
1128        };
1129
1130        // The replacement MV itself changed
1131        let mut changed_stmts = BTreeSet::new();
1132        changed_stmts.insert(replacement_mv.clone());
1133
1134        let (dirty_stmts, _dirty_clusters, dirty_schemas) =
1135            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1136
1137        // The replacement MV is dirty (it changed) and dirties its own schema
1138        assert!(
1139            dirty_stmts.contains(&replacement_mv),
1140            "replacement MV is dirty"
1141        );
1142        assert!(
1143            dirty_schemas.contains(&SchemaQualifier::new(
1144                "db".to_string(),
1145                "analytics".to_string()
1146            )),
1147            "analytics schema should be dirty"
1148        );
1149
1150        // But the downstream dependent in another schema is NOT pulled in
1151        assert!(
1152            !dirty_stmts.contains(&downstream),
1153            "downstream dependent should NOT be dirty - replacement MV does not fan out"
1154        );
1155        assert!(
1156            !dirty_schemas.contains(&SchemaQualifier::new(
1157                "db".to_string(),
1158                "consumer".to_string()
1159            )),
1160            "consumer schema should NOT be dirty"
1161        );
1162    }
1163
1164    #[mz_ore::test]
1165    fn test_mixed_replacement_and_regular_in_shared_cluster() {
1166        // Real-world scenario: one cluster runs both a replacement schema (MVs)
1167        // and a regular schema (views + indexes).
1168
1169        let regular_view = ObjectId::new(
1170            "db".to_string(),
1171            "public".to_string(),
1172            "my_view".to_string(),
1173        );
1174        let regular_view2 = ObjectId::new(
1175            "db".to_string(),
1176            "public".to_string(),
1177            "other_view".to_string(),
1178        );
1179        let replacement_mv1 = ObjectId::new(
1180            "db".to_string(),
1181            "analytics".to_string(),
1182            "mv_alpha".to_string(),
1183        );
1184        let replacement_mv2 = ObjectId::new(
1185            "db".to_string(),
1186            "analytics".to_string(),
1187            "mv_beta".to_string(),
1188        );
1189
1190        let mut is_replacement = BTreeSet::new();
1191        is_replacement.insert(replacement_mv1.clone());
1192        is_replacement.insert(replacement_mv2.clone());
1193
1194        let base_facts = BaseFacts {
1195            object_in_schema: vec![
1196                (regular_view.clone(), "db".to_string(), "public".to_string()),
1197                (
1198                    regular_view2.clone(),
1199                    "db".to_string(),
1200                    "public".to_string(),
1201                ),
1202                (
1203                    replacement_mv1.clone(),
1204                    "db".to_string(),
1205                    "analytics".to_string(),
1206                ),
1207                (
1208                    replacement_mv2.clone(),
1209                    "db".to_string(),
1210                    "analytics".to_string(),
1211                ),
1212            ],
1213            depends_on: vec![],
1214            stmt_uses_cluster: vec![
1215                (regular_view.clone(), "shared".into()),
1216                (replacement_mv1.clone(), "shared".into()),
1217                (replacement_mv2.clone(), "shared".into()),
1218            ],
1219            index_uses_cluster: vec![],
1220            is_sink: BTreeSet::new(),
1221            is_replacement,
1222        };
1223
1224        let mut changed_stmts = BTreeSet::new();
1225        changed_stmts.insert(regular_view.clone());
1226
1227        let (dirty_stmts, dirty_clusters, dirty_schemas) =
1228            compute_dirty_datalog(&changed_stmts, &base_facts, &BTreeSet::new());
1229
1230        // Cluster dirty
1231        assert!(dirty_clusters.iter().any(|c| c.name == "shared"));
1232
1233        // public schema dirty (regular object changed)
1234        assert!(dirty_schemas.contains(&SchemaQualifier::new(
1235            "db".to_string(),
1236            "public".to_string()
1237        )));
1238
1239        // regular_view2 pulled in via schema propagation
1240        assert!(dirty_stmts.contains(&regular_view2));
1241
1242        // Both replacement MVs dirty via cluster
1243        assert!(
1244            dirty_stmts.contains(&replacement_mv1),
1245            "replacement_mv1 should be dirty via cluster"
1246        );
1247        assert!(
1248            dirty_stmts.contains(&replacement_mv2),
1249            "replacement_mv2 should be dirty via cluster"
1250        );
1251
1252        // analytics schema should be dirty - its replacement MVs went dirty via
1253        // the shared cluster, and a dirty replacement MV dirties its schema
1254        assert!(
1255            dirty_schemas.contains(&SchemaQualifier::new(
1256                "db".to_string(),
1257                "analytics".to_string()
1258            )),
1259            "analytics schema should be dirty - its replacement MVs are dirty"
1260        );
1261    }
1262
1263    /// Helper to parse a CREATE MATERIALIZED VIEW statement.
1264    fn parse_materialized_view(sql: &str) -> Statement {
1265        let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1266        if let mz_sql_parser::ast::Statement::CreateMaterializedView(s) = &parsed[0].ast {
1267            Statement::CreateMaterializedView(s.clone())
1268        } else {
1269            panic!("Expected CreateMaterializedView");
1270        }
1271    }
1272
1273    /// Build a minimal Project containing a single schema with given objects.
1274    fn build_project(
1275        db: &str,
1276        schema: &str,
1277        objects: Vec<(ObjectId, Statement)>,
1278        is_replacement: bool,
1279    ) -> Project {
1280        let db_objects: Vec<DatabaseObject> = objects
1281            .into_iter()
1282            .map(|(id, stmt)| DatabaseObject {
1283                id,
1284                typed_object: compiled::DatabaseObject {
1285                    path: std::path::PathBuf::from("test.sql"),
1286                    stmt,
1287                    indexes: vec![],
1288                    grants: vec![],
1289                    comments: vec![],
1290                    tests: vec![],
1291                },
1292                dependencies: BTreeSet::new(),
1293            })
1294            .collect();
1295
1296        let mut replacement_schemas = BTreeSet::new();
1297        if is_replacement {
1298            replacement_schemas.insert(SchemaQualifier::new(db.to_string(), schema.to_string()));
1299        }
1300
1301        Project {
1302            databases: vec![Database {
1303                name: db.to_string(),
1304                schemas: vec![Schema {
1305                    name: schema.to_string(),
1306                    objects: db_objects,
1307                    mod_statements: None,
1308                    schema_type: SchemaType::Compute,
1309                }],
1310                mod_statements: None,
1311            }],
1312            dependency_graph: BTreeMap::new(),
1313            external_dependencies: BTreeSet::new(),
1314            cluster_dependencies: BTreeSet::new(),
1315            tests: vec![],
1316            replacement_schemas,
1317            compile_dirty: BTreeSet::new(),
1318        }
1319    }
1320
1321    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1322    #[mz_ore::test]
1323    fn test_schema_transition_objects_to_replacement_routes_to_new_replacement() {
1324        // When a schema transitions from Objects kind to Replacement kind,
1325        // existing objects should go through blue/green swap (new_replacement_objects),
1326        // NOT CREATE REPLACEMENT (changed_replacement_objects).
1327
1328        let obj_id = ObjectId::new(
1329            "db".to_string(),
1330            "analytics".to_string(),
1331            "my_mv".to_string(),
1332        );
1333
1334        // Old snapshot: object existed in a schema with Objects kind
1335        let old_snapshot = DeploymentSnapshot {
1336            objects: BTreeMap::from([(obj_id.clone(), "hash_old".to_string())]),
1337            schemas: BTreeMap::from([(
1338                SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1339                DeploymentKind::Objects,
1340            )]),
1341        };
1342
1343        // New snapshot: object changed (different hash)
1344        let new_snapshot = DeploymentSnapshot {
1345            objects: BTreeMap::from([(obj_id.clone(), "hash_new".to_string())]),
1346            schemas: BTreeMap::from([(
1347                SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1348                DeploymentKind::Replacement,
1349            )]),
1350        };
1351
1352        // Project now treats the schema as replacement
1353        let project = build_project(
1354            "db",
1355            "analytics",
1356            vec![(
1357                obj_id.clone(),
1358                parse_materialized_view("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c1 AS SELECT 1"),
1359            )],
1360            true, // is_replacement
1361        );
1362
1363        let changeset = ChangeSet::from_deployment_snapshot_comparison(
1364            &old_snapshot,
1365            &new_snapshot,
1366            &project,
1367            &BTreeSet::new(),
1368        );
1369
1370        // Object should be in new_replacement_objects (blue/green), NOT changed_replacement_objects
1371        assert!(
1372            changeset.new_replacement_objects.contains(&obj_id),
1373            "Object transitioning from Objects->Replacement schema should use blue/green swap"
1374        );
1375        assert!(
1376            !changeset.changed_replacement_objects.contains(&obj_id),
1377            "Object transitioning from Objects->Replacement schema should NOT use CREATE REPLACEMENT"
1378        );
1379    }
1380
1381    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1382    #[mz_ore::test]
1383    fn test_forced_replacement_schema_routes_to_changed_replacement() {
1384        // Forcing an *unchanged* replacement schema (stage --redeploy-schema)
1385        // redeploys its MV through the CREATE REPLACEMENT path
1386        // (changed_replacement_objects), even though the hash is identical.
1387        let obj_id = ObjectId::new(
1388            "db".to_string(),
1389            "analytics".to_string(),
1390            "my_mv".to_string(),
1391        );
1392
1393        // Identical old/new snapshot: nothing changed; schema is Replacement.
1394        let snapshot = DeploymentSnapshot {
1395            objects: BTreeMap::from([(obj_id.clone(), "hash".to_string())]),
1396            schemas: BTreeMap::from([(
1397                SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1398                DeploymentKind::Replacement,
1399            )]),
1400        };
1401
1402        let project = build_project(
1403            "db",
1404            "analytics",
1405            vec![(
1406                obj_id.clone(),
1407                parse_materialized_view("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c1 AS SELECT 1"),
1408            )],
1409            true,
1410        );
1411
1412        // Without forcing, an unchanged schema is a no-op.
1413        let unforced = ChangeSet::from_deployment_snapshot_comparison(
1414            &snapshot,
1415            &snapshot,
1416            &project,
1417            &BTreeSet::new(),
1418        );
1419        assert!(
1420            unforced.is_empty(),
1421            "unchanged replacement schema should be a no-op"
1422        );
1423
1424        // Forcing the schema redeploys the MV via CREATE REPLACEMENT.
1425        let forced = BTreeSet::from([SchemaQualifier::new(
1426            "db".to_string(),
1427            "analytics".to_string(),
1428        )]);
1429        let changeset =
1430            ChangeSet::from_deployment_snapshot_comparison(&snapshot, &snapshot, &project, &forced);
1431        assert!(changeset.objects_to_deploy.contains(&obj_id));
1432        assert!(
1433            changeset.changed_replacement_objects.contains(&obj_id),
1434            "forced unchanged replacement MV should redeploy via CREATE REPLACEMENT"
1435        );
1436        assert!(!changeset.new_replacement_objects.contains(&obj_id));
1437    }
1438
1439    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1440    #[mz_ore::test]
1441    fn test_steady_state_replacement_routes_to_changed_replacement() {
1442        // When a schema was already Replacement kind and stays Replacement,
1443        // existing objects should use CREATE REPLACEMENT (changed_replacement_objects).
1444
1445        let obj_id = ObjectId::new(
1446            "db".to_string(),
1447            "analytics".to_string(),
1448            "my_mv".to_string(),
1449        );
1450
1451        // Old snapshot: object existed in a schema already with Replacement kind
1452        let old_snapshot = DeploymentSnapshot {
1453            objects: BTreeMap::from([(obj_id.clone(), "hash_old".to_string())]),
1454            schemas: BTreeMap::from([(
1455                SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1456                DeploymentKind::Replacement,
1457            )]),
1458        };
1459
1460        // New snapshot: object changed
1461        let new_snapshot = DeploymentSnapshot {
1462            objects: BTreeMap::from([(obj_id.clone(), "hash_new".to_string())]),
1463            schemas: BTreeMap::from([(
1464                SchemaQualifier::new("db".to_string(), "analytics".to_string()),
1465                DeploymentKind::Replacement,
1466            )]),
1467        };
1468
1469        let project = build_project(
1470            "db",
1471            "analytics",
1472            vec![(
1473                obj_id.clone(),
1474                parse_materialized_view("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c1 AS SELECT 1"),
1475            )],
1476            true,
1477        );
1478
1479        let changeset = ChangeSet::from_deployment_snapshot_comparison(
1480            &old_snapshot,
1481            &new_snapshot,
1482            &project,
1483            &BTreeSet::new(),
1484        );
1485
1486        // Object should be in changed_replacement_objects (CREATE REPLACEMENT)
1487        assert!(
1488            changeset.changed_replacement_objects.contains(&obj_id),
1489            "Object in steady-state Replacement schema should use CREATE REPLACEMENT"
1490        );
1491        assert!(
1492            !changeset.new_replacement_objects.contains(&obj_id),
1493            "Object in steady-state Replacement schema should NOT use blue/green swap"
1494        );
1495    }
1496}