Skip to main content

mz_deploy/cli/commands/
stage.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Stage command - deploy to staging environment with renamed schemas and clusters.
11//!
12//! Runs the standard blue/green deployment pipeline that can later be promoted
13//! to production via [`super::promote`].
14
15use super::ObjectRef;
16use crate::cli::CliError;
17use crate::cli::executor::{self, DeploymentExecutor};
18use crate::cli::{git, progress};
19use crate::client::DeploymentMode;
20use crate::client::{
21    Client, ClusterConfig, ClusterOptions, DeploymentKind, PendingStatement, ReplacementMvRecord,
22};
23use crate::config::Settings;
24use crate::log;
25use crate::project::SchemaQualifier;
26use crate::project::analysis::changeset::ChangeSet;
27use crate::project::analysis::deployment_snapshot::{self, DeploymentSnapshot};
28use crate::project::analysis::deps::extract_external_indexes;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::{DatabaseObject, FullyQualifiedName};
31use crate::project::ir::graph::Project;
32use crate::project::ir::object_id::ObjectId;
33use crate::project::resolve::normalize::{self, NormalizingVisitor};
34use crate::verbose;
35use mz_ore::option::OptionExt;
36use std::collections::BTreeSet;
37use std::fmt;
38use std::path::Path;
39use std::time::Instant;
40
41/// Planning output produced once and consumed by all stage execution phases.
42///
43/// Keeps stage deterministic by passing one analyzed view of objects/resources through
44/// validation, metadata recording, and resource creation.
45struct StageAnalysis<'a> {
46    objects: Vec<ObjectRef<'a>>,
47    sinks: Vec<ObjectRef<'a>>,
48    replacement_mvs: Vec<ObjectRef<'a>>,
49    schema_set: BTreeSet<SchemaQualifier>,
50    cluster_set: BTreeSet<String>,
51}
52
53/// Classification result for objects considered during staging.
54///
55/// Separates deploy-now objects from deferred/special-case categories that apply handles later.
56struct PartitionedObjects<'a> {
57    objects: Vec<ObjectRef<'a>>,
58    sinks: Vec<ObjectRef<'a>>,
59    replacement_mvs: Vec<ObjectRef<'a>>,
60    table_count: usize,
61}
62
63/// Summary returned after a successful stage run, used for terminal output
64/// and `--json`.
65#[derive(serde::Serialize)]
66struct StageResult {
67    deploy_id: String,
68    objects_deployed: usize,
69    #[serde(skip)]
70    duration: std::time::Duration,
71}
72
73impl fmt::Display for StageResult {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        write!(
76            f,
77            "  \u{2713} Successfully deployed {} objects to '{}' staging environment ({:.1}s)",
78            self.objects_deployed,
79            self.deploy_id,
80            self.duration.as_secs_f64()
81        )
82    }
83}
84
85#[derive(serde::Serialize)]
86struct StagePlan {
87    deploy_id: String,
88    schemas: Vec<StagePlanSchema>,
89    clusters: Vec<StagePlanCluster>,
90    objects: Vec<StagePlanObject>,
91    sinks: Vec<StagePlanObject>,
92    replacement_mvs: Vec<StagePlanObject>,
93}
94
95#[derive(serde::Serialize)]
96struct StagePlanSchema {
97    database: String,
98    schema: String,
99    staging_schema: String,
100}
101
102#[derive(serde::Serialize)]
103struct StagePlanCluster {
104    production_cluster: String,
105    staging_cluster: String,
106}
107
108#[derive(serde::Serialize)]
109struct StagePlanObject {
110    database: String,
111    schema: String,
112    object: String,
113}
114
115impl fmt::Display for StagePlan {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        writeln!(f, "Stage plan for '{}':", self.deploy_id)?;
118
119        if !self.schemas.is_empty() {
120            writeln!(f, "\nSchemas ({}):", self.schemas.len())?;
121            for s in &self.schemas {
122                writeln!(
123                    f,
124                    "    {}.{} \u{2192} {}",
125                    s.database, s.schema, s.staging_schema
126                )?;
127            }
128        }
129
130        if !self.clusters.is_empty() {
131            writeln!(f, "\nClusters ({}):", self.clusters.len())?;
132            for c in &self.clusters {
133                writeln!(
134                    f,
135                    "    {} \u{2192} {}",
136                    c.production_cluster, c.staging_cluster
137                )?;
138            }
139        }
140
141        if !self.objects.is_empty() {
142            writeln!(f, "\nObjects ({}):", self.objects.len())?;
143            for o in &self.objects {
144                writeln!(f, "    {}.{}.{}", o.database, o.schema, o.object)?;
145            }
146        }
147
148        if !self.sinks.is_empty() {
149            writeln!(f, "\nSinks ({}):", self.sinks.len())?;
150            for s in &self.sinks {
151                writeln!(f, "    {}.{}.{}", s.database, s.schema, s.object)?;
152            }
153        }
154
155        if !self.replacement_mvs.is_empty() {
156            writeln!(f, "\nReplacement MVs ({}):", self.replacement_mvs.len())?;
157            for m in &self.replacement_mvs {
158                writeln!(f, "    {}.{}.{}", m.database, m.schema, m.object)?;
159            }
160        }
161
162        Ok(())
163    }
164}
165
166/// Deploy the project to a staging environment.
167///
168/// Creates renamed schemas and clusters alongside production, deploys the
169/// project onto them, and records metadata so a later `apply` can atomically
170/// swap staging into production.
171///
172/// # Arguments
173/// * `settings` - Resolved CLI settings (project directory, profile, etc.)
174/// * `stage_name` - Optional environment name; defaults to a git-derived or
175///   random identifier
176/// * `allow_dirty` - Allow deploying with uncommitted changes
177/// * `no_rollback` - Skip automatic rollback on failure (for debugging)
178/// * `dry_run` - Print SQL instead of executing it
179///
180/// # Returns
181/// `Ok(())` if the deployment succeeds.
182///
183/// # Errors
184/// Surfaces `CliError` variants from git checks, project compilation, and
185/// database execution.
186pub async fn run(
187    settings: &Settings,
188    stage_name: Option<&str>,
189    allow_dirty: bool,
190    no_rollback: bool,
191    dry_run: bool,
192    redeploy_schemas: &[String],
193    redeploy_all: bool,
194) -> Result<(), CliError> {
195    let profile = settings.connection();
196    let directory = &settings.directory;
197    let start_time = Instant::now();
198
199    if !allow_dirty && git::is_dirty(directory) {
200        return Err(CliError::GitDirty);
201    }
202
203    let stage_name = stage_name
204        .owned()
205        .or_else(|| git::get_git_commit(directory).map(|sha| sha.chars().take(7).collect()))
206        .unwrap_or_else(executor::generate_random_env_name);
207
208    let planned_project = super::compile::run(settings, true).await?;
209    let staging_suffix = format!("_{}", stage_name);
210
211    let client = Client::connect_with_profile(profile.clone())
212        .await
213        .map_err(CliError::Connection)?;
214
215    crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
216    let role =
217        crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
218    crate::cli::commands::setup::require_deployer(role)?;
219
220    let forced_dirty_schemas = resolve_redeploy_schemas(&planned_project, redeploy_schemas)?;
221
222    let Some(analysis) = analyze_project_changes(
223        &client,
224        &planned_project,
225        &stage_name,
226        forced_dirty_schemas,
227        redeploy_all,
228    )
229    .await?
230    else {
231        return Ok(());
232    };
233
234    validate_project_for_stage(
235        &client,
236        &planned_project,
237        directory,
238        &analysis.schema_set,
239        &analysis.cluster_set,
240    )
241    .await?;
242
243    if !dry_run {
244        record_stage_metadata(
245            &client,
246            directory,
247            &stage_name,
248            &staging_suffix,
249            &analysis.objects,
250            &analysis.sinks,
251            &analysis.replacement_mvs,
252            &planned_project.replacement_schemas,
253        )
254        .await?;
255    }
256
257    if dry_run {
258        let plan = StagePlan {
259            deploy_id: stage_name.to_string(),
260            schemas: analysis
261                .schema_set
262                .iter()
263                .map(|sq| StagePlanSchema {
264                    database: sq.database.clone(),
265                    schema: sq.schema.clone(),
266                    staging_schema: format!("{}{}", sq.schema, staging_suffix),
267                })
268                .collect(),
269            clusters: analysis
270                .cluster_set
271                .iter()
272                .map(|c| StagePlanCluster {
273                    production_cluster: c.clone(),
274                    staging_cluster: format!("{}{}", c, staging_suffix),
275                })
276                .collect(),
277            objects: analysis
278                .objects
279                .iter()
280                .map(|(id, _)| StagePlanObject {
281                    database: id.expect_database().to_string(),
282                    schema: id.schema().to_string(),
283                    object: id.object().to_string(),
284                })
285                .collect(),
286            sinks: analysis
287                .sinks
288                .iter()
289                .map(|(id, _)| StagePlanObject {
290                    database: id.expect_database().to_string(),
291                    schema: id.schema().to_string(),
292                    object: id.object().to_string(),
293                })
294                .collect(),
295            replacement_mvs: analysis
296                .replacement_mvs
297                .iter()
298                .map(|(id, _)| StagePlanObject {
299                    database: id.expect_database().to_string(),
300                    schema: id.schema().to_string(),
301                    object: id.object().to_string(),
302                })
303                .collect(),
304        };
305        log::output(&plan);
306        return Ok(());
307    }
308
309    let success_count = create_resources_with_rollback(
310        &client,
311        &stage_name,
312        &staging_suffix,
313        &analysis.schema_set,
314        &analysis.cluster_set,
315        &planned_project,
316        &analysis.objects,
317        &analysis.replacement_mvs,
318        no_rollback,
319        dry_run,
320    )
321    .await?;
322
323    let result = StageResult {
324        deploy_id: stage_name.to_string(),
325        objects_deployed: success_count,
326        duration: start_time.elapsed(),
327    };
328    log::output(&result);
329    log::print_deploy_id(&stage_name);
330    Ok(())
331}
332
333/// Parse one `--redeploy-schema` value, which must be fully qualified as
334/// `database.schema`. Reuses the SQL parser so reserved-word components quote
335/// correctly.
336fn parse_qualified_schema(raw: &str) -> Result<SchemaQualifier, CliError> {
337    let unqualified = || {
338        CliError::Message(format!(
339            "invalid --redeploy-schema '{}': expected a qualified 'database.schema' name",
340            raw
341        ))
342    };
343    let name = mz_sql_parser::parser::parse_item_name(raw).map_err(|_| unqualified())?;
344    let [database, schema] = name.0.as_slice() else {
345        return Err(unqualified());
346    };
347    Ok(SchemaQualifier::new(
348        database.as_str().to_string(),
349        schema.as_str().to_string(),
350    ))
351}
352
353/// Resolve `--redeploy-schema` values into `SchemaQualifier`s, validating each
354/// against the project's schemas.
355fn resolve_redeploy_schemas(
356    planned_project: &Project,
357    redeploy_schemas: &[String],
358) -> Result<BTreeSet<SchemaQualifier>, CliError> {
359    if redeploy_schemas.is_empty() {
360        return Ok(BTreeSet::new());
361    }
362
363    let objects: Vec<_> = planned_project.iter_objects().collect();
364    let project_schemas = SchemaQualifier::collect_from(&objects);
365
366    let mut resolved = BTreeSet::new();
367    for raw in redeploy_schemas {
368        let sq = parse_qualified_schema(raw)?;
369        if !project_schemas.contains(&sq) {
370            let available = project_schemas
371                .iter()
372                .map(|s| format!("{}.{}", s.database, s.schema))
373                .collect::<Vec<_>>()
374                .join(", ");
375            return Err(CliError::Message(format!(
376                "--redeploy-schema '{}.{}' is not a schema in this project; available: {}",
377                sq.database, sq.schema, available
378            )));
379        }
380        resolved.insert(sq);
381    }
382    Ok(resolved)
383}
384
385/// Produces the stage deployment plan by diffing against current production snapshot.
386///
387/// Handles incremental-vs-full mode, applies stage-specific object filtering,
388/// validates table dependencies, and returns resource sets required for execution.
389async fn analyze_project_changes<'a>(
390    client: &Client,
391    planned_project: &'a Project,
392    stage_name: &str,
393    forced_dirty_schemas: BTreeSet<SchemaQualifier>,
394    redeploy_all: bool,
395) -> Result<Option<StageAnalysis<'a>>, CliError> {
396    progress::stage_start("Analyzing project changes");
397    let analyze_start = Instant::now();
398
399    if client
400        .deployments()
401        .get_deployment_metadata(stage_name)
402        .await?
403        .is_some()
404    {
405        return Err(CliError::InvalidEnvironmentName {
406            name: format!("deployment '{}' already exists", stage_name),
407        });
408    }
409
410    let new_snapshot = deployment_snapshot::build_snapshot_from_planned(planned_project)?;
411    let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;
412
413    let dirty_schemas = if redeploy_all {
414        new_snapshot.schemas.keys().cloned().collect()
415    } else {
416        forced_dirty_schemas
417    };
418
419    let change_set = if production_snapshot.objects.is_empty() {
420        None
421    } else {
422        Some(ChangeSet::from_deployment_snapshot_comparison(
423            &production_snapshot,
424            &new_snapshot,
425            planned_project,
426            &dirty_schemas,
427        ))
428    };
429
430    // Reject adding brand-new objects to a schema that already has production objects.
431    // During incremental deployment:
432    //
433    //   1. The changeset correctly classifies these as `new_replacement_objects`
434    //   2. But `new_replacement_objects` is never consumed by `partition_objects` —
435    //      only `changed_replacement_objects` feeds into it
436    //   3. The new MV ends up in the regular `objects` partition and deploys to the
437    //      staging schema (e.g. `core_v3`)
438    //   4. Metadata for the production schema (`core`) gets overwritten to
439    //      `DeploymentKind::Replacement` by the changed MVs
440    //   5. During promote, the staging schema is skipped from swap and dropped CASCADE
441    //      — the new MV is lost
442    //
443    // The proper long-term fix is to support `ALTER MATERIALIZED VIEW ... SET SCHEMA`.
444    // With that, new MVs could be deployed to the staging schema alongside changed MVs,
445    // then moved into the production schema during promote via `SET SCHEMA` instead of
446    // relying on schema swap. This would eliminate the need for mixed deployment kinds
447    // or special-casing in `partition_objects` — new objects simply deploy to staging
448    // and get relocated on promote, just like changed objects get swapped.
449    //
450    // A brand-new stable schema (no prior production objects) deploys fine via normal
451    // blue-green swap — only schemas with existing production objects are affected.
452    if let Some(ref cs) = change_set {
453        validate_no_new_objects_in_existing_stable_schemas(cs, &production_snapshot)?;
454    }
455
456    let objects = select_stage_objects(planned_project, change_set.as_ref())?;
457    if objects.is_empty() && change_set.as_ref().is_some_and(ChangeSet::is_empty) {
458        progress::success("No changes detected compared to production, skipping deployment");
459        return Ok(None);
460    }
461
462    let replacement_object_ids = change_set
463        .as_ref()
464        .map(|cs| cs.changed_replacement_objects.clone())
465        .unwrap_or_default();
466    let partitioned = partition_objects(objects, &replacement_object_ids);
467    log_partition_summary(&partitioned);
468
469    let object_ids: BTreeSet<_> = partitioned
470        .objects
471        .iter()
472        .map(|(id, _)| id.clone())
473        .collect();
474    client
475        .validation()
476        .validate_table_dependencies(planned_project, &object_ids)
477        .await?;
478
479    let (schema_set, cluster_set) =
480        collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
481
482    let analyze_duration = analyze_start.elapsed();
483    progress::stage_success(
484        &format!(
485            "Ready to deploy {} view(s)/materialized view(s)",
486            partitioned.objects.len()
487        ),
488        analyze_duration,
489    );
490
491    Ok(Some(StageAnalysis {
492        objects: partitioned.objects,
493        sinks: partitioned.sinks,
494        replacement_mvs: partitioned.replacement_mvs,
495        schema_set,
496        cluster_set,
497    }))
498}
499
500/// Chooses the initial object set for stage before stage-specific partitioning.
501///
502/// Incremental mode uses the change set; full mode uses all sorted project objects.
503fn select_stage_objects<'a>(
504    planned_project: &'a Project,
505    change_set: Option<&ChangeSet>,
506) -> Result<Vec<ObjectRef<'a>>, CliError> {
507    if let Some(cs) = change_set {
508        if cs.is_empty() {
509            return Ok(Vec::new());
510        }
511        verbose!("{}", cs);
512        Ok(planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?)
513    } else {
514        verbose!("Full deployment: no production deployment found");
515        Ok(planned_project.get_sorted_objects()?)
516    }
517}
518
519/// Splits objects into stage execution categories.
520///
521/// Tables/sources are excluded, sinks are deferred to apply, and changed replacement MVs
522/// are tracked for special replacement handling.
523fn partition_objects<'a>(
524    objects: Vec<ObjectRef<'a>>,
525    replacement_object_ids: &BTreeSet<ObjectId>,
526) -> PartitionedObjects<'a> {
527    let mut kept = Vec::new();
528    let mut sinks = Vec::new();
529    let mut replacement_mvs = Vec::new();
530    let mut table_count = 0;
531
532    for (object_id, typed_obj) in objects {
533        match &typed_obj.stmt {
534            Statement::CreateTable(_)
535            | Statement::CreateTableFromSource(_)
536            | Statement::CreateSource(_)
537            | Statement::CreateSecret(_)
538            | Statement::CreateConnection(_) => {
539                table_count += 1;
540            }
541            Statement::CreateSink(_) => sinks.push((object_id, typed_obj)),
542            Statement::CreateMaterializedView(_) if replacement_object_ids.contains(&object_id) => {
543                replacement_mvs.push((object_id, typed_obj));
544            }
545            _ => kept.push((object_id, typed_obj)),
546        }
547    }
548
549    PartitionedObjects {
550        objects: kept,
551        sinks,
552        replacement_mvs,
553        table_count,
554    }
555}
556
557/// Reports the partitioning decisions visible to users in verbose mode.
558fn log_partition_summary(partitioned: &PartitionedObjects<'_>) {
559    if partitioned.table_count > 0 {
560        verbose!(
561            "Skipped {} table(s)/source(s) - use 'mz-deploy apply' for those",
562            partitioned.table_count
563        );
564    }
565    if !partitioned.sinks.is_empty() {
566        verbose!(
567            "Found {} sink(s) - will be created during apply after swap",
568            partitioned.sinks.len()
569        );
570    }
571    if !partitioned.replacement_mvs.is_empty() {
572        verbose!(
573            "Found {} replacement MV(s) - will use CREATE REPLACEMENT protocol",
574            partitioned.replacement_mvs.len()
575        );
576    }
577}
578
579/// Derives schema/cluster prerequisites for resource creation.
580///
581/// Builds schema and cluster sets solely from the objects being staged.
582/// Apply-managed objects (sources, tables, secrets, connections) are excluded
583/// by `partition_objects`, so their schemas and clusters are never staged.
584fn collect_stage_resources(
585    objects: &[ObjectRef<'_>],
586    replacement_mvs: &[ObjectRef<'_>],
587) -> (BTreeSet<SchemaQualifier>, BTreeSet<String>) {
588    let mut schema_set = BTreeSet::new();
589    let mut cluster_set = BTreeSet::new();
590
591    for (object_id, typed_obj) in objects.iter().chain(replacement_mvs.iter()) {
592        schema_set.insert(SchemaQualifier::new(
593            object_id.expect_database().to_string(),
594            object_id.schema().to_string(),
595        ));
596        cluster_set.extend(typed_obj.clusters());
597    }
598
599    (schema_set, cluster_set)
600}
601
602/// Runs all preflight database validations required before mutating deployment state.
603///
604/// This is intentionally isolated so stage fails before any metadata/resource writes.
605async fn validate_project_for_stage(
606    client: &Client,
607    planned_project: &Project,
608    directory: &Path,
609    schema_set: &BTreeSet<SchemaQualifier>,
610    cluster_set: &BTreeSet<String>,
611) -> Result<(), CliError> {
612    progress::stage_start("Validating project");
613    let validate_start = Instant::now();
614    client
615        .validation()
616        .validate_project(planned_project, directory)
617        .await?;
618    client
619        .validation()
620        .validate_cluster_isolation(planned_project)
621        .await?;
622    client
623        .validation()
624        .validate_privileges(planned_project)
625        .await?;
626    client
627        .validation()
628        .validate_schema_ownership(schema_set)
629        .await?;
630    client
631        .validation()
632        .validate_cluster_ownership(cluster_set)
633        .await?;
634    client
635        .validation()
636        .validate_sink_connections_exist(planned_project)
637        .await?;
638    let validate_duration = validate_start.elapsed();
639    progress::stage_success("All validations passed", validate_duration);
640    Ok(())
641}
642
643/// Persists stage deployment state and deferred apply actions.
644///
645/// Records object hashes plus schema deployment kinds, then stores sink/replacement
646/// records that the `apply` command consumes after swap.
647async fn record_stage_metadata(
648    client: &Client,
649    directory: &Path,
650    stage_name: &str,
651    staging_suffix: &str,
652    objects: &[ObjectRef<'_>],
653    sinks: &[ObjectRef<'_>],
654    replacement_mvs: &[ObjectRef<'_>],
655    replacement_schemas: &BTreeSet<SchemaQualifier>,
656) -> Result<(), CliError> {
657    progress::stage_start("Recording deployment metadata");
658    let metadata_start = Instant::now();
659    let metadata = executor::collect_deployment_metadata(client, directory).await;
660
661    let mut staging_snapshot = DeploymentSnapshot::default();
662
663    for (object_id, typed_obj) in objects {
664        let hash = deployment_snapshot::compute_typed_hash(typed_obj);
665        staging_snapshot.objects.insert(object_id.clone(), hash);
666        staging_snapshot.schemas.insert(
667            SchemaQualifier::new(
668                object_id.expect_database().to_string(),
669                object_id.schema().to_string(),
670            ),
671            DeploymentKind::Objects,
672        );
673    }
674
675    for (object_id, typed_obj) in sinks {
676        let hash = deployment_snapshot::compute_typed_hash(typed_obj);
677        staging_snapshot.objects.insert(object_id.clone(), hash);
678        staging_snapshot
679            .schemas
680            .entry(SchemaQualifier::new(
681                object_id.expect_database().to_string(),
682                object_id.schema().to_string(),
683            ))
684            .or_insert(DeploymentKind::Sinks);
685    }
686
687    for (object_id, typed_obj) in replacement_mvs {
688        let hash = deployment_snapshot::compute_typed_hash(typed_obj);
689        staging_snapshot.objects.insert(object_id.clone(), hash);
690        staging_snapshot.schemas.insert(
691            SchemaQualifier::new(
692                object_id.expect_database().to_string(),
693                object_id.schema().to_string(),
694            ),
695            DeploymentKind::Replacement,
696        );
697    }
698
699    // Ensure replacement schemas record the correct kind.
700    // During Objects→Replacement transitions, MVs go through the regular objects
701    // path (for blue-green swap), but the metadata must reflect the final kind
702    // so future deploys know to use CREATE REPLACEMENT.
703    for sq in replacement_schemas {
704        if staging_snapshot.schemas.contains_key(sq) {
705            staging_snapshot
706                .schemas
707                .insert(sq.clone(), DeploymentKind::Replacement);
708        }
709    }
710
711    deployment_snapshot::write_to_database(
712        client,
713        &staging_snapshot,
714        stage_name,
715        &metadata,
716        None,
717        DeploymentMode::Stage,
718    )
719    .await?;
720
721    if !sinks.is_empty() {
722        let pending_statements: Vec<PendingStatement> = sinks
723            .iter()
724            .enumerate()
725            .map(|(idx, (object_id, typed_obj))| {
726                let original_fqn: FullyQualifiedName = object_id.clone().into();
727                let mut visitor = NormalizingVisitor::fully_qualifying(&original_fqn);
728                let stmt = typed_obj
729                    .stmt
730                    .clone()
731                    .normalize_name_with(&visitor, &original_fqn.to_item_name())
732                    .normalize_dependencies_with(&mut visitor);
733                let hash = deployment_snapshot::compute_typed_hash(typed_obj);
734                #[allow(clippy::as_conversions)]
735                PendingStatement {
736                    deploy_id: stage_name.to_string(),
737                    sequence_num: idx as i32,
738                    database: object_id.expect_database().to_string(),
739                    schema: object_id.schema().to_string(),
740                    object: object_id.object().to_string(),
741                    object_hash: hash,
742                    statement_sql: stmt.to_string(),
743                    statement_kind: "sink".to_string(),
744                    executed_at: None,
745                }
746            })
747            .collect();
748
749        client
750            .deployments()
751            .insert_pending_statements(&pending_statements)
752            .await?;
753        verbose!(
754            "Stored {} pending sink statement(s)",
755            pending_statements.len()
756        );
757    }
758
759    if !replacement_mvs.is_empty() {
760        let records: Vec<ReplacementMvRecord> = replacement_mvs
761            .iter()
762            .map(|(object_id, _)| ReplacementMvRecord {
763                deploy_id: stage_name.to_string(),
764                target_database: object_id.expect_database().to_string(),
765                target_schema: object_id.schema().to_string(),
766                target_name: object_id.object().to_string(),
767                replacement_schema: format!("{}{}", object_id.schema(), staging_suffix),
768            })
769            .collect();
770        client
771            .deployments()
772            .insert_replacement_mvs(&records)
773            .await?;
774        verbose!("Stored {} replacement MV record(s)", records.len());
775    }
776
777    let metadata_duration = metadata_start.elapsed();
778    progress::stage_success("Deployment metadata recorded", metadata_duration);
779    Ok(())
780}
781
782/// Top-level orchestrator for the staging deployment pipeline.
783///
784/// Provisions all databases, schemas, clusters, and objects needed for a blue-green
785/// deployment. On failure, automatically rolls back every resource created during
786/// this invocation unless the `no_rollback` flag is set.
787#[allow(clippy::too_many_arguments)]
788async fn create_resources_with_rollback<'a>(
789    client: &Client,
790    stage_name: &str,
791    staging_suffix: &str,
792    schema_set: &BTreeSet<SchemaQualifier>,
793    cluster_set: &BTreeSet<String>,
794    planned_project: &'a Project,
795    objects: &'a [(ObjectId, &'a DatabaseObject)],
796    replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
797    no_rollback: bool,
798    dry_run: bool,
799) -> Result<usize, CliError> {
800    let executor = DeploymentExecutor::with_dry_run(client, dry_run);
801
802    let result = async {
803        create_databases_and_schemas(&executor, planned_project, schema_set, staging_suffix)
804            .await?;
805        create_staging_clusters(&executor, client, stage_name, cluster_set, staging_suffix).await?;
806        deploy_objects_to_staging(
807            &executor,
808            objects,
809            replacement_mvs,
810            planned_project,
811            cluster_set,
812            staging_suffix,
813        )
814        .await
815    }
816    .await;
817
818    match result {
819        Ok(count) => Ok(count),
820        Err(e) if dry_run || no_rollback => {
821            if !dry_run {
822                progress::error("Deployment failed (skipping rollback due to --no-rollback flag)");
823            }
824            Err(e)
825        }
826        Err(e) => {
827            progress::error("Deployment failed, rolling back...");
828            let (schemas, clusters) = rollback_staging_resources(client, stage_name).await;
829
830            if schemas > 0 || clusters > 0 {
831                progress::success(&format!(
832                    "Rolled back: {} schema(s), {} cluster(s)",
833                    schemas, clusters
834                ));
835            }
836
837            Err(e)
838        }
839    }
840}
841
842/// Provision all database and schema infrastructure required for a staged deployment.
843///
844/// After this completes, both the suffixed staging schemas (where new objects will be
845/// created) and the production schemas (swap targets) are guaranteed to exist.
846async fn create_databases_and_schemas(
847    executor: &DeploymentExecutor<'_>,
848    planned_project: &Project,
849    schema_set: &BTreeSet<SchemaQualifier>,
850    staging_suffix: &str,
851) -> Result<(), CliError> {
852    // Create project databases that aren't in schema_set
853    // (schema_set databases will be created by prepare_databases_and_schemas)
854    let schema_set_dbs: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
855    for db in &planned_project.databases {
856        if !schema_set_dbs.contains(db.name.as_str()) {
857            executor.ensure_database(&db.name).await?;
858            verbose!("  Ensured database {} exists", db.name);
859        }
860    }
861
862    // Create staging schemas + apply mod_statements
863    progress::stage_start("Creating staging schemas and applying setup statements");
864    let schema_start = Instant::now();
865    executor
866        .prepare_databases_and_schemas(planned_project, schema_set, Some(staging_suffix))
867        .await?;
868    let schema_duration = schema_start.elapsed();
869    progress::stage_success(
870        &format!(
871            "Created {} staging schema(s) with setup statements",
872            schema_set.len()
873        ),
874        schema_duration,
875    );
876
877    // Create production schemas for swap
878    if !executor.is_dry_run() {
879        for sq in schema_set {
880            executor.ensure_schema(&sq.database, &sq.schema).await?;
881            verbose!("  Ensured schema {}.{} exists", sq.database, sq.schema);
882        }
883    }
884
885    Ok(())
886}
887
888/// Provision staging clusters that mirror the size and configuration of their
889/// production counterparts.
890///
891/// Clusters that already exist are skipped. Cluster names are recorded for rollback
892/// tracking before any cluster is created, so partial failures can be cleaned up.
893async fn create_staging_clusters(
894    executor: &DeploymentExecutor<'_>,
895    client: &Client,
896    stage_name: &str,
897    cluster_set: &BTreeSet<String>,
898    staging_suffix: &str,
899) -> Result<(), CliError> {
900    // Write cluster mappings BEFORE creating clusters so abort can clean up on failure
901    let cluster_names: Vec<String> = cluster_set.iter().cloned().collect();
902    executor
903        .record_deployment_clusters(stage_name, &cluster_names)
904        .await?;
905
906    progress::stage_start("Creating staging clusters");
907    let cluster_start = Instant::now();
908    let mut created_clusters = 0;
909
910    // Batch check which staging clusters already exist (skip in dry-run mode)
911    let existing_staging_clusters = if !executor.is_dry_run() {
912        let staging_cluster_names: Vec<String> = cluster_set
913            .iter()
914            .map(|name| format!("{}{}", name, staging_suffix))
915            .collect();
916        client
917            .introspection()
918            .check_clusters_exist(&staging_cluster_names)
919            .await?
920    } else {
921        BTreeSet::new()
922    };
923
924    for prod_cluster in cluster_set {
925        let staging_cluster = format!("{}{}", prod_cluster, staging_suffix);
926
927        if executor.is_dry_run() {
928            // Config is unused in dry-run mode; provide a placeholder.
929            let placeholder = ClusterConfig::Managed {
930                options: ClusterOptions {
931                    size: String::new(),
932                    replication_factor: 1,
933                },
934                grants: Vec::new(),
935            };
936            executor
937                .create_cluster(&staging_cluster, prod_cluster, &placeholder)
938                .await?;
939            created_clusters += 1;
940            continue;
941        }
942
943        // Check if staging cluster already exists using batch result
944        if existing_staging_clusters.contains(&staging_cluster) {
945            verbose!("  Cluster '{}' already exists, skipping", staging_cluster);
946            continue;
947        }
948
949        // Get production cluster configuration (handles both managed and unmanaged)
950        let config = client
951            .introspection()
952            .get_cluster_config(prod_cluster)
953            .await?;
954
955        let config = match config {
956            Some(config) => config,
957            None => {
958                return Err(CliError::ClusterNotFound {
959                    name: prod_cluster.clone(),
960                });
961            }
962        };
963
964        executor
965            .create_cluster(&staging_cluster, prod_cluster, &config)
966            .await?;
967        created_clusters += 1;
968
969        log_cluster_creation(&staging_cluster, prod_cluster, &config);
970    }
971
972    let cluster_duration = cluster_start.elapsed();
973    progress::stage_success(
974        &format!("Created {} cluster(s)", created_clusters),
975        cluster_duration,
976    );
977
978    Ok(())
979}
980
981/// Log verbose details about a newly created staging cluster.
982fn log_cluster_creation(staging_cluster: &str, prod_cluster: &str, config: &ClusterConfig) {
983    match config {
984        ClusterConfig::Managed { options, grants } => {
985            verbose!(
986                "  Created managed cluster '{}' (size: {}, replication_factor: {}, {} grant(s), cloned from '{}')",
987                staging_cluster,
988                options.size,
989                options.replication_factor,
990                grants.len(),
991                prod_cluster
992            );
993        }
994        ClusterConfig::Unmanaged { replicas, grants } => {
995            verbose!(
996                "  Created unmanaged cluster '{}' with {} replica(s), {} grant(s) (cloned from '{}')",
997                staging_cluster,
998                replicas.len(),
999                grants.len(),
1000                prod_cluster
1001            );
1002            for replica in replicas {
1003                verbose!(
1004                    "    - {} (size: {}{})",
1005                    replica.name,
1006                    replica.size,
1007                    replica
1008                        .availability_zone
1009                        .as_ref()
1010                        .map(|az| format!(", az: {}", az))
1011                        .unwrap_or_default()
1012                );
1013            }
1014        }
1015    }
1016}
1017
1018/// Execute all object definitions (views, materialized views, indexes) into the
1019/// staging schemas.
1020///
1021/// Regular objects are created with suffixed names; replacement materialized views
1022/// are linked to their production targets via `CREATE REPLACEMENT MATERIALIZED VIEW
1023/// ... FOR`. Returns the total number of successfully deployed objects.
1024async fn deploy_objects_to_staging<'a>(
1025    executor: &DeploymentExecutor<'_>,
1026    objects: &'a [(ObjectId, &'a DatabaseObject)],
1027    replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
1028    planned_project: &'a Project,
1029    cluster_set: &BTreeSet<String>,
1030    staging_suffix: &str,
1031) -> Result<usize, CliError> {
1032    progress::stage_start("Deploying objects to staging");
1033    let deploy_start = Instant::now();
1034
1035    // Collect ObjectIds from objects being deployed for the staging transformer
1036    // Include both regular objects and replacement MVs
1037    let objects_to_deploy_set: BTreeSet<_> = objects
1038        .iter()
1039        .chain(replacement_mvs.iter())
1040        .map(|(oid, _)| oid.clone())
1041        .collect();
1042
1043    // Deploy external indexes
1044    let mut external_indexes: Vec<_> = planned_project
1045        .iter_objects()
1046        .filter(|object| !objects_to_deploy_set.contains(&object.id))
1047        .flat_map(extract_external_indexes)
1048        .filter_map(|(cluster, index)| cluster_set.contains(&cluster.name).then_some(index))
1049        .collect();
1050
1051    // Transform cluster names in external indexes for staging
1052    normalize::transform_cluster_names_for_staging(&mut external_indexes, staging_suffix);
1053    for index in external_indexes {
1054        verbose!("Creating external index {}", index);
1055        executor.execute_sql(&index).await?;
1056    }
1057
1058    // Build the set of replacement object IDs from the replacement MVs slice.
1059    // Only these specific objects have their references left unsuffixed.
1060    let replacement_object_ids: BTreeSet<ObjectId> =
1061        replacement_mvs.iter().map(|(oid, _)| oid.clone()).collect();
1062
1063    let mut success_count = 0;
1064
1065    // Deploy regular objects
1066    for (idx, (object_id, typed_obj)) in objects.iter().enumerate() {
1067        verbose!(
1068            "Applying {}/{}: {}{} (to schema {}{})",
1069            idx + 1,
1070            objects.len(),
1071            object_id.object(),
1072            staging_suffix,
1073            object_id.schema(),
1074            staging_suffix
1075        );
1076
1077        deploy_single_object(
1078            executor,
1079            object_id,
1080            typed_obj,
1081            staging_suffix,
1082            planned_project,
1083            &objects_to_deploy_set,
1084            &replacement_object_ids,
1085            |stmt| stmt,
1086        )
1087        .await?;
1088        success_count += 1;
1089    }
1090
1091    // Deploy replacement MVs using CREATE REPLACEMENT MATERIALIZED VIEW ... FOR
1092    for (idx, (object_id, typed_obj)) in replacement_mvs.iter().enumerate() {
1093        verbose!(
1094            "Applying replacement MV {}/{}: {} FOR {}",
1095            idx + 1,
1096            replacement_mvs.len(),
1097            object_id.object(),
1098            object_id
1099        );
1100
1101        let production_target = object_id.to_unresolved_item_name();
1102        deploy_single_object(
1103            executor,
1104            object_id,
1105            typed_obj,
1106            staging_suffix,
1107            planned_project,
1108            &objects_to_deploy_set,
1109            &replacement_object_ids,
1110            |stmt| match stmt {
1111                Statement::CreateMaterializedView(mut mv) => {
1112                    mv.replacement_for =
1113                        Some(mz_sql_parser::ast::RawItemName::Name(production_target));
1114                    Statement::CreateMaterializedView(mv)
1115                }
1116                other => other,
1117            },
1118        )
1119        .await?;
1120        success_count += 1;
1121    }
1122
1123    let deploy_duration = deploy_start.elapsed();
1124    progress::stage_success(
1125        &format!("Deployed {} view(s)/materialized view(s)", success_count),
1126        deploy_duration,
1127    );
1128
1129    Ok(success_count)
1130}
1131
1132/// Rollback staging resources on deployment failure.
1133///
1134/// This function performs best-effort cleanup of staging resources created during
1135/// a failed deployment. It mirrors the abort command logic but uses a best-effort
1136/// approach where cleanup failures are logged rather than returning errors.
1137///
1138/// # Arguments
1139/// * `client` - Database client
1140/// * `environment` - Staging environment name
1141///
1142/// # Returns
1143/// Number of schemas and clusters that were cleaned up (for summary message)
1144async fn rollback_staging_resources(client: &Client, environment: &str) -> (usize, usize) {
1145    let staging_schemas = best_effort_fetch(
1146        client
1147            .introspection()
1148            .get_staging_schemas(environment)
1149            .await,
1150        "query staging schemas",
1151    );
1152    let staging_clusters = best_effort_fetch(
1153        client
1154            .introspection()
1155            .get_staging_clusters(environment)
1156            .await,
1157        "query staging clusters",
1158    );
1159
1160    let schema_count = staging_schemas.len();
1161    let cluster_count = staging_clusters.len();
1162
1163    if !staging_schemas.is_empty() {
1164        verbose!("Dropping staging schemas...");
1165        if let Err(e) = client
1166            .introspection()
1167            .drop_staging_schemas(&staging_schemas)
1168            .await
1169        {
1170            verbose!("Warning: Failed to drop some schemas: {}", e);
1171        } else {
1172            for sq in &staging_schemas {
1173                verbose!("  Dropped {}.{}", sq.database, sq.schema);
1174            }
1175        }
1176    }
1177
1178    if !staging_clusters.is_empty() {
1179        verbose!("Dropping staging clusters...");
1180        if let Err(e) = client
1181            .introspection()
1182            .drop_staging_clusters(&staging_clusters)
1183            .await
1184        {
1185            verbose!("Warning: Failed to drop some clusters: {}", e);
1186        } else {
1187            for cluster in &staging_clusters {
1188                verbose!("  Dropped {}", cluster);
1189            }
1190        }
1191    }
1192
1193    verbose!("Deleting deployment records...");
1194    best_effort_delete(
1195        client
1196            .deployments()
1197            .delete_deployment_clusters(environment)
1198            .await,
1199        "delete cluster records",
1200    );
1201    best_effort_delete(
1202        client
1203            .deployments()
1204            .delete_pending_statements(environment)
1205            .await,
1206        "delete pending statements",
1207    );
1208    best_effort_delete(
1209        client
1210            .deployments()
1211            .delete_replacement_mvs(environment)
1212            .await,
1213        "delete replacement MV records",
1214    );
1215    best_effort_delete(
1216        client.deployments().delete_deployment(environment).await,
1217        "delete deployment records",
1218    );
1219
1220    (schema_count, cluster_count)
1221}
1222
1223/// Best-effort fetch wrapper used by rollback.
1224///
1225/// Converts query failures into empty results so cleanup can continue and report
1226/// as much progress as possible instead of aborting midway.
1227fn best_effort_fetch<T, E: fmt::Display>(result: Result<Vec<T>, E>, action: &str) -> Vec<T> {
1228    match result {
1229        Ok(values) => values,
1230        Err(e) => {
1231            verbose!("Warning: Failed to {}: {}", action, e);
1232            vec![]
1233        }
1234    }
1235}
1236
1237/// Best-effort delete wrapper used by rollback metadata cleanup.
1238fn best_effort_delete<E: fmt::Display>(result: Result<(), E>, action: &str) {
1239    if let Err(e) = result {
1240        verbose!("Warning: Failed to {}: {}", action, e);
1241    }
1242}
1243
1244/// Deploy a single object to the staging environment.
1245///
1246/// Handles normalization, execution, and deployment of indexes/grants/comments.
1247/// The `transform` callback allows the caller to modify the normalized statement
1248/// before execution (e.g., to set `replacement_for` on replacement MVs).
1249///
1250/// `replacement_objects` is the set of specific object IDs being updated
1251/// in-place via replacement MVs. References to these objects are left
1252/// unsuffixed (pointing to production). During full deployment the set is
1253/// empty, so every reference is suffixed to point at the staging schemas.
1254async fn deploy_single_object(
1255    executor: &DeploymentExecutor<'_>,
1256    object_id: &ObjectId,
1257    typed_obj: &DatabaseObject,
1258    staging_suffix: &str,
1259    planned_project: &Project,
1260    objects_to_deploy_set: &BTreeSet<ObjectId>,
1261    replacement_objects: &BTreeSet<ObjectId>,
1262    transform: impl FnOnce(Statement) -> Statement,
1263) -> Result<(), CliError> {
1264    let original_fqn: FullyQualifiedName = object_id.clone().into();
1265
1266    let mut visitor = NormalizingVisitor::staging(
1267        &original_fqn,
1268        staging_suffix.to_string(),
1269        &planned_project.external_dependencies,
1270        Some(objects_to_deploy_set),
1271        replacement_objects,
1272    );
1273
1274    let stmt = typed_obj
1275        .stmt
1276        .clone()
1277        .normalize_name_with(&visitor, &original_fqn.to_item_name())
1278        .normalize_dependencies_with(&mut visitor)
1279        .normalize_cluster_with(&visitor);
1280
1281    let stmt = transform(stmt);
1282    executor.execute_sql(&stmt).await?;
1283
1284    // Deploy indexes, grants, and comments
1285    let mut indexes = typed_obj.indexes.clone();
1286    let mut grants = typed_obj.grants.clone();
1287    let mut comments = typed_obj.comments.clone();
1288
1289    visitor.normalize_index_references(&mut indexes);
1290    visitor.normalize_index_clusters(&mut indexes);
1291    visitor.normalize_grant_references(&mut grants);
1292    visitor.normalize_comment_references(&mut comments);
1293
1294    for index in &indexes {
1295        executor.execute_sql(index).await?;
1296    }
1297
1298    for grant in &grants {
1299        executor.execute_sql(grant).await?;
1300    }
1301
1302    for comment in &comments {
1303        executor.execute_sql(comment).await?;
1304    }
1305
1306    Ok(())
1307}
1308
1309/// Check that no new replacement objects are being added to schemas that already
1310/// have production objects.
1311fn validate_no_new_objects_in_existing_stable_schemas(
1312    change_set: &ChangeSet,
1313    production_snapshot: &DeploymentSnapshot,
1314) -> Result<(), CliError> {
1315    let blocked: Vec<_> = change_set
1316        .new_replacement_objects
1317        .iter()
1318        .filter(|obj| {
1319            !production_snapshot.objects.contains_key(obj)
1320                && production_snapshot
1321                    .objects
1322                    .keys()
1323                    .any(|prod| prod.database() == obj.database() && prod.schema() == obj.schema())
1324        })
1325        .collect();
1326
1327    if blocked.is_empty() {
1328        return Ok(());
1329    }
1330
1331    let first = blocked[0];
1332    Err(CliError::NewObjectInExistingStableSchema {
1333        database: first.expect_database().to_string(),
1334        schema: first.schema().to_string(),
1335        objects: blocked.iter().map(|o| o.object().to_string()).collect(),
1336    })
1337}
1338
1339#[cfg(test)]
1340mod tests {
1341    use super::*;
1342    use crate::project::analysis::deployment_snapshot::build_snapshot_from_planned;
1343    use crate::project::ir::compiled;
1344    use crate::project::ir::object_id::ObjectId;
1345    use std::collections::{BTreeMap, BTreeSet};
1346
1347    #[mz_ore::test]
1348    fn parse_qualified_schema_requires_two_parts() {
1349        // Fully qualified parses to (database, schema).
1350        let sq = parse_qualified_schema("app.core").expect("qualified name parses");
1351        assert_eq!(
1352            sq,
1353            SchemaQualifier::new("app".to_string(), "core".to_string())
1354        );
1355
1356        // A reserved-word component is handled via the SQL parser.
1357        let sq = parse_qualified_schema("app.\"select\"").expect("quoted keyword parses");
1358        assert_eq!(
1359            sq,
1360            SchemaQualifier::new("app".to_string(), "select".to_string())
1361        );
1362
1363        // Unqualified (1-part) and over-qualified (3-part) are rejected.
1364        assert!(parse_qualified_schema("core").is_err());
1365        assert!(parse_qualified_schema("app.core.orders").is_err());
1366    }
1367
1368    /// Parse SQL strings into a compiled::DatabaseObject.
1369    ///
1370    /// The first CREATE statement becomes the main statement.
1371    /// Any CREATE INDEX statements become entries in the indexes vec.
1372    fn make_typed_object(sqls: &[&str]) -> DatabaseObject {
1373        let mut stmt = None;
1374        let mut indexes = Vec::new();
1375
1376        for sql in sqls {
1377            let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1378            for p in parsed {
1379                match p.ast {
1380                    mz_sql_parser::ast::Statement::CreateView(s) => {
1381                        stmt = Some(Statement::CreateView(s));
1382                    }
1383                    mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
1384                        stmt = Some(Statement::CreateMaterializedView(s));
1385                    }
1386                    mz_sql_parser::ast::Statement::CreateTable(s) => {
1387                        stmt = Some(Statement::CreateTable(s));
1388                    }
1389                    mz_sql_parser::ast::Statement::CreateSource(s) => {
1390                        stmt = Some(Statement::CreateSource(s));
1391                    }
1392                    mz_sql_parser::ast::Statement::CreateConnection(s) => {
1393                        stmt = Some(Statement::CreateConnection(s));
1394                    }
1395                    mz_sql_parser::ast::Statement::CreateSecret(s) => {
1396                        stmt = Some(Statement::CreateSecret(s));
1397                    }
1398                    mz_sql_parser::ast::Statement::CreateIndex(s) => {
1399                        indexes.push(s);
1400                    }
1401                    other => panic!("Unexpected statement type: {:?}", other),
1402                }
1403            }
1404        }
1405
1406        DatabaseObject {
1407            path: std::path::PathBuf::from("test.sql"),
1408            stmt: stmt.expect("Expected at least one CREATE statement"),
1409            indexes,
1410            grants: vec![],
1411            comments: vec![],
1412            tests: vec![],
1413        }
1414    }
1415
1416    /// Build a graph::Project from a list of (database, schema, object_name, typed_obj) tuples.
1417    fn make_planned_project(objects: Vec<(&str, &str, &str, DatabaseObject)>) -> Project {
1418        // Group into databases -> schemas -> objects
1419        let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1420
1421        for (database, schema, _name, typed_obj) in objects {
1422            db_map
1423                .entry(database.to_string())
1424                .or_default()
1425                .entry(schema.to_string())
1426                .or_default()
1427                .push(typed_obj);
1428        }
1429
1430        let databases: Vec<compiled::Database> = db_map
1431            .into_iter()
1432            .map(|(db_name, schemas)| compiled::Database {
1433                name: db_name,
1434                schemas: schemas
1435                    .into_iter()
1436                    .map(|(schema_name, objs)| compiled::Schema {
1437                        name: schema_name,
1438                        objects: objs,
1439                        mod_statements: None,
1440                    })
1441                    .collect(),
1442                mod_statements: None,
1443            })
1444            .collect();
1445
1446        let typed_project = compiled::Project {
1447            databases,
1448            replacement_schemas: BTreeSet::new(),
1449        };
1450
1451        Project::from(typed_project)
1452    }
1453
1454    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1455    #[mz_ore::test]
1456    fn test_full_deploy_view_not_indexed_mixed_types() {
1457        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1458        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1459        let source_obj = make_typed_object(&[
1460            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1461        ]);
1462        let conn_obj =
1463            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1464        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1465
1466        let objects: Vec<ObjectRef> = vec![
1467            (
1468                ObjectId::new("db".into(), "public".into(), "my_view".into()),
1469                &view_obj,
1470            ),
1471            (
1472                ObjectId::new("db".into(), "public".into(), "my_table".into()),
1473                &table_obj,
1474            ),
1475            (
1476                ObjectId::new("db".into(), "public".into(), "my_source".into()),
1477                &source_obj,
1478            ),
1479            (
1480                ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1481                &conn_obj,
1482            ),
1483            (
1484                ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1485                &secret_obj,
1486            ),
1487        ];
1488
1489        let replacement_ids = BTreeSet::new();
1490        let partitioned = partition_objects(objects, &replacement_ids);
1491
1492        // Only the view should be in staged objects
1493        assert_eq!(
1494            partitioned.objects.len(),
1495            1,
1496            "Only the view should be staged"
1497        );
1498        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1499
1500        // Table, source, connection, secret should be counted as skipped
1501        assert_eq!(
1502            partitioned.table_count, 4,
1503            "Table, source, connection, and secret should all be skipped"
1504        );
1505
1506        // No sinks or replacement MVs
1507        assert!(partitioned.sinks.is_empty());
1508        assert!(partitioned.replacement_mvs.is_empty());
1509
1510        // Collect stage resources
1511        let (schema_set, cluster_set) =
1512            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1513
1514        // Should have the view's schema
1515        assert_eq!(schema_set.len(), 1);
1516        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1517
1518        // View has no cluster, so cluster_set should be empty
1519        assert!(
1520            cluster_set.is_empty(),
1521            "View without index should not require any clusters"
1522        );
1523    }
1524
1525    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1526    #[mz_ore::test]
1527    fn test_full_deploy_view_indexed_different_cluster() {
1528        let view_obj = make_typed_object(&[
1529            "CREATE VIEW my_view AS SELECT 1",
1530            "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1531        ]);
1532        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1533        let source_obj = make_typed_object(&[
1534            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1535        ]);
1536        let conn_obj =
1537            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1538        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1539
1540        let objects: Vec<ObjectRef> = vec![
1541            (
1542                ObjectId::new("db".into(), "public".into(), "my_view".into()),
1543                &view_obj,
1544            ),
1545            (
1546                ObjectId::new("db".into(), "public".into(), "my_table".into()),
1547                &table_obj,
1548            ),
1549            (
1550                ObjectId::new("db".into(), "public".into(), "my_source".into()),
1551                &source_obj,
1552            ),
1553            (
1554                ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1555                &conn_obj,
1556            ),
1557            (
1558                ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1559                &secret_obj,
1560            ),
1561        ];
1562
1563        let replacement_ids = BTreeSet::new();
1564        let partitioned = partition_objects(objects, &replacement_ids);
1565
1566        // Only the view should be staged
1567        assert_eq!(partitioned.objects.len(), 1);
1568        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1569        assert_eq!(partitioned.table_count, 4);
1570
1571        // Collect stage resources
1572        let (schema_set, cluster_set) =
1573            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1574
1575        // Should have view's schema
1576        assert_eq!(schema_set.len(), 1);
1577        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1578
1579        // Should stage index_cluster (from the view's index), NOT source_cluster
1580        assert_eq!(
1581            cluster_set.len(),
1582            1,
1583            "Should only have index_cluster, got: {:?}",
1584            cluster_set
1585        );
1586        assert!(
1587            cluster_set.contains("index_cluster"),
1588            "Should stage index_cluster from the view's index"
1589        );
1590        assert!(
1591            !cluster_set.contains("source_cluster"),
1592            "Should NOT stage source_cluster (source is not staged)"
1593        );
1594    }
1595
1596    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1597    #[mz_ore::test]
1598    fn test_incremental_deploy_view_updated_not_indexed() {
1599        // Build planned project with all object types
1600        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1601        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1602        let source_obj = make_typed_object(&[
1603            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1604        ]);
1605        let conn_obj =
1606            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1607        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1608
1609        let planned_project = make_planned_project(vec![
1610            ("db", "public", "my_view", view_obj),
1611            ("db", "storage", "my_table", table_obj),
1612            ("db", "storage", "my_source", source_obj),
1613            ("db", "storage", "my_conn", conn_obj),
1614            ("db", "storage", "my_secret", secret_obj),
1615        ]);
1616
1617        // Build new snapshot from planned project
1618        let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1619
1620        // Build old snapshot: same hashes for everything EXCEPT the view
1621        let mut old_snapshot = DeploymentSnapshot::default();
1622        for (object_id, hash) in &new_snapshot.objects {
1623            if object_id.object() == "my_view" {
1624                // Different hash to simulate the view having changed
1625                old_snapshot
1626                    .objects
1627                    .insert(object_id.clone(), "old_hash".to_string());
1628            } else {
1629                old_snapshot.objects.insert(object_id.clone(), hash.clone());
1630            }
1631        }
1632
1633        // Compute changeset
1634        let change_set = ChangeSet::from_deployment_snapshot_comparison(
1635            &old_snapshot,
1636            &new_snapshot,
1637            &planned_project,
1638            &BTreeSet::new(),
1639        );
1640
1641        // The view should be in objects_to_deploy
1642        assert!(
1643            change_set.objects_to_deploy.contains(&ObjectId::new(
1644                "db".into(),
1645                "public".into(),
1646                "my_view".into()
1647            )),
1648            "Changed view should be in objects_to_deploy"
1649        );
1650
1651        // Get filtered objects and partition
1652        let objects = planned_project
1653            .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1654            .unwrap();
1655
1656        let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1657
1658        // Only the view should be staged
1659        assert_eq!(
1660            partitioned.objects.len(),
1661            1,
1662            "Only the changed view should be staged, got: {:?}",
1663            partitioned
1664                .objects
1665                .iter()
1666                .map(|(id, _)| id.object())
1667                .collect::<Vec<_>>()
1668        );
1669        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1670
1671        let (schema_set, cluster_set) =
1672            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1673
1674        assert_eq!(schema_set.len(), 1);
1675        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1676        assert!(
1677            cluster_set.is_empty(),
1678            "View without index should not require any clusters"
1679        );
1680    }
1681
1682    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1683    #[mz_ore::test]
1684    fn test_incremental_deploy_view_updated_indexed_different_cluster() {
1685        // Build planned project with indexed view and other object types
1686        let view_obj = make_typed_object(&[
1687            "CREATE VIEW my_view AS SELECT 1",
1688            "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1689        ]);
1690        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1691        let source_obj = make_typed_object(&[
1692            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1693        ]);
1694        let conn_obj =
1695            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1696        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1697
1698        let planned_project = make_planned_project(vec![
1699            ("db", "public", "my_view", view_obj),
1700            ("db", "storage", "my_table", table_obj),
1701            ("db", "storage", "my_source", source_obj),
1702            ("db", "storage", "my_conn", conn_obj),
1703            ("db", "storage", "my_secret", secret_obj),
1704        ]);
1705
1706        // Build new snapshot from planned project
1707        let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1708
1709        // Build old snapshot: same hashes except the view
1710        let mut old_snapshot = DeploymentSnapshot::default();
1711        for (object_id, hash) in &new_snapshot.objects {
1712            if object_id.object() == "my_view" {
1713                old_snapshot
1714                    .objects
1715                    .insert(object_id.clone(), "old_hash".to_string());
1716            } else {
1717                old_snapshot.objects.insert(object_id.clone(), hash.clone());
1718            }
1719        }
1720
1721        // Compute changeset
1722        let change_set = ChangeSet::from_deployment_snapshot_comparison(
1723            &old_snapshot,
1724            &new_snapshot,
1725            &planned_project,
1726            &BTreeSet::new(),
1727        );
1728
1729        assert!(
1730            change_set.objects_to_deploy.contains(&ObjectId::new(
1731                "db".into(),
1732                "public".into(),
1733                "my_view".into()
1734            )),
1735            "Changed view should be in objects_to_deploy"
1736        );
1737
1738        // Get filtered objects and partition
1739        let objects = planned_project
1740            .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1741            .unwrap();
1742
1743        let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1744
1745        // Only the view should be staged
1746        assert_eq!(
1747            partitioned.objects.len(),
1748            1,
1749            "Only the changed view should be staged, got: {:?}",
1750            partitioned
1751                .objects
1752                .iter()
1753                .map(|(id, _)| id.object())
1754                .collect::<Vec<_>>()
1755        );
1756        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1757
1758        let (schema_set, cluster_set) =
1759            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1760
1761        assert_eq!(schema_set.len(), 1);
1762        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1763
1764        // Should stage index_cluster only, NOT source_cluster
1765        assert_eq!(
1766            cluster_set.len(),
1767            1,
1768            "Should only have index_cluster, got: {:?}",
1769            cluster_set
1770        );
1771        assert!(
1772            cluster_set.contains("index_cluster"),
1773            "Should stage index_cluster from the view's index"
1774        );
1775        assert!(
1776            !cluster_set.contains("source_cluster"),
1777            "Should NOT stage source_cluster"
1778        );
1779    }
1780
1781    fn make_empty_change_set() -> ChangeSet {
1782        ChangeSet {
1783            changed_objects: BTreeSet::new(),
1784            dirty_schemas: BTreeSet::new(),
1785            dirty_clusters: BTreeSet::new(),
1786            objects_to_deploy: BTreeSet::new(),
1787            new_replacement_objects: BTreeSet::new(),
1788            changed_replacement_objects: BTreeSet::new(),
1789        }
1790    }
1791
1792    #[mz_ore::test]
1793    fn test_validate_no_new_replacement_objects_first_deploy() {
1794        let cs = make_empty_change_set();
1795        let snapshot = DeploymentSnapshot::default();
1796        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1797    }
1798
1799    #[mz_ore::test]
1800    fn test_validate_new_replacement_objects_in_brand_new_schema() {
1801        let mut cs = make_empty_change_set();
1802        cs.new_replacement_objects.insert(ObjectId::new(
1803            "db".into(),
1804            "analytics".into(),
1805            "new_mv".into(),
1806        ));
1807
1808        // Production has objects in a *different* schema, not analytics
1809        let mut snapshot = DeploymentSnapshot::default();
1810        snapshot.objects.insert(
1811            ObjectId::new("db".into(), "public".into(), "existing_mv".into()),
1812            "hash1".into(),
1813        );
1814
1815        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1816    }
1817
1818    #[mz_ore::test]
1819    fn test_validate_new_replacement_objects_in_existing_production_schema() {
1820        let mut cs = make_empty_change_set();
1821        cs.new_replacement_objects.insert(ObjectId::new(
1822            "db".into(),
1823            "analytics".into(),
1824            "new_mv".into(),
1825        ));
1826
1827        // Production already has objects in analytics
1828        let mut snapshot = DeploymentSnapshot::default();
1829        snapshot.objects.insert(
1830            ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1831            "hash1".into(),
1832        );
1833
1834        let result = validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot);
1835        assert!(result.is_err());
1836        match result.unwrap_err() {
1837            CliError::NewObjectInExistingStableSchema {
1838                database,
1839                schema,
1840                objects,
1841            } => {
1842                assert_eq!(database, "db");
1843                assert_eq!(schema, "analytics");
1844                assert_eq!(objects, vec!["new_mv"]);
1845            }
1846            other => panic!("Expected NewObjectInExistingStableSchema, got: {:?}", other),
1847        }
1848    }
1849
1850    #[mz_ore::test]
1851    fn test_validate_changed_replacement_objects_only() {
1852        let mut cs = make_empty_change_set();
1853        // Only changed objects, no new ones
1854        cs.changed_replacement_objects.insert(ObjectId::new(
1855            "db".into(),
1856            "analytics".into(),
1857            "changed_mv".into(),
1858        ));
1859
1860        let mut snapshot = DeploymentSnapshot::default();
1861        snapshot.objects.insert(
1862            ObjectId::new("db".into(), "analytics".into(), "changed_mv".into()),
1863            "hash1".into(),
1864        );
1865
1866        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1867    }
1868
1869    #[mz_ore::test]
1870    fn test_validate_mixed_new_in_new_schema_changed_in_existing() {
1871        let mut cs = make_empty_change_set();
1872        // New object in a brand-new schema
1873        cs.new_replacement_objects.insert(ObjectId::new(
1874            "db".into(),
1875            "new_schema".into(),
1876            "new_mv".into(),
1877        ));
1878        // Changed object in an existing schema
1879        cs.changed_replacement_objects.insert(ObjectId::new(
1880            "db".into(),
1881            "existing_schema".into(),
1882            "changed_mv".into(),
1883        ));
1884
1885        // Production has objects only in existing_schema
1886        let mut snapshot = DeploymentSnapshot::default();
1887        snapshot.objects.insert(
1888            ObjectId::new("db".into(), "existing_schema".into(), "changed_mv".into()),
1889            "hash1".into(),
1890        );
1891
1892        // Should pass: the new object is in a schema with no production objects
1893        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1894    }
1895
1896    #[mz_ore::test]
1897    fn test_validate_transitioning_objects_in_existing_schema_allowed() {
1898        let mut cs = make_empty_change_set();
1899        // Object transitioning from Objects→Replacement lands in new_replacement_objects
1900        cs.new_replacement_objects.insert(ObjectId::new(
1901            "db".into(),
1902            "analytics".into(),
1903            "existing_mv".into(),
1904        ));
1905
1906        // The same object already exists in production (it's transitioning, not new)
1907        let mut snapshot = DeploymentSnapshot::default();
1908        snapshot.objects.insert(
1909            ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1910            "hash1".into(),
1911        );
1912
1913        // Should pass: the object already exists in production, it's just changing schema kind
1914        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1915    }
1916
1917    fn make_planned_project_with_replacement_schemas(
1918        objects: Vec<(&str, &str, &str, DatabaseObject)>,
1919        replacement_schemas: BTreeSet<SchemaQualifier>,
1920    ) -> Project {
1921        let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1922
1923        for (database, schema, _name, typed_obj) in objects {
1924            db_map
1925                .entry(database.to_string())
1926                .or_default()
1927                .entry(schema.to_string())
1928                .or_default()
1929                .push(typed_obj);
1930        }
1931
1932        let databases: Vec<compiled::Database> = db_map
1933            .into_iter()
1934            .map(|(db_name, schemas)| compiled::Database {
1935                name: db_name,
1936                schemas: schemas
1937                    .into_iter()
1938                    .map(|(schema_name, objs)| compiled::Schema {
1939                        name: schema_name,
1940                        objects: objs,
1941                        mod_statements: None,
1942                    })
1943                    .collect(),
1944                mod_statements: None,
1945            })
1946            .collect();
1947
1948        let typed_project = compiled::Project {
1949            databases,
1950            replacement_schemas,
1951        };
1952
1953        Project::from(typed_project)
1954    }
1955
1956    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1957    #[mz_ore::test]
1958    fn test_build_snapshot_replacement_schema_kind() {
1959        let mv_obj =
1960            make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1961        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1962
1963        let mut replacement_schemas = BTreeSet::new();
1964        replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
1965
1966        let planned_project = make_planned_project_with_replacement_schemas(
1967            vec![
1968                ("db", "stable", "my_mv", mv_obj),
1969                ("db", "regular", "my_view", view_obj),
1970            ],
1971            replacement_schemas,
1972        );
1973
1974        let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1975
1976        // The stable schema should be Replacement
1977        assert_eq!(
1978            snapshot
1979                .schemas
1980                .get(&SchemaQualifier::new("db".into(), "stable".into())),
1981            Some(&DeploymentKind::Replacement),
1982            "Replacement schema should have Replacement kind in snapshot"
1983        );
1984
1985        // The regular schema should be Objects
1986        assert_eq!(
1987            snapshot
1988                .schemas
1989                .get(&SchemaQualifier::new("db".into(), "regular".into())),
1990            Some(&DeploymentKind::Objects),
1991            "Regular schema should have Objects kind in snapshot"
1992        );
1993    }
1994
1995    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1996    #[mz_ore::test]
1997    fn test_build_snapshot_no_replacement_schemas_all_objects() {
1998        let mv_obj =
1999            make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
2000        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
2001
2002        let planned_project = make_planned_project(vec![
2003            ("db", "stable", "my_mv", mv_obj),
2004            ("db", "regular", "my_view", view_obj),
2005        ]);
2006
2007        let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
2008
2009        // Both should be Objects when no replacement_schemas configured
2010        assert_eq!(
2011            snapshot
2012                .schemas
2013                .get(&SchemaQualifier::new("db".into(), "stable".into())),
2014            Some(&DeploymentKind::Objects),
2015        );
2016        assert_eq!(
2017            snapshot
2018                .schemas
2019                .get(&SchemaQualifier::new("db".into(), "regular".into())),
2020            Some(&DeploymentKind::Objects),
2021        );
2022    }
2023
2024    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
2025    #[mz_ore::test]
2026    fn test_record_stage_metadata_transition_override() {
2027        // During an Objects→Replacement transition, MVs go through the regular
2028        // objects path (not replacement_mvs), but the metadata must still record
2029        // the schema as Replacement.
2030        let mv_obj =
2031            make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
2032
2033        // Objects path (transition — MV is NOT in replacement_mvs)
2034        let objects: Vec<ObjectRef> = vec![(
2035            ObjectId::new("db".into(), "stable".into(), "my_mv".into()),
2036            &mv_obj,
2037        )];
2038        let sinks: Vec<ObjectRef> = vec![];
2039        let replacement_mvs: Vec<ObjectRef> = vec![];
2040
2041        // The project declares "stable" as a replacement schema
2042        let mut replacement_schemas = BTreeSet::new();
2043        replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
2044
2045        // Simulate what record_stage_metadata does (without DB calls)
2046        let mut staging_snapshot = DeploymentSnapshot::default();
2047
2048        for (object_id, typed_obj) in &objects {
2049            let hash = deployment_snapshot::compute_typed_hash(typed_obj);
2050            staging_snapshot.objects.insert(object_id.clone(), hash);
2051            staging_snapshot.schemas.insert(
2052                SchemaQualifier::new(
2053                    object_id.expect_database().to_string(),
2054                    object_id.schema().to_string(),
2055                ),
2056                DeploymentKind::Objects,
2057            );
2058        }
2059
2060        for (object_id, typed_obj) in &sinks {
2061            let hash = deployment_snapshot::compute_typed_hash(typed_obj);
2062            staging_snapshot.objects.insert(object_id.clone(), hash);
2063            staging_snapshot
2064                .schemas
2065                .entry(SchemaQualifier::new(
2066                    object_id.expect_database().to_string(),
2067                    object_id.schema().to_string(),
2068                ))
2069                .or_insert(DeploymentKind::Sinks);
2070        }
2071
2072        for (object_id, typed_obj) in &replacement_mvs {
2073            let hash = deployment_snapshot::compute_typed_hash(typed_obj);
2074            staging_snapshot.objects.insert(object_id.clone(), hash);
2075            staging_snapshot.schemas.insert(
2076                SchemaQualifier::new(
2077                    object_id.expect_database().to_string(),
2078                    object_id.schema().to_string(),
2079                ),
2080                DeploymentKind::Replacement,
2081            );
2082        }
2083
2084        // Before the fix, the schema would remain Objects here.
2085        assert_eq!(
2086            staging_snapshot
2087                .schemas
2088                .get(&SchemaQualifier::new("db".into(), "stable".into())),
2089            Some(&DeploymentKind::Objects),
2090            "Before override, schema should be Objects (from regular objects path)"
2091        );
2092
2093        // Apply the replacement_schemas override (the fix)
2094        for sq in &replacement_schemas {
2095            if staging_snapshot.schemas.contains_key(sq) {
2096                staging_snapshot
2097                    .schemas
2098                    .insert(sq.clone(), DeploymentKind::Replacement);
2099            }
2100        }
2101
2102        // After the fix, the schema should be Replacement
2103        assert_eq!(
2104            staging_snapshot
2105                .schemas
2106                .get(&SchemaQualifier::new("db".into(), "stable".into())),
2107            Some(&DeploymentKind::Replacement),
2108            "After override, schema should be Replacement"
2109        );
2110    }
2111
2112    #[mz_ore::test]
2113    fn test_record_stage_metadata_override_only_applies_to_existing_schemas() {
2114        // The override should NOT create new schema entries — it only applies to
2115        // schemas that already have objects in the staging snapshot.
2116        let replacement_schemas =
2117            BTreeSet::from([SchemaQualifier::new("db".into(), "nonexistent".into())]);
2118
2119        let mut staging_snapshot = DeploymentSnapshot::default();
2120
2121        // Apply the replacement_schemas override
2122        for sq in &replacement_schemas {
2123            if staging_snapshot.schemas.contains_key(sq) {
2124                staging_snapshot
2125                    .schemas
2126                    .insert(sq.clone(), DeploymentKind::Replacement);
2127            }
2128        }
2129
2130        // Should NOT have created a new entry
2131        assert!(
2132            staging_snapshot.schemas.is_empty(),
2133            "Override should not create entries for schemas with no objects"
2134        );
2135    }
2136}