Skip to main content

mz_deploy/cli/commands/
stage.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Stage command - deploy to staging environment with renamed schemas and clusters.
11//!
12//! Runs the standard blue/green deployment pipeline that can later be promoted
13//! to production via [`super::promote`].
14
15use super::ObjectRef;
16use crate::cli::CliError;
17use crate::cli::executor::{self, DeploymentExecutor};
18use crate::cli::{git, progress};
19use crate::client::DeploymentMode;
20use crate::client::{
21    Client, ClusterConfig, ClusterOptions, DeploymentKind, PendingStatement, ReplacementMvRecord,
22};
23use crate::config::Settings;
24use crate::log;
25use crate::project::SchemaQualifier;
26use crate::project::analysis::changeset::ChangeSet;
27use crate::project::analysis::deployment_snapshot::{self, DeploymentSnapshot};
28use crate::project::analysis::deps::extract_external_indexes;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::{DatabaseObject, FullyQualifiedName};
31use crate::project::ir::graph::Project;
32use crate::project::ir::object_id::ObjectId;
33use crate::project::resolve::normalize::{self, NormalizingVisitor};
34use crate::verbose;
35use mz_ore::option::OptionExt;
36use std::collections::BTreeSet;
37use std::fmt;
38use std::path::Path;
39use std::time::Instant;
40
/// Planning output produced once and consumed by all stage execution phases.
///
/// Keeps stage deterministic by passing one analyzed view of objects/resources through
/// validation, metadata recording, and resource creation.
struct StageAnalysis<'a> {
    /// Objects that are deployed into the staging schemas during stage.
    objects: Vec<ObjectRef<'a>>,
    /// Sinks; stage only records them as pending statements instead of creating them.
    sinks: Vec<ObjectRef<'a>>,
    /// Changed materialized views that are tracked for replacement handling.
    replacement_mvs: Vec<ObjectRef<'a>>,
    /// Schemas that contain any entry of `objects` or `replacement_mvs`.
    schema_set: BTreeSet<SchemaQualifier>,
    /// Clusters referenced by any entry of `objects` or `replacement_mvs`.
    cluster_set: BTreeSet<String>,
}
52
/// Classification result for objects considered during staging.
///
/// Separates deploy-now objects from the deferred/special-case categories that are
/// handled later (sinks, replacement MVs) or skipped entirely by stage.
struct PartitionedObjects<'a> {
    /// Everything that is neither skipped, a sink, nor a replacement MV.
    objects: Vec<ObjectRef<'a>>,
    /// `CREATE SINK` statements.
    sinks: Vec<ObjectRef<'a>>,
    /// Materialized views whose id is in the changed-replacement set.
    replacement_mvs: Vec<ObjectRef<'a>>,
    /// Number of skipped statements: tables, tables from sources, sources,
    /// secrets, and connections.
    table_count: usize,
}
62
/// Summary returned after a successful stage run, used for terminal output
/// and `--json`.
#[derive(serde::Serialize)]
struct StageResult {
    /// Name of the staging deployment that was created.
    deploy_id: String,
    /// Count reported by the resource-creation phase.
    objects_deployed: usize,
    /// Wall-clock time of the whole run; excluded from serialized output.
    #[serde(skip)]
    duration: std::time::Duration,
}
72
73impl fmt::Display for StageResult {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        write!(
76            f,
77            "  \u{2713} Successfully deployed {} objects to '{}' staging environment ({:.1}s)",
78            self.objects_deployed,
79            self.deploy_id,
80            self.duration.as_secs_f64()
81        )
82    }
83}
84
/// Description of what a stage run would do, printed in dry-run mode instead
/// of executing the deployment.
#[derive(serde::Serialize)]
struct StagePlan {
    /// Name of the staging deployment the plan was computed for.
    deploy_id: String,
    /// Schemas involved, paired with their suffixed staging names.
    schemas: Vec<StagePlanSchema>,
    /// Clusters involved, paired with their suffixed staging names.
    clusters: Vec<StagePlanCluster>,
    /// Objects selected for deployment to staging.
    objects: Vec<StagePlanObject>,
    /// Sinks found among the selected objects.
    sinks: Vec<StagePlanObject>,
    /// Materialized views selected for replacement handling.
    replacement_mvs: Vec<StagePlanObject>,
}
94
/// One schema entry of a [`StagePlan`].
#[derive(serde::Serialize)]
struct StagePlanSchema {
    /// Database that contains the schema.
    database: String,
    /// Schema name as declared in the project.
    schema: String,
    /// Schema name with the staging suffix appended.
    staging_schema: String,
}
101
/// One cluster entry of a [`StagePlan`].
#[derive(serde::Serialize)]
struct StagePlanCluster {
    /// Cluster name as referenced by the project.
    production_cluster: String,
    /// Cluster name with the staging suffix appended.
    staging_cluster: String,
}
107
/// Fully qualified name of one object listed in a [`StagePlan`].
#[derive(serde::Serialize)]
struct StagePlanObject {
    /// Database part of the object's name.
    database: String,
    /// Schema part of the object's name.
    schema: String,
    /// Unqualified object name.
    object: String,
}
114
115impl fmt::Display for StagePlan {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        writeln!(f, "Stage plan for '{}':", self.deploy_id)?;
118
119        if !self.schemas.is_empty() {
120            writeln!(f, "\nSchemas ({}):", self.schemas.len())?;
121            for s in &self.schemas {
122                writeln!(
123                    f,
124                    "    {}.{} \u{2192} {}",
125                    s.database, s.schema, s.staging_schema
126                )?;
127            }
128        }
129
130        if !self.clusters.is_empty() {
131            writeln!(f, "\nClusters ({}):", self.clusters.len())?;
132            for c in &self.clusters {
133                writeln!(
134                    f,
135                    "    {} \u{2192} {}",
136                    c.production_cluster, c.staging_cluster
137                )?;
138            }
139        }
140
141        if !self.objects.is_empty() {
142            writeln!(f, "\nObjects ({}):", self.objects.len())?;
143            for o in &self.objects {
144                writeln!(f, "    {}.{}.{}", o.database, o.schema, o.object)?;
145            }
146        }
147
148        if !self.sinks.is_empty() {
149            writeln!(f, "\nSinks ({}):", self.sinks.len())?;
150            for s in &self.sinks {
151                writeln!(f, "    {}.{}.{}", s.database, s.schema, s.object)?;
152            }
153        }
154
155        if !self.replacement_mvs.is_empty() {
156            writeln!(f, "\nReplacement MVs ({}):", self.replacement_mvs.len())?;
157            for m in &self.replacement_mvs {
158                writeln!(f, "    {}.{}.{}", m.database, m.schema, m.object)?;
159            }
160        }
161
162        Ok(())
163    }
164}
165
166/// Deploy the project to a staging environment.
167///
168/// Creates renamed schemas and clusters alongside production, deploys the
169/// project onto them, and records metadata so a later `apply` can atomically
170/// swap staging into production.
171///
172/// # Arguments
173/// * `settings` - Resolved CLI settings (project directory, profile, etc.)
174/// * `stage_name` - Optional environment name; defaults to a git-derived or
175///   random identifier
176/// * `allow_dirty` - Allow deploying with uncommitted changes
177/// * `no_rollback` - Skip automatic rollback on failure (for debugging)
178/// * `dry_run` - Print SQL instead of executing it
179///
180/// # Returns
181/// `Ok(())` if the deployment succeeds.
182///
183/// # Errors
184/// Surfaces `CliError` variants from git checks, project compilation, and
185/// database execution.
186pub async fn run(
187    settings: &Settings,
188    stage_name: Option<&str>,
189    allow_dirty: bool,
190    no_rollback: bool,
191    dry_run: bool,
192) -> Result<(), CliError> {
193    let profile = settings.connection();
194    let directory = &settings.directory;
195    let start_time = Instant::now();
196
197    if !allow_dirty && git::is_dirty(directory) {
198        return Err(CliError::GitDirty);
199    }
200
201    let stage_name = stage_name
202        .owned()
203        .or_else(|| git::get_git_commit(directory).map(|sha| sha.chars().take(7).collect()))
204        .unwrap_or_else(executor::generate_random_env_name);
205
206    let planned_project = super::compile::run(settings, true).await?;
207    let staging_suffix = format!("_{}", stage_name);
208
209    let client = Client::connect_with_profile(profile.clone())
210        .await
211        .map_err(CliError::Connection)?;
212
213    crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
214    let role =
215        crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
216    crate::cli::commands::setup::require_deployer(role)?;
217
218    let Some(analysis) = analyze_project_changes(&client, &planned_project, &stage_name).await?
219    else {
220        return Ok(());
221    };
222
223    validate_project_for_stage(
224        &client,
225        &planned_project,
226        directory,
227        &analysis.schema_set,
228        &analysis.cluster_set,
229    )
230    .await?;
231
232    if !dry_run {
233        record_stage_metadata(
234            &client,
235            directory,
236            &stage_name,
237            &staging_suffix,
238            &analysis.objects,
239            &analysis.sinks,
240            &analysis.replacement_mvs,
241            &planned_project.replacement_schemas,
242        )
243        .await?;
244    }
245
246    if dry_run {
247        let plan = StagePlan {
248            deploy_id: stage_name.to_string(),
249            schemas: analysis
250                .schema_set
251                .iter()
252                .map(|sq| StagePlanSchema {
253                    database: sq.database.clone(),
254                    schema: sq.schema.clone(),
255                    staging_schema: format!("{}{}", sq.schema, staging_suffix),
256                })
257                .collect(),
258            clusters: analysis
259                .cluster_set
260                .iter()
261                .map(|c| StagePlanCluster {
262                    production_cluster: c.clone(),
263                    staging_cluster: format!("{}{}", c, staging_suffix),
264                })
265                .collect(),
266            objects: analysis
267                .objects
268                .iter()
269                .map(|(id, _)| StagePlanObject {
270                    database: id.expect_database().to_string(),
271                    schema: id.schema().to_string(),
272                    object: id.object().to_string(),
273                })
274                .collect(),
275            sinks: analysis
276                .sinks
277                .iter()
278                .map(|(id, _)| StagePlanObject {
279                    database: id.expect_database().to_string(),
280                    schema: id.schema().to_string(),
281                    object: id.object().to_string(),
282                })
283                .collect(),
284            replacement_mvs: analysis
285                .replacement_mvs
286                .iter()
287                .map(|(id, _)| StagePlanObject {
288                    database: id.expect_database().to_string(),
289                    schema: id.schema().to_string(),
290                    object: id.object().to_string(),
291                })
292                .collect(),
293        };
294        log::output(&plan);
295        return Ok(());
296    }
297
298    let success_count = create_resources_with_rollback(
299        &client,
300        &stage_name,
301        &staging_suffix,
302        &analysis.schema_set,
303        &analysis.cluster_set,
304        &planned_project,
305        &analysis.objects,
306        &analysis.replacement_mvs,
307        no_rollback,
308        dry_run,
309    )
310    .await?;
311
312    let result = StageResult {
313        deploy_id: stage_name.to_string(),
314        objects_deployed: success_count,
315        duration: start_time.elapsed(),
316    };
317    log::output(&result);
318    log::print_deploy_id(&stage_name);
319    Ok(())
320}
321
322/// Produces the stage deployment plan by diffing against current production snapshot.
323///
324/// Handles incremental-vs-full mode, applies stage-specific object filtering,
325/// validates table dependencies, and returns resource sets required for execution.
326async fn analyze_project_changes<'a>(
327    client: &Client,
328    planned_project: &'a Project,
329    stage_name: &str,
330) -> Result<Option<StageAnalysis<'a>>, CliError> {
331    progress::stage_start("Analyzing project changes");
332    let analyze_start = Instant::now();
333
334    if client
335        .deployments()
336        .get_deployment_metadata(stage_name)
337        .await?
338        .is_some()
339    {
340        return Err(CliError::InvalidEnvironmentName {
341            name: format!("deployment '{}' already exists", stage_name),
342        });
343    }
344
345    let new_snapshot = deployment_snapshot::build_snapshot_from_planned(planned_project)?;
346    let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;
347
348    let change_set = if production_snapshot.objects.is_empty() {
349        None
350    } else {
351        Some(ChangeSet::from_deployment_snapshot_comparison(
352            &production_snapshot,
353            &new_snapshot,
354            planned_project,
355        ))
356    };
357
358    // Reject adding brand-new objects to a schema that already has production objects.
359    // During incremental deployment:
360    //
361    //   1. The changeset correctly classifies these as `new_replacement_objects`
362    //   2. But `new_replacement_objects` is never consumed by `partition_objects` —
363    //      only `changed_replacement_objects` feeds into it
364    //   3. The new MV ends up in the regular `objects` partition and deploys to the
365    //      staging schema (e.g. `core_v3`)
366    //   4. Metadata for the production schema (`core`) gets overwritten to
367    //      `DeploymentKind::Replacement` by the changed MVs
368    //   5. During promote, the staging schema is skipped from swap and dropped CASCADE
369    //      — the new MV is lost
370    //
371    // The proper long-term fix is to support `ALTER MATERIALIZED VIEW ... SET SCHEMA`.
372    // With that, new MVs could be deployed to the staging schema alongside changed MVs,
373    // then moved into the production schema during promote via `SET SCHEMA` instead of
374    // relying on schema swap. This would eliminate the need for mixed deployment kinds
375    // or special-casing in `partition_objects` — new objects simply deploy to staging
376    // and get relocated on promote, just like changed objects get swapped.
377    //
378    // A brand-new stable schema (no prior production objects) deploys fine via normal
379    // blue-green swap — only schemas with existing production objects are affected.
380    if let Some(ref cs) = change_set {
381        validate_no_new_objects_in_existing_stable_schemas(cs, &production_snapshot)?;
382    }
383
384    let objects = select_stage_objects(planned_project, change_set.as_ref())?;
385    if objects.is_empty() && change_set.as_ref().is_some_and(ChangeSet::is_empty) {
386        progress::success("No changes detected compared to production, skipping deployment");
387        return Ok(None);
388    }
389
390    let replacement_object_ids = change_set
391        .as_ref()
392        .map(|cs| cs.changed_replacement_objects.clone())
393        .unwrap_or_default();
394    let partitioned = partition_objects(objects, &replacement_object_ids);
395    log_partition_summary(&partitioned);
396
397    let object_ids: BTreeSet<_> = partitioned
398        .objects
399        .iter()
400        .map(|(id, _)| id.clone())
401        .collect();
402    client
403        .validation()
404        .validate_table_dependencies(planned_project, &object_ids)
405        .await?;
406
407    let (schema_set, cluster_set) =
408        collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
409
410    let analyze_duration = analyze_start.elapsed();
411    progress::stage_success(
412        &format!(
413            "Ready to deploy {} view(s)/materialized view(s)",
414            partitioned.objects.len()
415        ),
416        analyze_duration,
417    );
418
419    Ok(Some(StageAnalysis {
420        objects: partitioned.objects,
421        sinks: partitioned.sinks,
422        replacement_mvs: partitioned.replacement_mvs,
423        schema_set,
424        cluster_set,
425    }))
426}
427
428/// Chooses the initial object set for stage before stage-specific partitioning.
429///
430/// Incremental mode uses the change set; full mode uses all sorted project objects.
431fn select_stage_objects<'a>(
432    planned_project: &'a Project,
433    change_set: Option<&ChangeSet>,
434) -> Result<Vec<ObjectRef<'a>>, CliError> {
435    if let Some(cs) = change_set {
436        if cs.is_empty() {
437            return Ok(Vec::new());
438        }
439        verbose!("{}", cs);
440        Ok(planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?)
441    } else {
442        verbose!("Full deployment: no production deployment found");
443        Ok(planned_project.get_sorted_objects()?)
444    }
445}
446
447/// Splits objects into stage execution categories.
448///
449/// Tables/sources are excluded, sinks are deferred to apply, and changed replacement MVs
450/// are tracked for special replacement handling.
451fn partition_objects<'a>(
452    objects: Vec<ObjectRef<'a>>,
453    replacement_object_ids: &BTreeSet<ObjectId>,
454) -> PartitionedObjects<'a> {
455    let mut kept = Vec::new();
456    let mut sinks = Vec::new();
457    let mut replacement_mvs = Vec::new();
458    let mut table_count = 0;
459
460    for (object_id, typed_obj) in objects {
461        match &typed_obj.stmt {
462            Statement::CreateTable(_)
463            | Statement::CreateTableFromSource(_)
464            | Statement::CreateSource(_)
465            | Statement::CreateSecret(_)
466            | Statement::CreateConnection(_) => {
467                table_count += 1;
468            }
469            Statement::CreateSink(_) => sinks.push((object_id, typed_obj)),
470            Statement::CreateMaterializedView(_) if replacement_object_ids.contains(&object_id) => {
471                replacement_mvs.push((object_id, typed_obj));
472            }
473            _ => kept.push((object_id, typed_obj)),
474        }
475    }
476
477    PartitionedObjects {
478        objects: kept,
479        sinks,
480        replacement_mvs,
481        table_count,
482    }
483}
484
485/// Reports the partitioning decisions visible to users in verbose mode.
486fn log_partition_summary(partitioned: &PartitionedObjects<'_>) {
487    if partitioned.table_count > 0 {
488        verbose!(
489            "Skipped {} table(s)/source(s) - use 'mz-deploy apply' for those",
490            partitioned.table_count
491        );
492    }
493    if !partitioned.sinks.is_empty() {
494        verbose!(
495            "Found {} sink(s) - will be created during apply after swap",
496            partitioned.sinks.len()
497        );
498    }
499    if !partitioned.replacement_mvs.is_empty() {
500        verbose!(
501            "Found {} replacement MV(s) - will use CREATE REPLACEMENT protocol",
502            partitioned.replacement_mvs.len()
503        );
504    }
505}
506
507/// Derives schema/cluster prerequisites for resource creation.
508///
509/// Builds schema and cluster sets solely from the objects being staged.
510/// Apply-managed objects (sources, tables, secrets, connections) are excluded
511/// by `partition_objects`, so their schemas and clusters are never staged.
512fn collect_stage_resources(
513    objects: &[ObjectRef<'_>],
514    replacement_mvs: &[ObjectRef<'_>],
515) -> (BTreeSet<SchemaQualifier>, BTreeSet<String>) {
516    let mut schema_set = BTreeSet::new();
517    let mut cluster_set = BTreeSet::new();
518
519    for (object_id, typed_obj) in objects.iter().chain(replacement_mvs.iter()) {
520        schema_set.insert(SchemaQualifier::new(
521            object_id.expect_database().to_string(),
522            object_id.schema().to_string(),
523        ));
524        cluster_set.extend(typed_obj.clusters());
525    }
526
527    (schema_set, cluster_set)
528}
529
530/// Runs all preflight database validations required before mutating deployment state.
531///
532/// This is intentionally isolated so stage fails before any metadata/resource writes.
533async fn validate_project_for_stage(
534    client: &Client,
535    planned_project: &Project,
536    directory: &Path,
537    schema_set: &BTreeSet<SchemaQualifier>,
538    cluster_set: &BTreeSet<String>,
539) -> Result<(), CliError> {
540    progress::stage_start("Validating project");
541    let validate_start = Instant::now();
542    client
543        .validation()
544        .validate_project(planned_project, directory)
545        .await?;
546    client
547        .validation()
548        .validate_cluster_isolation(planned_project)
549        .await?;
550    client
551        .validation()
552        .validate_privileges(planned_project)
553        .await?;
554    client
555        .validation()
556        .validate_schema_ownership(schema_set)
557        .await?;
558    client
559        .validation()
560        .validate_cluster_ownership(cluster_set)
561        .await?;
562    client
563        .validation()
564        .validate_sink_connections_exist(planned_project)
565        .await?;
566    let validate_duration = validate_start.elapsed();
567    progress::stage_success("All validations passed", validate_duration);
568    Ok(())
569}
570
571/// Persists stage deployment state and deferred apply actions.
572///
573/// Records object hashes plus schema deployment kinds, then stores sink/replacement
574/// records that the `apply` command consumes after swap.
575async fn record_stage_metadata(
576    client: &Client,
577    directory: &Path,
578    stage_name: &str,
579    staging_suffix: &str,
580    objects: &[ObjectRef<'_>],
581    sinks: &[ObjectRef<'_>],
582    replacement_mvs: &[ObjectRef<'_>],
583    replacement_schemas: &BTreeSet<SchemaQualifier>,
584) -> Result<(), CliError> {
585    progress::stage_start("Recording deployment metadata");
586    let metadata_start = Instant::now();
587    let metadata = executor::collect_deployment_metadata(client, directory).await;
588
589    let mut staging_snapshot = DeploymentSnapshot::default();
590
591    for (object_id, typed_obj) in objects {
592        let hash = deployment_snapshot::compute_typed_hash(typed_obj);
593        staging_snapshot.objects.insert(object_id.clone(), hash);
594        staging_snapshot.schemas.insert(
595            SchemaQualifier::new(
596                object_id.expect_database().to_string(),
597                object_id.schema().to_string(),
598            ),
599            DeploymentKind::Objects,
600        );
601    }
602
603    for (object_id, typed_obj) in sinks {
604        let hash = deployment_snapshot::compute_typed_hash(typed_obj);
605        staging_snapshot.objects.insert(object_id.clone(), hash);
606        staging_snapshot
607            .schemas
608            .entry(SchemaQualifier::new(
609                object_id.expect_database().to_string(),
610                object_id.schema().to_string(),
611            ))
612            .or_insert(DeploymentKind::Sinks);
613    }
614
615    for (object_id, typed_obj) in replacement_mvs {
616        let hash = deployment_snapshot::compute_typed_hash(typed_obj);
617        staging_snapshot.objects.insert(object_id.clone(), hash);
618        staging_snapshot.schemas.insert(
619            SchemaQualifier::new(
620                object_id.expect_database().to_string(),
621                object_id.schema().to_string(),
622            ),
623            DeploymentKind::Replacement,
624        );
625    }
626
627    // Ensure replacement schemas record the correct kind.
628    // During Objects→Replacement transitions, MVs go through the regular objects
629    // path (for blue-green swap), but the metadata must reflect the final kind
630    // so future deploys know to use CREATE REPLACEMENT.
631    for sq in replacement_schemas {
632        if staging_snapshot.schemas.contains_key(sq) {
633            staging_snapshot
634                .schemas
635                .insert(sq.clone(), DeploymentKind::Replacement);
636        }
637    }
638
639    deployment_snapshot::write_to_database(
640        client,
641        &staging_snapshot,
642        stage_name,
643        &metadata,
644        None,
645        DeploymentMode::Stage,
646    )
647    .await?;
648
649    if !sinks.is_empty() {
650        let pending_statements: Vec<PendingStatement> = sinks
651            .iter()
652            .enumerate()
653            .map(|(idx, (object_id, typed_obj))| {
654                let original_fqn: FullyQualifiedName = object_id.clone().into();
655                let mut visitor = NormalizingVisitor::fully_qualifying(&original_fqn);
656                let stmt = typed_obj
657                    .stmt
658                    .clone()
659                    .normalize_name_with(&visitor, &original_fqn.to_item_name())
660                    .normalize_dependencies_with(&mut visitor);
661                let hash = deployment_snapshot::compute_typed_hash(typed_obj);
662                #[allow(clippy::as_conversions)]
663                PendingStatement {
664                    deploy_id: stage_name.to_string(),
665                    sequence_num: idx as i32,
666                    database: object_id.expect_database().to_string(),
667                    schema: object_id.schema().to_string(),
668                    object: object_id.object().to_string(),
669                    object_hash: hash,
670                    statement_sql: stmt.to_string(),
671                    statement_kind: "sink".to_string(),
672                    executed_at: None,
673                }
674            })
675            .collect();
676
677        client
678            .deployments()
679            .insert_pending_statements(&pending_statements)
680            .await?;
681        verbose!(
682            "Stored {} pending sink statement(s)",
683            pending_statements.len()
684        );
685    }
686
687    if !replacement_mvs.is_empty() {
688        let records: Vec<ReplacementMvRecord> = replacement_mvs
689            .iter()
690            .map(|(object_id, _)| ReplacementMvRecord {
691                deploy_id: stage_name.to_string(),
692                target_database: object_id.expect_database().to_string(),
693                target_schema: object_id.schema().to_string(),
694                target_name: object_id.object().to_string(),
695                replacement_schema: format!("{}{}", object_id.schema(), staging_suffix),
696            })
697            .collect();
698        client
699            .deployments()
700            .insert_replacement_mvs(&records)
701            .await?;
702        verbose!("Stored {} replacement MV record(s)", records.len());
703    }
704
705    let metadata_duration = metadata_start.elapsed();
706    progress::stage_success("Deployment metadata recorded", metadata_duration);
707    Ok(())
708}
709
710/// Top-level orchestrator for the staging deployment pipeline.
711///
712/// Provisions all databases, schemas, clusters, and objects needed for a blue-green
713/// deployment. On failure, automatically rolls back every resource created during
714/// this invocation unless the `no_rollback` flag is set.
715#[allow(clippy::too_many_arguments)]
716async fn create_resources_with_rollback<'a>(
717    client: &Client,
718    stage_name: &str,
719    staging_suffix: &str,
720    schema_set: &BTreeSet<SchemaQualifier>,
721    cluster_set: &BTreeSet<String>,
722    planned_project: &'a Project,
723    objects: &'a [(ObjectId, &'a DatabaseObject)],
724    replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
725    no_rollback: bool,
726    dry_run: bool,
727) -> Result<usize, CliError> {
728    let executor = DeploymentExecutor::with_dry_run(client, dry_run);
729
730    let result = async {
731        create_databases_and_schemas(&executor, planned_project, schema_set, staging_suffix)
732            .await?;
733        create_staging_clusters(&executor, client, stage_name, cluster_set, staging_suffix).await?;
734        deploy_objects_to_staging(
735            &executor,
736            objects,
737            replacement_mvs,
738            planned_project,
739            cluster_set,
740            staging_suffix,
741        )
742        .await
743    }
744    .await;
745
746    match result {
747        Ok(count) => Ok(count),
748        Err(e) if dry_run || no_rollback => {
749            if !dry_run {
750                progress::error("Deployment failed (skipping rollback due to --no-rollback flag)");
751            }
752            Err(e)
753        }
754        Err(e) => {
755            progress::error("Deployment failed, rolling back...");
756            let (schemas, clusters) = rollback_staging_resources(client, stage_name).await;
757
758            if schemas > 0 || clusters > 0 {
759                progress::success(&format!(
760                    "Rolled back: {} schema(s), {} cluster(s)",
761                    schemas, clusters
762                ));
763            }
764
765            Err(e)
766        }
767    }
768}
769
770/// Provision all database and schema infrastructure required for a staged deployment.
771///
772/// After this completes, both the suffixed staging schemas (where new objects will be
773/// created) and the production schemas (swap targets) are guaranteed to exist.
774async fn create_databases_and_schemas(
775    executor: &DeploymentExecutor<'_>,
776    planned_project: &Project,
777    schema_set: &BTreeSet<SchemaQualifier>,
778    staging_suffix: &str,
779) -> Result<(), CliError> {
780    // Create project databases that aren't in schema_set
781    // (schema_set databases will be created by prepare_databases_and_schemas)
782    let schema_set_dbs: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
783    for db in &planned_project.databases {
784        if !schema_set_dbs.contains(db.name.as_str()) {
785            executor.ensure_database(&db.name).await?;
786            verbose!("  Ensured database {} exists", db.name);
787        }
788    }
789
790    // Create staging schemas + apply mod_statements
791    progress::stage_start("Creating staging schemas and applying setup statements");
792    let schema_start = Instant::now();
793    executor
794        .prepare_databases_and_schemas(planned_project, schema_set, Some(staging_suffix))
795        .await?;
796    let schema_duration = schema_start.elapsed();
797    progress::stage_success(
798        &format!(
799            "Created {} staging schema(s) with setup statements",
800            schema_set.len()
801        ),
802        schema_duration,
803    );
804
805    // Create production schemas for swap
806    if !executor.is_dry_run() {
807        for sq in schema_set {
808            executor.ensure_schema(&sq.database, &sq.schema).await?;
809            verbose!("  Ensured schema {}.{} exists", sq.database, sq.schema);
810        }
811    }
812
813    Ok(())
814}
815
816/// Provision staging clusters that mirror the size and configuration of their
817/// production counterparts.
818///
819/// Clusters that already exist are skipped. Cluster names are recorded for rollback
820/// tracking before any cluster is created, so partial failures can be cleaned up.
821async fn create_staging_clusters(
822    executor: &DeploymentExecutor<'_>,
823    client: &Client,
824    stage_name: &str,
825    cluster_set: &BTreeSet<String>,
826    staging_suffix: &str,
827) -> Result<(), CliError> {
828    // Write cluster mappings BEFORE creating clusters so abort can clean up on failure
829    let cluster_names: Vec<String> = cluster_set.iter().cloned().collect();
830    executor
831        .record_deployment_clusters(stage_name, &cluster_names)
832        .await?;
833
834    progress::stage_start("Creating staging clusters");
835    let cluster_start = Instant::now();
836    let mut created_clusters = 0;
837
838    // Batch check which staging clusters already exist (skip in dry-run mode)
839    let existing_staging_clusters = if !executor.is_dry_run() {
840        let staging_cluster_names: Vec<String> = cluster_set
841            .iter()
842            .map(|name| format!("{}{}", name, staging_suffix))
843            .collect();
844        client
845            .introspection()
846            .check_clusters_exist(&staging_cluster_names)
847            .await?
848    } else {
849        BTreeSet::new()
850    };
851
852    for prod_cluster in cluster_set {
853        let staging_cluster = format!("{}{}", prod_cluster, staging_suffix);
854
855        if executor.is_dry_run() {
856            // Config is unused in dry-run mode; provide a placeholder.
857            let placeholder = ClusterConfig::Managed {
858                options: ClusterOptions {
859                    size: String::new(),
860                    replication_factor: 1,
861                },
862                grants: Vec::new(),
863            };
864            executor
865                .create_cluster(&staging_cluster, prod_cluster, &placeholder)
866                .await?;
867            created_clusters += 1;
868            continue;
869        }
870
871        // Check if staging cluster already exists using batch result
872        if existing_staging_clusters.contains(&staging_cluster) {
873            verbose!("  Cluster '{}' already exists, skipping", staging_cluster);
874            continue;
875        }
876
877        // Get production cluster configuration (handles both managed and unmanaged)
878        let config = client
879            .introspection()
880            .get_cluster_config(prod_cluster)
881            .await?;
882
883        let config = match config {
884            Some(config) => config,
885            None => {
886                return Err(CliError::ClusterNotFound {
887                    name: prod_cluster.clone(),
888                });
889            }
890        };
891
892        executor
893            .create_cluster(&staging_cluster, prod_cluster, &config)
894            .await?;
895        created_clusters += 1;
896
897        log_cluster_creation(&staging_cluster, prod_cluster, &config);
898    }
899
900    let cluster_duration = cluster_start.elapsed();
901    progress::stage_success(
902        &format!("Created {} cluster(s)", created_clusters),
903        cluster_duration,
904    );
905
906    Ok(())
907}
908
909/// Log verbose details about a newly created staging cluster.
910fn log_cluster_creation(staging_cluster: &str, prod_cluster: &str, config: &ClusterConfig) {
911    match config {
912        ClusterConfig::Managed { options, grants } => {
913            verbose!(
914                "  Created managed cluster '{}' (size: {}, replication_factor: {}, {} grant(s), cloned from '{}')",
915                staging_cluster,
916                options.size,
917                options.replication_factor,
918                grants.len(),
919                prod_cluster
920            );
921        }
922        ClusterConfig::Unmanaged { replicas, grants } => {
923            verbose!(
924                "  Created unmanaged cluster '{}' with {} replica(s), {} grant(s) (cloned from '{}')",
925                staging_cluster,
926                replicas.len(),
927                grants.len(),
928                prod_cluster
929            );
930            for replica in replicas {
931                verbose!(
932                    "    - {} (size: {}{})",
933                    replica.name,
934                    replica.size,
935                    replica
936                        .availability_zone
937                        .as_ref()
938                        .map(|az| format!(", az: {}", az))
939                        .unwrap_or_default()
940                );
941            }
942        }
943    }
944}
945
946/// Execute all object definitions (views, materialized views, indexes) into the
947/// staging schemas.
948///
949/// Regular objects are created with suffixed names; replacement materialized views
950/// are linked to their production targets via `CREATE REPLACEMENT MATERIALIZED VIEW
951/// ... FOR`. Returns the total number of successfully deployed objects.
952async fn deploy_objects_to_staging<'a>(
953    executor: &DeploymentExecutor<'_>,
954    objects: &'a [(ObjectId, &'a DatabaseObject)],
955    replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
956    planned_project: &'a Project,
957    cluster_set: &BTreeSet<String>,
958    staging_suffix: &str,
959) -> Result<usize, CliError> {
960    progress::stage_start("Deploying objects to staging");
961    let deploy_start = Instant::now();
962
963    // Collect ObjectIds from objects being deployed for the staging transformer
964    // Include both regular objects and replacement MVs
965    let objects_to_deploy_set: BTreeSet<_> = objects
966        .iter()
967        .chain(replacement_mvs.iter())
968        .map(|(oid, _)| oid.clone())
969        .collect();
970
971    // Deploy external indexes
972    let mut external_indexes: Vec<_> = planned_project
973        .iter_objects()
974        .filter(|object| !objects_to_deploy_set.contains(&object.id))
975        .flat_map(extract_external_indexes)
976        .filter_map(|(cluster, index)| cluster_set.contains(&cluster.name).then_some(index))
977        .collect();
978
979    // Transform cluster names in external indexes for staging
980    normalize::transform_cluster_names_for_staging(&mut external_indexes, staging_suffix);
981    for index in external_indexes {
982        verbose!("Creating external index {}", index);
983        executor.execute_sql(&index).await?;
984    }
985
986    // Build the set of replacement object IDs from the replacement MVs slice.
987    // Only these specific objects have their references left unsuffixed.
988    let replacement_object_ids: BTreeSet<ObjectId> =
989        replacement_mvs.iter().map(|(oid, _)| oid.clone()).collect();
990
991    let mut success_count = 0;
992
993    // Deploy regular objects
994    for (idx, (object_id, typed_obj)) in objects.iter().enumerate() {
995        verbose!(
996            "Applying {}/{}: {}{} (to schema {}{})",
997            idx + 1,
998            objects.len(),
999            object_id.object(),
1000            staging_suffix,
1001            object_id.schema(),
1002            staging_suffix
1003        );
1004
1005        deploy_single_object(
1006            executor,
1007            object_id,
1008            typed_obj,
1009            staging_suffix,
1010            planned_project,
1011            &objects_to_deploy_set,
1012            &replacement_object_ids,
1013            |stmt| stmt,
1014        )
1015        .await?;
1016        success_count += 1;
1017    }
1018
1019    // Deploy replacement MVs using CREATE REPLACEMENT MATERIALIZED VIEW ... FOR
1020    for (idx, (object_id, typed_obj)) in replacement_mvs.iter().enumerate() {
1021        verbose!(
1022            "Applying replacement MV {}/{}: {} FOR {}",
1023            idx + 1,
1024            replacement_mvs.len(),
1025            object_id.object(),
1026            object_id
1027        );
1028
1029        let production_target = object_id.to_unresolved_item_name();
1030        deploy_single_object(
1031            executor,
1032            object_id,
1033            typed_obj,
1034            staging_suffix,
1035            planned_project,
1036            &objects_to_deploy_set,
1037            &replacement_object_ids,
1038            |stmt| match stmt {
1039                Statement::CreateMaterializedView(mut mv) => {
1040                    mv.replacement_for =
1041                        Some(mz_sql_parser::ast::RawItemName::Name(production_target));
1042                    Statement::CreateMaterializedView(mv)
1043                }
1044                other => other,
1045            },
1046        )
1047        .await?;
1048        success_count += 1;
1049    }
1050
1051    let deploy_duration = deploy_start.elapsed();
1052    progress::stage_success(
1053        &format!("Deployed {} view(s)/materialized view(s)", success_count),
1054        deploy_duration,
1055    );
1056
1057    Ok(success_count)
1058}
1059
1060/// Rollback staging resources on deployment failure.
1061///
1062/// This function performs best-effort cleanup of staging resources created during
1063/// a failed deployment. It mirrors the abort command logic but uses a best-effort
1064/// approach where cleanup failures are logged rather than returning errors.
1065///
1066/// # Arguments
1067/// * `client` - Database client
1068/// * `environment` - Staging environment name
1069///
1070/// # Returns
1071/// Number of schemas and clusters that were cleaned up (for summary message)
1072async fn rollback_staging_resources(client: &Client, environment: &str) -> (usize, usize) {
1073    let staging_schemas = best_effort_fetch(
1074        client
1075            .introspection()
1076            .get_staging_schemas(environment)
1077            .await,
1078        "query staging schemas",
1079    );
1080    let staging_clusters = best_effort_fetch(
1081        client
1082            .introspection()
1083            .get_staging_clusters(environment)
1084            .await,
1085        "query staging clusters",
1086    );
1087
1088    let schema_count = staging_schemas.len();
1089    let cluster_count = staging_clusters.len();
1090
1091    if !staging_schemas.is_empty() {
1092        verbose!("Dropping staging schemas...");
1093        if let Err(e) = client
1094            .introspection()
1095            .drop_staging_schemas(&staging_schemas)
1096            .await
1097        {
1098            verbose!("Warning: Failed to drop some schemas: {}", e);
1099        } else {
1100            for sq in &staging_schemas {
1101                verbose!("  Dropped {}.{}", sq.database, sq.schema);
1102            }
1103        }
1104    }
1105
1106    if !staging_clusters.is_empty() {
1107        verbose!("Dropping staging clusters...");
1108        if let Err(e) = client
1109            .introspection()
1110            .drop_staging_clusters(&staging_clusters)
1111            .await
1112        {
1113            verbose!("Warning: Failed to drop some clusters: {}", e);
1114        } else {
1115            for cluster in &staging_clusters {
1116                verbose!("  Dropped {}", cluster);
1117            }
1118        }
1119    }
1120
1121    verbose!("Deleting deployment records...");
1122    best_effort_delete(
1123        client
1124            .deployments()
1125            .delete_deployment_clusters(environment)
1126            .await,
1127        "delete cluster records",
1128    );
1129    best_effort_delete(
1130        client
1131            .deployments()
1132            .delete_pending_statements(environment)
1133            .await,
1134        "delete pending statements",
1135    );
1136    best_effort_delete(
1137        client
1138            .deployments()
1139            .delete_replacement_mvs(environment)
1140            .await,
1141        "delete replacement MV records",
1142    );
1143    best_effort_delete(
1144        client.deployments().delete_deployment(environment).await,
1145        "delete deployment records",
1146    );
1147
1148    (schema_count, cluster_count)
1149}
1150
1151/// Best-effort fetch wrapper used by rollback.
1152///
1153/// Converts query failures into empty results so cleanup can continue and report
1154/// as much progress as possible instead of aborting midway.
1155fn best_effort_fetch<T, E: fmt::Display>(result: Result<Vec<T>, E>, action: &str) -> Vec<T> {
1156    match result {
1157        Ok(values) => values,
1158        Err(e) => {
1159            verbose!("Warning: Failed to {}: {}", action, e);
1160            vec![]
1161        }
1162    }
1163}
1164
1165/// Best-effort delete wrapper used by rollback metadata cleanup.
1166fn best_effort_delete<E: fmt::Display>(result: Result<(), E>, action: &str) {
1167    if let Err(e) = result {
1168        verbose!("Warning: Failed to {}: {}", action, e);
1169    }
1170}
1171
1172/// Deploy a single object to the staging environment.
1173///
1174/// Handles normalization, execution, and deployment of indexes/grants/comments.
1175/// The `transform` callback allows the caller to modify the normalized statement
1176/// before execution (e.g., to set `replacement_for` on replacement MVs).
1177///
1178/// `replacement_objects` is the set of specific object IDs being updated
1179/// in-place via replacement MVs. References to these objects are left
1180/// unsuffixed (pointing to production). During full deployment the set is
1181/// empty, so every reference is suffixed to point at the staging schemas.
1182async fn deploy_single_object(
1183    executor: &DeploymentExecutor<'_>,
1184    object_id: &ObjectId,
1185    typed_obj: &DatabaseObject,
1186    staging_suffix: &str,
1187    planned_project: &Project,
1188    objects_to_deploy_set: &BTreeSet<ObjectId>,
1189    replacement_objects: &BTreeSet<ObjectId>,
1190    transform: impl FnOnce(Statement) -> Statement,
1191) -> Result<(), CliError> {
1192    let original_fqn: FullyQualifiedName = object_id.clone().into();
1193
1194    let mut visitor = NormalizingVisitor::staging(
1195        &original_fqn,
1196        staging_suffix.to_string(),
1197        &planned_project.external_dependencies,
1198        Some(objects_to_deploy_set),
1199        replacement_objects,
1200    );
1201
1202    let stmt = typed_obj
1203        .stmt
1204        .clone()
1205        .normalize_name_with(&visitor, &original_fqn.to_item_name())
1206        .normalize_dependencies_with(&mut visitor)
1207        .normalize_cluster_with(&visitor);
1208
1209    let stmt = transform(stmt);
1210    executor.execute_sql(&stmt).await?;
1211
1212    // Deploy indexes, grants, and comments
1213    let mut indexes = typed_obj.indexes.clone();
1214    let mut grants = typed_obj.grants.clone();
1215    let mut comments = typed_obj.comments.clone();
1216
1217    visitor.normalize_index_references(&mut indexes);
1218    visitor.normalize_index_clusters(&mut indexes);
1219    visitor.normalize_grant_references(&mut grants);
1220    visitor.normalize_comment_references(&mut comments);
1221
1222    for index in &indexes {
1223        executor.execute_sql(index).await?;
1224    }
1225
1226    for grant in &grants {
1227        executor.execute_sql(grant).await?;
1228    }
1229
1230    for comment in &comments {
1231        executor.execute_sql(comment).await?;
1232    }
1233
1234    Ok(())
1235}
1236
1237/// Check that no new replacement objects are being added to schemas that already
1238/// have production objects.
1239fn validate_no_new_objects_in_existing_stable_schemas(
1240    change_set: &ChangeSet,
1241    production_snapshot: &DeploymentSnapshot,
1242) -> Result<(), CliError> {
1243    let blocked: Vec<_> = change_set
1244        .new_replacement_objects
1245        .iter()
1246        .filter(|obj| {
1247            !production_snapshot.objects.contains_key(obj)
1248                && production_snapshot
1249                    .objects
1250                    .keys()
1251                    .any(|prod| prod.database() == obj.database() && prod.schema() == obj.schema())
1252        })
1253        .collect();
1254
1255    if blocked.is_empty() {
1256        return Ok(());
1257    }
1258
1259    let first = blocked[0];
1260    Err(CliError::NewObjectInExistingStableSchema {
1261        database: first.expect_database().to_string(),
1262        schema: first.schema().to_string(),
1263        objects: blocked.iter().map(|o| o.object().to_string()).collect(),
1264    })
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269    use super::*;
1270    use crate::project::analysis::deployment_snapshot::build_snapshot_from_planned;
1271    use crate::project::ir::compiled;
1272    use crate::project::ir::object_id::ObjectId;
1273    use std::collections::{BTreeMap, BTreeSet};
1274
1275    /// Parse SQL strings into a compiled::DatabaseObject.
1276    ///
1277    /// The first CREATE statement becomes the main statement.
1278    /// Any CREATE INDEX statements become entries in the indexes vec.
1279    fn make_typed_object(sqls: &[&str]) -> DatabaseObject {
1280        let mut stmt = None;
1281        let mut indexes = Vec::new();
1282
1283        for sql in sqls {
1284            let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1285            for p in parsed {
1286                match p.ast {
1287                    mz_sql_parser::ast::Statement::CreateView(s) => {
1288                        stmt = Some(Statement::CreateView(s));
1289                    }
1290                    mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
1291                        stmt = Some(Statement::CreateMaterializedView(s));
1292                    }
1293                    mz_sql_parser::ast::Statement::CreateTable(s) => {
1294                        stmt = Some(Statement::CreateTable(s));
1295                    }
1296                    mz_sql_parser::ast::Statement::CreateSource(s) => {
1297                        stmt = Some(Statement::CreateSource(s));
1298                    }
1299                    mz_sql_parser::ast::Statement::CreateConnection(s) => {
1300                        stmt = Some(Statement::CreateConnection(s));
1301                    }
1302                    mz_sql_parser::ast::Statement::CreateSecret(s) => {
1303                        stmt = Some(Statement::CreateSecret(s));
1304                    }
1305                    mz_sql_parser::ast::Statement::CreateIndex(s) => {
1306                        indexes.push(s);
1307                    }
1308                    other => panic!("Unexpected statement type: {:?}", other),
1309                }
1310            }
1311        }
1312
1313        DatabaseObject {
1314            path: std::path::PathBuf::from("test.sql"),
1315            stmt: stmt.expect("Expected at least one CREATE statement"),
1316            indexes,
1317            grants: vec![],
1318            comments: vec![],
1319            tests: vec![],
1320        }
1321    }
1322
1323    /// Build a graph::Project from a list of (database, schema, object_name, typed_obj) tuples.
1324    fn make_planned_project(objects: Vec<(&str, &str, &str, DatabaseObject)>) -> Project {
1325        // Group into databases -> schemas -> objects
1326        let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1327
1328        for (database, schema, _name, typed_obj) in objects {
1329            db_map
1330                .entry(database.to_string())
1331                .or_default()
1332                .entry(schema.to_string())
1333                .or_default()
1334                .push(typed_obj);
1335        }
1336
1337        let databases: Vec<compiled::Database> = db_map
1338            .into_iter()
1339            .map(|(db_name, schemas)| compiled::Database {
1340                name: db_name,
1341                schemas: schemas
1342                    .into_iter()
1343                    .map(|(schema_name, objs)| compiled::Schema {
1344                        name: schema_name,
1345                        objects: objs,
1346                        mod_statements: None,
1347                    })
1348                    .collect(),
1349                mod_statements: None,
1350            })
1351            .collect();
1352
1353        let typed_project = compiled::Project {
1354            databases,
1355            replacement_schemas: BTreeSet::new(),
1356        };
1357
1358        Project::from(typed_project)
1359    }
1360
1361    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1362    #[mz_ore::test]
1363    fn test_full_deploy_view_not_indexed_mixed_types() {
1364        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1365        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1366        let source_obj = make_typed_object(&[
1367            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1368        ]);
1369        let conn_obj =
1370            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1371        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1372
1373        let objects: Vec<ObjectRef> = vec![
1374            (
1375                ObjectId::new("db".into(), "public".into(), "my_view".into()),
1376                &view_obj,
1377            ),
1378            (
1379                ObjectId::new("db".into(), "public".into(), "my_table".into()),
1380                &table_obj,
1381            ),
1382            (
1383                ObjectId::new("db".into(), "public".into(), "my_source".into()),
1384                &source_obj,
1385            ),
1386            (
1387                ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1388                &conn_obj,
1389            ),
1390            (
1391                ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1392                &secret_obj,
1393            ),
1394        ];
1395
1396        let replacement_ids = BTreeSet::new();
1397        let partitioned = partition_objects(objects, &replacement_ids);
1398
1399        // Only the view should be in staged objects
1400        assert_eq!(
1401            partitioned.objects.len(),
1402            1,
1403            "Only the view should be staged"
1404        );
1405        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1406
1407        // Table, source, connection, secret should be counted as skipped
1408        assert_eq!(
1409            partitioned.table_count, 4,
1410            "Table, source, connection, and secret should all be skipped"
1411        );
1412
1413        // No sinks or replacement MVs
1414        assert!(partitioned.sinks.is_empty());
1415        assert!(partitioned.replacement_mvs.is_empty());
1416
1417        // Collect stage resources
1418        let (schema_set, cluster_set) =
1419            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1420
1421        // Should have the view's schema
1422        assert_eq!(schema_set.len(), 1);
1423        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1424
1425        // View has no cluster, so cluster_set should be empty
1426        assert!(
1427            cluster_set.is_empty(),
1428            "View without index should not require any clusters"
1429        );
1430    }
1431
1432    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1433    #[mz_ore::test]
1434    fn test_full_deploy_view_indexed_different_cluster() {
1435        let view_obj = make_typed_object(&[
1436            "CREATE VIEW my_view AS SELECT 1",
1437            "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1438        ]);
1439        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1440        let source_obj = make_typed_object(&[
1441            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1442        ]);
1443        let conn_obj =
1444            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1445        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1446
1447        let objects: Vec<ObjectRef> = vec![
1448            (
1449                ObjectId::new("db".into(), "public".into(), "my_view".into()),
1450                &view_obj,
1451            ),
1452            (
1453                ObjectId::new("db".into(), "public".into(), "my_table".into()),
1454                &table_obj,
1455            ),
1456            (
1457                ObjectId::new("db".into(), "public".into(), "my_source".into()),
1458                &source_obj,
1459            ),
1460            (
1461                ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1462                &conn_obj,
1463            ),
1464            (
1465                ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1466                &secret_obj,
1467            ),
1468        ];
1469
1470        let replacement_ids = BTreeSet::new();
1471        let partitioned = partition_objects(objects, &replacement_ids);
1472
1473        // Only the view should be staged
1474        assert_eq!(partitioned.objects.len(), 1);
1475        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1476        assert_eq!(partitioned.table_count, 4);
1477
1478        // Collect stage resources
1479        let (schema_set, cluster_set) =
1480            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1481
1482        // Should have view's schema
1483        assert_eq!(schema_set.len(), 1);
1484        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1485
1486        // Should stage index_cluster (from the view's index), NOT source_cluster
1487        assert_eq!(
1488            cluster_set.len(),
1489            1,
1490            "Should only have index_cluster, got: {:?}",
1491            cluster_set
1492        );
1493        assert!(
1494            cluster_set.contains("index_cluster"),
1495            "Should stage index_cluster from the view's index"
1496        );
1497        assert!(
1498            !cluster_set.contains("source_cluster"),
1499            "Should NOT stage source_cluster (source is not staged)"
1500        );
1501    }
1502
1503    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1504    #[mz_ore::test]
1505    fn test_incremental_deploy_view_updated_not_indexed() {
1506        // Build planned project with all object types
1507        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1508        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1509        let source_obj = make_typed_object(&[
1510            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1511        ]);
1512        let conn_obj =
1513            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1514        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1515
1516        let planned_project = make_planned_project(vec![
1517            ("db", "public", "my_view", view_obj),
1518            ("db", "storage", "my_table", table_obj),
1519            ("db", "storage", "my_source", source_obj),
1520            ("db", "storage", "my_conn", conn_obj),
1521            ("db", "storage", "my_secret", secret_obj),
1522        ]);
1523
1524        // Build new snapshot from planned project
1525        let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1526
1527        // Build old snapshot: same hashes for everything EXCEPT the view
1528        let mut old_snapshot = DeploymentSnapshot::default();
1529        for (object_id, hash) in &new_snapshot.objects {
1530            if object_id.object() == "my_view" {
1531                // Different hash to simulate the view having changed
1532                old_snapshot
1533                    .objects
1534                    .insert(object_id.clone(), "old_hash".to_string());
1535            } else {
1536                old_snapshot.objects.insert(object_id.clone(), hash.clone());
1537            }
1538        }
1539
1540        // Compute changeset
1541        let change_set = ChangeSet::from_deployment_snapshot_comparison(
1542            &old_snapshot,
1543            &new_snapshot,
1544            &planned_project,
1545        );
1546
1547        // The view should be in objects_to_deploy
1548        assert!(
1549            change_set.objects_to_deploy.contains(&ObjectId::new(
1550                "db".into(),
1551                "public".into(),
1552                "my_view".into()
1553            )),
1554            "Changed view should be in objects_to_deploy"
1555        );
1556
1557        // Get filtered objects and partition
1558        let objects = planned_project
1559            .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1560            .unwrap();
1561
1562        let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1563
1564        // Only the view should be staged
1565        assert_eq!(
1566            partitioned.objects.len(),
1567            1,
1568            "Only the changed view should be staged, got: {:?}",
1569            partitioned
1570                .objects
1571                .iter()
1572                .map(|(id, _)| id.object())
1573                .collect::<Vec<_>>()
1574        );
1575        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1576
1577        let (schema_set, cluster_set) =
1578            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1579
1580        assert_eq!(schema_set.len(), 1);
1581        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1582        assert!(
1583            cluster_set.is_empty(),
1584            "View without index should not require any clusters"
1585        );
1586    }
1587
1588    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1589    #[mz_ore::test]
1590    fn test_incremental_deploy_view_updated_indexed_different_cluster() {
1591        // Build planned project with indexed view and other object types
1592        let view_obj = make_typed_object(&[
1593            "CREATE VIEW my_view AS SELECT 1",
1594            "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1595        ]);
1596        let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1597        let source_obj = make_typed_object(&[
1598            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1599        ]);
1600        let conn_obj =
1601            make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1602        let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1603
1604        let planned_project = make_planned_project(vec![
1605            ("db", "public", "my_view", view_obj),
1606            ("db", "storage", "my_table", table_obj),
1607            ("db", "storage", "my_source", source_obj),
1608            ("db", "storage", "my_conn", conn_obj),
1609            ("db", "storage", "my_secret", secret_obj),
1610        ]);
1611
1612        // Build new snapshot from planned project
1613        let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1614
1615        // Build old snapshot: same hashes except the view
1616        let mut old_snapshot = DeploymentSnapshot::default();
1617        for (object_id, hash) in &new_snapshot.objects {
1618            if object_id.object() == "my_view" {
1619                old_snapshot
1620                    .objects
1621                    .insert(object_id.clone(), "old_hash".to_string());
1622            } else {
1623                old_snapshot.objects.insert(object_id.clone(), hash.clone());
1624            }
1625        }
1626
1627        // Compute changeset
1628        let change_set = ChangeSet::from_deployment_snapshot_comparison(
1629            &old_snapshot,
1630            &new_snapshot,
1631            &planned_project,
1632        );
1633
1634        assert!(
1635            change_set.objects_to_deploy.contains(&ObjectId::new(
1636                "db".into(),
1637                "public".into(),
1638                "my_view".into()
1639            )),
1640            "Changed view should be in objects_to_deploy"
1641        );
1642
1643        // Get filtered objects and partition
1644        let objects = planned_project
1645            .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1646            .unwrap();
1647
1648        let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1649
1650        // Only the view should be staged
1651        assert_eq!(
1652            partitioned.objects.len(),
1653            1,
1654            "Only the changed view should be staged, got: {:?}",
1655            partitioned
1656                .objects
1657                .iter()
1658                .map(|(id, _)| id.object())
1659                .collect::<Vec<_>>()
1660        );
1661        assert_eq!(partitioned.objects[0].0.object(), "my_view");
1662
1663        let (schema_set, cluster_set) =
1664            collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1665
1666        assert_eq!(schema_set.len(), 1);
1667        assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1668
1669        // Should stage index_cluster only, NOT source_cluster
1670        assert_eq!(
1671            cluster_set.len(),
1672            1,
1673            "Should only have index_cluster, got: {:?}",
1674            cluster_set
1675        );
1676        assert!(
1677            cluster_set.contains("index_cluster"),
1678            "Should stage index_cluster from the view's index"
1679        );
1680        assert!(
1681            !cluster_set.contains("source_cluster"),
1682            "Should NOT stage source_cluster"
1683        );
1684    }
1685
1686    fn make_empty_change_set() -> ChangeSet {
1687        ChangeSet {
1688            changed_objects: BTreeSet::new(),
1689            dirty_schemas: BTreeSet::new(),
1690            dirty_clusters: BTreeSet::new(),
1691            objects_to_deploy: BTreeSet::new(),
1692            new_replacement_objects: BTreeSet::new(),
1693            changed_replacement_objects: BTreeSet::new(),
1694        }
1695    }
1696
1697    #[mz_ore::test]
1698    fn test_validate_no_new_replacement_objects_first_deploy() {
1699        let cs = make_empty_change_set();
1700        let snapshot = DeploymentSnapshot::default();
1701        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1702    }
1703
1704    #[mz_ore::test]
1705    fn test_validate_new_replacement_objects_in_brand_new_schema() {
1706        let mut cs = make_empty_change_set();
1707        cs.new_replacement_objects.insert(ObjectId::new(
1708            "db".into(),
1709            "analytics".into(),
1710            "new_mv".into(),
1711        ));
1712
1713        // Production has objects in a *different* schema, not analytics
1714        let mut snapshot = DeploymentSnapshot::default();
1715        snapshot.objects.insert(
1716            ObjectId::new("db".into(), "public".into(), "existing_mv".into()),
1717            "hash1".into(),
1718        );
1719
1720        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1721    }
1722
1723    #[mz_ore::test]
1724    fn test_validate_new_replacement_objects_in_existing_production_schema() {
1725        let mut cs = make_empty_change_set();
1726        cs.new_replacement_objects.insert(ObjectId::new(
1727            "db".into(),
1728            "analytics".into(),
1729            "new_mv".into(),
1730        ));
1731
1732        // Production already has objects in analytics
1733        let mut snapshot = DeploymentSnapshot::default();
1734        snapshot.objects.insert(
1735            ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1736            "hash1".into(),
1737        );
1738
1739        let result = validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot);
1740        assert!(result.is_err());
1741        match result.unwrap_err() {
1742            CliError::NewObjectInExistingStableSchema {
1743                database,
1744                schema,
1745                objects,
1746            } => {
1747                assert_eq!(database, "db");
1748                assert_eq!(schema, "analytics");
1749                assert_eq!(objects, vec!["new_mv"]);
1750            }
1751            other => panic!("Expected NewObjectInExistingStableSchema, got: {:?}", other),
1752        }
1753    }
1754
1755    #[mz_ore::test]
1756    fn test_validate_changed_replacement_objects_only() {
1757        let mut cs = make_empty_change_set();
1758        // Only changed objects, no new ones
1759        cs.changed_replacement_objects.insert(ObjectId::new(
1760            "db".into(),
1761            "analytics".into(),
1762            "changed_mv".into(),
1763        ));
1764
1765        let mut snapshot = DeploymentSnapshot::default();
1766        snapshot.objects.insert(
1767            ObjectId::new("db".into(), "analytics".into(), "changed_mv".into()),
1768            "hash1".into(),
1769        );
1770
1771        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1772    }
1773
1774    #[mz_ore::test]
1775    fn test_validate_mixed_new_in_new_schema_changed_in_existing() {
1776        let mut cs = make_empty_change_set();
1777        // New object in a brand-new schema
1778        cs.new_replacement_objects.insert(ObjectId::new(
1779            "db".into(),
1780            "new_schema".into(),
1781            "new_mv".into(),
1782        ));
1783        // Changed object in an existing schema
1784        cs.changed_replacement_objects.insert(ObjectId::new(
1785            "db".into(),
1786            "existing_schema".into(),
1787            "changed_mv".into(),
1788        ));
1789
1790        // Production has objects only in existing_schema
1791        let mut snapshot = DeploymentSnapshot::default();
1792        snapshot.objects.insert(
1793            ObjectId::new("db".into(), "existing_schema".into(), "changed_mv".into()),
1794            "hash1".into(),
1795        );
1796
1797        // Should pass: the new object is in a schema with no production objects
1798        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1799    }
1800
1801    #[mz_ore::test]
1802    fn test_validate_transitioning_objects_in_existing_schema_allowed() {
1803        let mut cs = make_empty_change_set();
1804        // Object transitioning from Objects→Replacement lands in new_replacement_objects
1805        cs.new_replacement_objects.insert(ObjectId::new(
1806            "db".into(),
1807            "analytics".into(),
1808            "existing_mv".into(),
1809        ));
1810
1811        // The same object already exists in production (it's transitioning, not new)
1812        let mut snapshot = DeploymentSnapshot::default();
1813        snapshot.objects.insert(
1814            ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1815            "hash1".into(),
1816        );
1817
1818        // Should pass: the object already exists in production, it's just changing schema kind
1819        assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1820    }
1821
1822    fn make_planned_project_with_replacement_schemas(
1823        objects: Vec<(&str, &str, &str, DatabaseObject)>,
1824        replacement_schemas: BTreeSet<SchemaQualifier>,
1825    ) -> Project {
1826        let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1827
1828        for (database, schema, _name, typed_obj) in objects {
1829            db_map
1830                .entry(database.to_string())
1831                .or_default()
1832                .entry(schema.to_string())
1833                .or_default()
1834                .push(typed_obj);
1835        }
1836
1837        let databases: Vec<compiled::Database> = db_map
1838            .into_iter()
1839            .map(|(db_name, schemas)| compiled::Database {
1840                name: db_name,
1841                schemas: schemas
1842                    .into_iter()
1843                    .map(|(schema_name, objs)| compiled::Schema {
1844                        name: schema_name,
1845                        objects: objs,
1846                        mod_statements: None,
1847                    })
1848                    .collect(),
1849                mod_statements: None,
1850            })
1851            .collect();
1852
1853        let typed_project = compiled::Project {
1854            databases,
1855            replacement_schemas,
1856        };
1857
1858        Project::from(typed_project)
1859    }
1860
1861    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1862    #[mz_ore::test]
1863    fn test_build_snapshot_replacement_schema_kind() {
1864        let mv_obj =
1865            make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1866        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1867
1868        let mut replacement_schemas = BTreeSet::new();
1869        replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
1870
1871        let planned_project = make_planned_project_with_replacement_schemas(
1872            vec![
1873                ("db", "stable", "my_mv", mv_obj),
1874                ("db", "regular", "my_view", view_obj),
1875            ],
1876            replacement_schemas,
1877        );
1878
1879        let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1880
1881        // The stable schema should be Replacement
1882        assert_eq!(
1883            snapshot
1884                .schemas
1885                .get(&SchemaQualifier::new("db".into(), "stable".into())),
1886            Some(&DeploymentKind::Replacement),
1887            "Replacement schema should have Replacement kind in snapshot"
1888        );
1889
1890        // The regular schema should be Objects
1891        assert_eq!(
1892            snapshot
1893                .schemas
1894                .get(&SchemaQualifier::new("db".into(), "regular".into())),
1895            Some(&DeploymentKind::Objects),
1896            "Regular schema should have Objects kind in snapshot"
1897        );
1898    }
1899
1900    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1901    #[mz_ore::test]
1902    fn test_build_snapshot_no_replacement_schemas_all_objects() {
1903        let mv_obj =
1904            make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1905        let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1906
1907        let planned_project = make_planned_project(vec![
1908            ("db", "stable", "my_mv", mv_obj),
1909            ("db", "regular", "my_view", view_obj),
1910        ]);
1911
1912        let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1913
1914        // Both should be Objects when no replacement_schemas configured
1915        assert_eq!(
1916            snapshot
1917                .schemas
1918                .get(&SchemaQualifier::new("db".into(), "stable".into())),
1919            Some(&DeploymentKind::Objects),
1920        );
1921        assert_eq!(
1922            snapshot
1923                .schemas
1924                .get(&SchemaQualifier::new("db".into(), "regular".into())),
1925            Some(&DeploymentKind::Objects),
1926        );
1927    }
1928
1929    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1930    #[mz_ore::test]
1931    fn test_record_stage_metadata_transition_override() {
1932        // During an Objects→Replacement transition, MVs go through the regular
1933        // objects path (not replacement_mvs), but the metadata must still record
1934        // the schema as Replacement.
1935        let mv_obj =
1936            make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1937
1938        // Objects path (transition — MV is NOT in replacement_mvs)
1939        let objects: Vec<ObjectRef> = vec![(
1940            ObjectId::new("db".into(), "stable".into(), "my_mv".into()),
1941            &mv_obj,
1942        )];
1943        let sinks: Vec<ObjectRef> = vec![];
1944        let replacement_mvs: Vec<ObjectRef> = vec![];
1945
1946        // The project declares "stable" as a replacement schema
1947        let mut replacement_schemas = BTreeSet::new();
1948        replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
1949
1950        // Simulate what record_stage_metadata does (without DB calls)
1951        let mut staging_snapshot = DeploymentSnapshot::default();
1952
1953        for (object_id, typed_obj) in &objects {
1954            let hash = deployment_snapshot::compute_typed_hash(typed_obj);
1955            staging_snapshot.objects.insert(object_id.clone(), hash);
1956            staging_snapshot.schemas.insert(
1957                SchemaQualifier::new(
1958                    object_id.expect_database().to_string(),
1959                    object_id.schema().to_string(),
1960                ),
1961                DeploymentKind::Objects,
1962            );
1963        }
1964
1965        for (object_id, typed_obj) in &sinks {
1966            let hash = deployment_snapshot::compute_typed_hash(typed_obj);
1967            staging_snapshot.objects.insert(object_id.clone(), hash);
1968            staging_snapshot
1969                .schemas
1970                .entry(SchemaQualifier::new(
1971                    object_id.expect_database().to_string(),
1972                    object_id.schema().to_string(),
1973                ))
1974                .or_insert(DeploymentKind::Sinks);
1975        }
1976
1977        for (object_id, typed_obj) in &replacement_mvs {
1978            let hash = deployment_snapshot::compute_typed_hash(typed_obj);
1979            staging_snapshot.objects.insert(object_id.clone(), hash);
1980            staging_snapshot.schemas.insert(
1981                SchemaQualifier::new(
1982                    object_id.expect_database().to_string(),
1983                    object_id.schema().to_string(),
1984                ),
1985                DeploymentKind::Replacement,
1986            );
1987        }
1988
1989        // Before the fix, the schema would remain Objects here.
1990        assert_eq!(
1991            staging_snapshot
1992                .schemas
1993                .get(&SchemaQualifier::new("db".into(), "stable".into())),
1994            Some(&DeploymentKind::Objects),
1995            "Before override, schema should be Objects (from regular objects path)"
1996        );
1997
1998        // Apply the replacement_schemas override (the fix)
1999        for sq in &replacement_schemas {
2000            if staging_snapshot.schemas.contains_key(sq) {
2001                staging_snapshot
2002                    .schemas
2003                    .insert(sq.clone(), DeploymentKind::Replacement);
2004            }
2005        }
2006
2007        // After the fix, the schema should be Replacement
2008        assert_eq!(
2009            staging_snapshot
2010                .schemas
2011                .get(&SchemaQualifier::new("db".into(), "stable".into())),
2012            Some(&DeploymentKind::Replacement),
2013            "After override, schema should be Replacement"
2014        );
2015    }
2016
2017    #[mz_ore::test]
2018    fn test_record_stage_metadata_override_only_applies_to_existing_schemas() {
2019        // The override should NOT create new schema entries — it only applies to
2020        // schemas that already have objects in the staging snapshot.
2021        let replacement_schemas =
2022            BTreeSet::from([SchemaQualifier::new("db".into(), "nonexistent".into())]);
2023
2024        let mut staging_snapshot = DeploymentSnapshot::default();
2025
2026        // Apply the replacement_schemas override
2027        for sq in &replacement_schemas {
2028            if staging_snapshot.schemas.contains_key(sq) {
2029                staging_snapshot
2030                    .schemas
2031                    .insert(sq.clone(), DeploymentKind::Replacement);
2032            }
2033        }
2034
2035        // Should NOT have created a new entry
2036        assert!(
2037            staging_snapshot.schemas.is_empty(),
2038            "Override should not create entries for schemas with no objects"
2039        );
2040    }
2041}