Skip to main content

mz_deploy/cli/commands/
dev.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! `mz-deploy dev` — developer inner-loop overlay command.
11//!
12//! Creates per-developer overlay databases (`<base_db>__<profile>`) from
13//! the dirty subset of the project's views, materialized views, and indexes.
14//! Every overlay materialized view and index is rewritten to run on a single
15//! user-supplied target cluster. The overlay is drop-and-rebuilt on every
16//! invocation.
17//!
18//! Requires the `materialize_developer` role plus `CREATEDB` at run time.
19
20use std::collections::BTreeSet;
21
22use crate::cli::commands::ObjectRef;
23use crate::cli::error::CliError;
24use crate::client::{Client, quote_identifier};
25use crate::config::Settings;
26use crate::project::SchemaQualifier;
27use crate::project::analysis::changeset::ChangeSet;
28use crate::project::analysis::deployment_snapshot;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::FullyQualifiedName;
31use crate::project::resolve::normalize::NormalizingVisitor;
32use crate::{info, verbose};
33
/// Builds the name of a developer overlay database.
///
/// The convention is `<base_db>__<profile>`: the base database name and the
/// profile name joined by a double underscore.
fn overlay_db_name(base_db: &str, profile: &str) -> String {
    [base_db, profile].join("__")
}
38
39/// Refuse to proceed if the user-supplied target cluster hosts a promoted
40/// deployment.
41async fn refuse_if_targets_production_cluster(
42    client: &Client,
43    cluster: &str,
44) -> Result<(), CliError> {
45    let production = client.deployments().list_production_clusters().await?;
46    if let Some(rec) = production.into_iter().find(|r| r.cluster_name == cluster) {
47        return Err(CliError::DevTargetsProductionCluster { cluster: rec });
48    }
49    Ok(())
50}
51
52/// Top-level entry point for `mz-deploy dev`.
53///
54/// Orchestrates role/privilege validation, dirty-set computation, plan
55/// printing, and the drop+create DDL phases.
56///
57/// * `cluster` — target cluster for every overlay MV and index. Required
58///   unless `down` is set (clap enforces this).
59/// * `down` — when `true`, only run the drop phase and exit immediately.
60/// * `dry_run` — when `true`, print the plan but issue no DDL.
61pub async fn run(
62    settings: &Settings,
63    cluster: Option<String>,
64    down: bool,
65    dry_run: bool,
66) -> Result<(), CliError> {
67    let profile = settings.connection();
68    // `dev` always loads with `needs_connection: true`, so a profile must be set.
69    let profile_name = settings
70        .profile_name
71        .clone()
72        .expect("dev requires an active profile");
73    let project_name = settings
74        .directory
75        .file_name()
76        .and_then(|s| s.to_str())
77        .filter(|s| !s.is_empty())
78        .ok_or_else(|| CliError::InvalidProjectDirectory {
79            path: settings.directory.display().to_string(),
80        })?
81        .to_string();
82
83    let planned_project = super::compile::run(settings, true).await?;
84
85    let in_project_databases: BTreeSet<String> = planned_project
86        .databases
87        .iter()
88        .map(|db| db.name.clone())
89        .collect();
90
91    let client = Client::connect_with_profile(profile.clone())
92        .await
93        .map_err(CliError::Connection)?;
94
95    crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
96    let role =
97        crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
98    crate::cli::commands::setup::require_developer(role)?;
99
100    if in_project_databases.is_empty() {
101        info!("Project has no databases — nothing to overlay.");
102        return Ok(());
103    }
104
105    let sample_overlay_db = overlay_db_name(
106        in_project_databases.iter().next().expect("non-empty"),
107        &profile_name,
108    );
109    crate::cli::commands::setup::require_createdb(&client, &profile.username, &sample_overlay_db)
110        .await?;
111
112    if down {
113        drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
114        info!("Overlay removed.");
115        return Ok(());
116    }
117
118    // clap guarantees `cluster` is `Some` whenever `down` is false.
119    let target_cluster = cluster.expect("cluster required unless --down");
120    refuse_if_targets_production_cluster(&client, &target_cluster).await?;
121
122    let new_snapshot = deployment_snapshot::build_snapshot_from_planned(&planned_project)?;
123    let production_snapshot = deployment_snapshot::load_from_database(&client, None).await?;
124
125    // Empty production → full overlay (first-run semantics matching stage).
126    let change_set = if production_snapshot.objects.is_empty() {
127        verbose!("Full deployment: no production deployment found");
128        None
129    } else {
130        Some(ChangeSet::from_deployment_snapshot_comparison(
131            &production_snapshot,
132            &new_snapshot,
133            &planned_project,
134            &BTreeSet::new(),
135        ))
136    };
137
138    let all_objects = match change_set.as_ref() {
139        Some(cs) if cs.is_empty() => Vec::new(),
140        Some(cs) => {
141            verbose!("{}", cs);
142            planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?
143        }
144        None => planned_project.get_sorted_objects()?,
145    };
146
147    let mut skipped = 0usize;
148    let overlay_objects: Vec<ObjectRef<'_>> = all_objects
149        .into_iter()
150        .filter(|(_, typed_obj)| match &typed_obj.stmt {
151            Statement::CreateView(_) | Statement::CreateMaterializedView(_) => true,
152            _ => {
153                skipped += 1;
154                false
155            }
156        })
157        .collect();
158    if skipped > 0 {
159        verbose!(
160            "skipped {} object(s) of unsupported type (tables/sources/sinks)",
161            skipped
162        );
163    }
164
165    let dirty_schemas: BTreeSet<SchemaQualifier> = overlay_objects
166        .iter()
167        .map(|(id, _)| {
168            SchemaQualifier::new(id.expect_database().to_string(), id.schema().to_string())
169        })
170        .collect();
171
172    print_plan(&dirty_schemas, &profile_name);
173
174    if dry_run {
175        return Ok(());
176    }
177
178    drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
179
180    if dirty_schemas.is_empty() {
181        info!("Dev overlay ready (nothing to overlay).");
182        return Ok(());
183    }
184
185    create_phase(
186        &client,
187        &profile_name,
188        &project_name,
189        &in_project_databases,
190        &dirty_schemas,
191        &overlay_objects,
192        &target_cluster,
193    )
194    .await?;
195
196    info!("Dev overlay ready.");
197    Ok(())
198}
199
200fn print_plan(dirty_schemas: &BTreeSet<SchemaQualifier>, profile_name: &str) {
201    if dirty_schemas.is_empty() {
202        info!("Dirty set is empty — nothing to overlay.");
203        return;
204    }
205    info!("→ Dirty schemas:");
206    for qual in dirty_schemas {
207        info!("    {}.{}", qual.database, qual.schema);
208    }
209
210    let overlay_dbs: BTreeSet<String> = dirty_schemas
211        .iter()
212        .map(|q| overlay_db_name(&q.database, profile_name))
213        .collect();
214    info!("→ Overlay databases:");
215    for db in &overlay_dbs {
216        info!("    {}", db);
217    }
218}
219
220/// Phase 1 of the dev rebuild: drop every overlay database recorded for
221/// this `(profile, project)` pair, then purge the manifest rows. Finally
222/// sweep any in-project `<base_db>__<profile>` names not in the manifest
223/// (catalog restore, interrupted prior run).
224pub(crate) async fn drop_phase(
225    client: &Client,
226    profile_name: &str,
227    project_name: &str,
228    in_project_databases: &BTreeSet<String>,
229) -> Result<(), CliError> {
230    let overlays = client.dev_overlays();
231
232    let existing: BTreeSet<String> = overlays
233        .list_overlays(profile_name, project_name)
234        .await?
235        .into_iter()
236        .collect();
237    for db in &existing {
238        drop_database(client, db).await?;
239    }
240    overlays.delete_overlays(profile_name, project_name).await?;
241
242    for base_db in in_project_databases {
243        let overlay_db = overlay_db_name(base_db, profile_name);
244        if !existing.contains(&overlay_db) {
245            drop_database(client, &overlay_db).await?;
246        }
247    }
248
249    Ok(())
250}
251
252async fn drop_database(client: &Client, database: &str) -> Result<(), CliError> {
253    let sql = format!(
254        "DROP DATABASE IF EXISTS {} CASCADE",
255        quote_identifier(database),
256    );
257    client.execute(&sql, &[]).await?;
258    Ok(())
259}
260
261/// Phase 2 of the dev rebuild: create overlay databases, schemas, and objects.
262///
263/// Per dirty schema we issue `CREATE DATABASE IF NOT EXISTS <overlay_db>`,
264/// insert a manifest row (so `drop_phase` can always reach it even if we crash
265/// mid-run), then `CREATE SCHEMA IF NOT EXISTS`. Objects are emitted in
266/// dependency order with references rewritten through `OverlayTransformer`
267/// and every `IN CLUSTER` clause rewritten to `target_cluster`.
268pub(crate) async fn create_phase(
269    client: &Client,
270    profile_name: &str,
271    project_name: &str,
272    in_project_databases: &BTreeSet<String>,
273    dirty_schemas: &BTreeSet<SchemaQualifier>,
274    overlay_objects: &[ObjectRef<'_>],
275    target_cluster: &str,
276) -> Result<(), CliError> {
277    let provisioning = client.provisioning();
278    let overlays = client.dev_overlays();
279
280    let mut created_overlay_dbs: BTreeSet<String> = BTreeSet::new();
281    for qualifier in dirty_schemas {
282        let overlay_db = overlay_db_name(&qualifier.database, profile_name);
283        if created_overlay_dbs.insert(overlay_db.clone()) {
284            provisioning.create_database(&overlay_db).await?;
285            overlays
286                .insert_overlay(profile_name, project_name, &overlay_db)
287                .await?;
288        }
289    }
290
291    for qualifier in dirty_schemas {
292        let overlay_db = overlay_db_name(&qualifier.database, profile_name);
293        provisioning
294            .create_schema(&overlay_db, &qualifier.schema)
295            .await?;
296    }
297
298    for (object_id, typed_object) in overlay_objects {
299        let original_fqn: FullyQualifiedName = object_id.clone().into();
300        let mut visitor = NormalizingVisitor::overlay(
301            &original_fqn,
302            profile_name,
303            in_project_databases,
304            dirty_schemas,
305            target_cluster,
306        );
307
308        let stmt = typed_object
309            .stmt
310            .clone()
311            .normalize_name_with(&visitor, &original_fqn.to_item_name())
312            .normalize_dependencies_with(&mut visitor)
313            .normalize_cluster_with(&visitor);
314
315        client.execute(&stmt.to_string(), &[]).await?;
316
317        let mut indexes = typed_object.indexes.clone();
318        visitor.normalize_index_references(&mut indexes);
319        visitor.normalize_index_clusters(&mut indexes);
320        for index in &indexes {
321            client.execute(&index.to_string(), &[]).await?;
322        }
323    }
324
325    Ok(())
326}