Skip to main content

mz_deploy/cli/commands/
dev.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! `mz-deploy dev` — developer inner-loop overlay command.
11//!
12//! Creates per-developer overlay databases (`<base_db>__<profile>`) from
13//! the dirty subset of the project's views, materialized views, and indexes.
14//! Every overlay materialized view and index is rewritten to run on a single
15//! user-supplied target cluster. The overlay is drop-and-rebuilt on every
16//! invocation.
17//!
18//! Requires the `materialize_developer` role plus `CREATEDB` at run time.
19
20use std::collections::BTreeSet;
21
22use crate::cli::commands::ObjectRef;
23use crate::cli::error::CliError;
24use crate::client::{Client, quote_identifier};
25use crate::config::Settings;
26use crate::project::SchemaQualifier;
27use crate::project::analysis::changeset::ChangeSet;
28use crate::project::analysis::deployment_snapshot;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::FullyQualifiedName;
31use crate::project::resolve::normalize::NormalizingVisitor;
32use crate::{info, verbose};
33
34/// Overlay database name convention: `<base_db>__<profile>`.
35fn overlay_db_name(base_db: &str, profile: &str) -> String {
36    format!("{}__{}", base_db, profile)
37}
38
39/// Refuse to proceed if the user-supplied target cluster hosts a promoted
40/// deployment.
41async fn refuse_if_targets_production_cluster(
42    client: &Client,
43    cluster: &str,
44) -> Result<(), CliError> {
45    let production = client.deployments().list_production_clusters().await?;
46    if let Some(rec) = production.into_iter().find(|r| r.cluster_name == cluster) {
47        return Err(CliError::DevTargetsProductionCluster { cluster: rec });
48    }
49    Ok(())
50}
51
52/// Top-level entry point for `mz-deploy dev`.
53///
54/// Orchestrates role/privilege validation, dirty-set computation, plan
55/// printing, and the drop+create DDL phases.
56///
57/// * `cluster` — target cluster for every overlay MV and index. Required
58///   unless `down` is set (clap enforces this).
59/// * `down` — when `true`, only run the drop phase and exit immediately.
60/// * `dry_run` — when `true`, print the plan but issue no DDL.
61pub async fn run(
62    settings: &Settings,
63    cluster: Option<String>,
64    down: bool,
65    dry_run: bool,
66) -> Result<(), CliError> {
67    let profile = settings.connection();
68    // `dev` always loads with `needs_connection: true`, so a profile must be set.
69    let profile_name = settings
70        .profile_name
71        .clone()
72        .expect("dev requires an active profile");
73    let project_name = settings
74        .directory
75        .file_name()
76        .and_then(|s| s.to_str())
77        .filter(|s| !s.is_empty())
78        .ok_or_else(|| CliError::InvalidProjectDirectory {
79            path: settings.directory.display().to_string(),
80        })?
81        .to_string();
82
83    let planned_project = super::compile::run(settings, true).await?;
84
85    let in_project_databases: BTreeSet<String> = planned_project
86        .databases
87        .iter()
88        .map(|db| db.name.clone())
89        .collect();
90
91    let client = Client::connect_with_profile(profile.clone())
92        .await
93        .map_err(CliError::Connection)?;
94
95    crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
96    let role =
97        crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
98    crate::cli::commands::setup::require_developer(role)?;
99
100    if in_project_databases.is_empty() {
101        info!("Project has no databases — nothing to overlay.");
102        return Ok(());
103    }
104
105    let sample_overlay_db = overlay_db_name(
106        in_project_databases.iter().next().expect("non-empty"),
107        &profile_name,
108    );
109    crate::cli::commands::setup::require_createdb(&client, &profile.username, &sample_overlay_db)
110        .await?;
111
112    if down {
113        drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
114        info!("Overlay removed.");
115        return Ok(());
116    }
117
118    // clap guarantees `cluster` is `Some` whenever `down` is false.
119    let target_cluster = cluster.expect("cluster required unless --down");
120    refuse_if_targets_production_cluster(&client, &target_cluster).await?;
121
122    let new_snapshot = deployment_snapshot::build_snapshot_from_planned(&planned_project)?;
123    let production_snapshot = deployment_snapshot::load_from_database(&client, None).await?;
124
125    // Empty production → full overlay (first-run semantics matching stage).
126    let change_set = if production_snapshot.objects.is_empty() {
127        verbose!("Full deployment: no production deployment found");
128        None
129    } else {
130        Some(ChangeSet::from_deployment_snapshot_comparison(
131            &production_snapshot,
132            &new_snapshot,
133            &planned_project,
134        ))
135    };
136
137    let all_objects = match change_set.as_ref() {
138        Some(cs) if cs.is_empty() => Vec::new(),
139        Some(cs) => {
140            verbose!("{}", cs);
141            planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?
142        }
143        None => planned_project.get_sorted_objects()?,
144    };
145
146    let mut skipped = 0usize;
147    let overlay_objects: Vec<ObjectRef<'_>> = all_objects
148        .into_iter()
149        .filter(|(_, typed_obj)| match &typed_obj.stmt {
150            Statement::CreateView(_) | Statement::CreateMaterializedView(_) => true,
151            _ => {
152                skipped += 1;
153                false
154            }
155        })
156        .collect();
157    if skipped > 0 {
158        verbose!(
159            "skipped {} object(s) of unsupported type (tables/sources/sinks)",
160            skipped
161        );
162    }
163
164    let dirty_schemas: BTreeSet<SchemaQualifier> = overlay_objects
165        .iter()
166        .map(|(id, _)| {
167            SchemaQualifier::new(id.expect_database().to_string(), id.schema().to_string())
168        })
169        .collect();
170
171    print_plan(&dirty_schemas, &profile_name);
172
173    if dry_run {
174        return Ok(());
175    }
176
177    drop_phase(&client, &profile_name, &project_name, &in_project_databases).await?;
178
179    if dirty_schemas.is_empty() {
180        info!("Dev overlay ready (nothing to overlay).");
181        return Ok(());
182    }
183
184    create_phase(
185        &client,
186        &profile_name,
187        &project_name,
188        &in_project_databases,
189        &dirty_schemas,
190        &overlay_objects,
191        &target_cluster,
192    )
193    .await?;
194
195    info!("Dev overlay ready.");
196    Ok(())
197}
198
199fn print_plan(dirty_schemas: &BTreeSet<SchemaQualifier>, profile_name: &str) {
200    if dirty_schemas.is_empty() {
201        info!("Dirty set is empty — nothing to overlay.");
202        return;
203    }
204    info!("→ Dirty schemas:");
205    for qual in dirty_schemas {
206        info!("    {}.{}", qual.database, qual.schema);
207    }
208
209    let overlay_dbs: BTreeSet<String> = dirty_schemas
210        .iter()
211        .map(|q| overlay_db_name(&q.database, profile_name))
212        .collect();
213    info!("→ Overlay databases:");
214    for db in &overlay_dbs {
215        info!("    {}", db);
216    }
217}
218
219/// Phase 1 of the dev rebuild: drop every overlay database recorded for
220/// this `(profile, project)` pair, then purge the manifest rows. Finally
221/// sweep any in-project `<base_db>__<profile>` names not in the manifest
222/// (catalog restore, interrupted prior run).
223pub(crate) async fn drop_phase(
224    client: &Client,
225    profile_name: &str,
226    project_name: &str,
227    in_project_databases: &BTreeSet<String>,
228) -> Result<(), CliError> {
229    let overlays = client.dev_overlays();
230
231    let existing: BTreeSet<String> = overlays
232        .list_overlays(profile_name, project_name)
233        .await?
234        .into_iter()
235        .collect();
236    for db in &existing {
237        drop_database(client, db).await?;
238    }
239    overlays.delete_overlays(profile_name, project_name).await?;
240
241    for base_db in in_project_databases {
242        let overlay_db = overlay_db_name(base_db, profile_name);
243        if !existing.contains(&overlay_db) {
244            drop_database(client, &overlay_db).await?;
245        }
246    }
247
248    Ok(())
249}
250
251async fn drop_database(client: &Client, database: &str) -> Result<(), CliError> {
252    let sql = format!(
253        "DROP DATABASE IF EXISTS {} CASCADE",
254        quote_identifier(database),
255    );
256    client.execute(&sql, &[]).await?;
257    Ok(())
258}
259
260/// Phase 2 of the dev rebuild: create overlay databases, schemas, and objects.
261///
262/// Per dirty schema we issue `CREATE DATABASE IF NOT EXISTS <overlay_db>`,
263/// insert a manifest row (so `drop_phase` can always reach it even if we crash
264/// mid-run), then `CREATE SCHEMA IF NOT EXISTS`. Objects are emitted in
265/// dependency order with references rewritten through `OverlayTransformer`
266/// and every `IN CLUSTER` clause rewritten to `target_cluster`.
267pub(crate) async fn create_phase(
268    client: &Client,
269    profile_name: &str,
270    project_name: &str,
271    in_project_databases: &BTreeSet<String>,
272    dirty_schemas: &BTreeSet<SchemaQualifier>,
273    overlay_objects: &[ObjectRef<'_>],
274    target_cluster: &str,
275) -> Result<(), CliError> {
276    let provisioning = client.provisioning();
277    let overlays = client.dev_overlays();
278
279    let mut created_overlay_dbs: BTreeSet<String> = BTreeSet::new();
280    for qualifier in dirty_schemas {
281        let overlay_db = overlay_db_name(&qualifier.database, profile_name);
282        if created_overlay_dbs.insert(overlay_db.clone()) {
283            provisioning.create_database(&overlay_db).await?;
284            overlays
285                .insert_overlay(profile_name, project_name, &overlay_db)
286                .await?;
287        }
288    }
289
290    for qualifier in dirty_schemas {
291        let overlay_db = overlay_db_name(&qualifier.database, profile_name);
292        provisioning
293            .create_schema(&overlay_db, &qualifier.schema)
294            .await?;
295    }
296
297    for (object_id, typed_object) in overlay_objects {
298        let original_fqn: FullyQualifiedName = object_id.clone().into();
299        let mut visitor = NormalizingVisitor::overlay(
300            &original_fqn,
301            profile_name,
302            in_project_databases,
303            dirty_schemas,
304            target_cluster,
305        );
306
307        let stmt = typed_object
308            .stmt
309            .clone()
310            .normalize_name_with(&visitor, &original_fqn.to_item_name())
311            .normalize_dependencies_with(&mut visitor)
312            .normalize_cluster_with(&visitor);
313
314        client.execute(&stmt.to_string(), &[]).await?;
315
316        let mut indexes = typed_object.indexes.clone();
317        visitor.normalize_index_references(&mut indexes);
318        visitor.normalize_index_clusters(&mut indexes);
319        for index in &indexes {
320            client.execute(&index.to_string(), &[]).await?;
321        }
322    }
323
324    Ok(())
325}