Skip to main content

mz_deploy/cli/
executor.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment execution utilities.
11//!
12//! This module contains the `DeploymentExecutor` for running SQL statements
13//! during deployment, along with helper functions for collecting deployment
14//! metadata and generating environment names.
15//!
16//! ## ApplyPlan Lifecycle
17//!
18//! ```text
19//! compile project → connect client → plan phases → execute
20//!                                        │
21//!                     ┌──────────────────┼───────────────────┐
22//!                     ▼                  ▼                   ▼
23//!              prepare_schemas     add_phase(...)      execute(&client)
24//!              (CREATE DB/SCHEMA)  (dry-run SQL)       (run all SQL)
25//! ```
26//!
27//! 1. **Compile** — `compile_apply_project_and_connect` loads and validates the project.
28//! 2. **Plan** — Each apply subcommand calls `DeploymentExecutor::new_dry_run()` to
29//!    collect SQL without executing it, then packages results as `ApplyResult` phases.
30//! 3. **Execute** — `ApplyPlan::execute()` runs setup statements first, then per-phase
31//!    object statements. Objects sharing a `transaction_group` key are wrapped in a
32//!    single `BEGIN`/`COMMIT` block with automatic `ROLLBACK` on failure.
33
34use crate::cli::CliError;
35use crate::cli::git::get_git_commit;
36use crate::client::{Client, ClusterConfig, quote_identifier};
37use crate::config::Settings;
38use crate::project::analysis::deployment_snapshot::DeploymentMetadata;
39use crate::project::ir::graph::Project;
40use crate::project::resolve::normalize;
41use crate::project::{self, ir::compiled};
42use crate::{info, verbose};
43use owo_colors::{OwoColorize, Stream};
44use serde::Serialize;
45use std::cell::RefCell;
46use std::collections::BTreeSet;
47use std::fmt;
48use std::path::Path;
49
/// What happened when applying a single object.
///
/// Serialized with serde using `snake_case` variant names, so `UpToDate`
/// is emitted as `"up_to_date"`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ObjectAction {
    /// The object was (or would be) created.
    Created,
    /// The object was (or would be) altered.
    Altered,
    /// The object is already up to date.
    UpToDate,
    /// The object was skipped.
    Skipped,
}
59
60impl fmt::Display for ObjectAction {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        match self {
63            ObjectAction::Created => write!(f, "created"),
64            ObjectAction::Altered => write!(f, "altered"),
65            ObjectAction::UpToDate => write!(f, "up_to_date"),
66            ObjectAction::Skipped => write!(f, "skipped"),
67        }
68    }
69}
70
/// Result of applying a single object (cluster, role, connection, etc.).
///
/// Only `Clone` and `Serialize` are derived.
// NOTE(review): `Debug` is presumably left off on purpose so that
// `redacted_statements` cannot leak through `{:?}` — confirm before adding it.
#[derive(Clone, Serialize)]
pub struct ObjectResult {
    /// Fully-qualified object name, e.g. "materialize.raw.pgconn".
    pub object: String,
    /// What happened: created, altered, up_to_date, skipped.
    pub action: ObjectAction,
    /// SQL statements that were (or would be) executed.
    pub statements: Vec<String>,
    /// SQL statements that must be executed but contain sensitive values
    /// (e.g. CREATE SECRET, ALTER SECRET). These are never serialized or displayed.
    /// `ApplyPlan::execute` runs these before `statements` and replaces their
    /// text with a redaction marker in any error it returns.
    #[serde(skip)]
    pub redacted_statements: Vec<String>,
    /// Optional transaction group key. Objects with the same group key are
    /// executed inside a single BEGIN/COMMIT transaction block.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transaction_group: Option<String>,
}
89
90impl fmt::Display for ObjectResult {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        match self.action {
93            ObjectAction::Created | ObjectAction::Altered => {
94                write!(
95                    f,
96                    "  {} {}",
97                    "✓".if_supports_color(Stream::Stderr, |t| t.green()),
98                    self.object
99                )
100            }
101            ObjectAction::UpToDate => write!(
102                f,
103                "  {} {} ({})",
104                "=".if_supports_color(Stream::Stderr, |t| t.dimmed()),
105                self.object,
106                "up to date".if_supports_color(Stream::Stderr, |t| t.dimmed())
107            ),
108            ObjectAction::Skipped => write!(
109                f,
110                "  {} {} ({})",
111                "-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
112                self.object,
113                "skipped".if_supports_color(Stream::Stderr, |t| t.dimmed())
114            ),
115        }
116    }
117}
118
/// Result of applying one phase (e.g. clusters, connections, etc.).
///
/// The `Display` impl renders nothing for a phase with no results.
#[derive(Clone, Serialize)]
pub struct ApplyResult {
    /// Phase name: "clusters", "roles", "connections", etc.
    /// Also used as the label in the "N <phase> up to date" summary line.
    pub phase: String,
    /// Per-object results.
    pub results: Vec<ObjectResult>,
}
127
128impl fmt::Display for ApplyResult {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        if self.results.is_empty() {
131            return Ok(());
132        }
133
134        let created = self
135            .results
136            .iter()
137            .filter(|r| r.action == ObjectAction::Created)
138            .count();
139        let altered = self
140            .results
141            .iter()
142            .filter(|r| r.action == ObjectAction::Altered)
143            .count();
144        let up_to_date = self
145            .results
146            .iter()
147            .filter(|r| r.action == ObjectAction::UpToDate)
148            .count();
149
150        let label = &self.phase;
151        let mut lines = Vec::new();
152
153        if up_to_date > 0 && created == 0 && altered == 0 {
154            lines.push(format!(
155                "  {} {} {} up to date",
156                "✓".if_supports_color(Stream::Stderr, |t| t.green()),
157                up_to_date,
158                label
159            ));
160        }
161
162        for r in &self.results {
163            if r.action != ObjectAction::UpToDate {
164                lines.push(format!("{}", r));
165            }
166        }
167
168        write!(f, "{}", lines.join("\n"))
169    }
170}
171
/// A complete apply plan: global setup + ordered phase results.
/// Built incrementally by adding phases, then executed as a unit.
///
/// Serializes as `setup_statements` plus `phases`; the deduplication
/// bookkeeping is skipped.
#[derive(Serialize)]
pub struct ApplyPlan {
    /// Global setup SQL (CREATE DATABASE/SCHEMA, mod_statements).
    /// Deduplicated across all phases at schema granularity: a schema that
    /// was already prepared is not prepared again. Individual statements are
    /// not deduplicated.
    pub setup_statements: Vec<String>,
    /// Per-phase results in dependency order.
    pub phases: Vec<ApplyResult>,
    /// Tracks which schemas have already been prepared (for deduplication).
    #[serde(skip)]
    prepared_schemas: BTreeSet<project::SchemaQualifier>,
}
185
/// A batch of objects to execute together, optionally inside a transaction.
///
/// Built by `ApplyPlan::execute`: consecutive objects that share the same
/// `Some(..)` transaction group land in one batch, while every object without
/// a group gets a batch of its own. Borrows from the plan for `'a`.
struct ExecutionBatch<'a> {
    /// If Some, these objects are wrapped in BEGIN/COMMIT.
    transaction_group: Option<&'a str>,
    /// The objects in this batch.
    objects: Vec<&'a ObjectResult>,
}
193
194impl ApplyPlan {
195    pub fn new() -> Self {
196        Self {
197            setup_statements: Vec::new(),
198            phases: Vec::new(),
199            prepared_schemas: BTreeSet::new(),
200        }
201    }
202
203    /// Prepare databases and schemas for the given schema set.
204    /// Deduplicates against previously prepared schemas.
205    pub async fn prepare_schemas(
206        &mut self,
207        executor: &DeploymentExecutor<'_>,
208        planned_project: &Project,
209        schema_set: &BTreeSet<project::SchemaQualifier>,
210    ) -> Result<(), CliError> {
211        let new_schemas: BTreeSet<_> = schema_set
212            .difference(&self.prepared_schemas)
213            .cloned()
214            .collect();
215        if new_schemas.is_empty() {
216            return Ok(());
217        }
218        executor
219            .prepare_databases_and_schemas(planned_project, &new_schemas, None)
220            .await?;
221        self.setup_statements.extend(executor.take_statements());
222        self.prepared_schemas.extend(new_schemas);
223        Ok(())
224    }
225
226    /// Add a completed phase result.
227    pub fn add_phase(&mut self, result: ApplyResult) {
228        self.phases.push(result);
229    }
230
231    /// Execute: run global setup, then each phase's per-object SQL.
232    ///
233    /// Objects that share a `transaction_group` key are wrapped in a single
234    /// BEGIN/COMMIT block. On error inside a transaction, ROLLBACK is issued
235    /// before returning the error.
236    pub async fn execute(&self, client: &Client) -> Result<(), CliError> {
237        // Phase 1: setup statements
238        for sql in &self.setup_statements {
239            client
240                .execute(sql, &[])
241                .await
242                .map_err(|source| CliError::SqlExecutionFailed {
243                    statement: sql.clone(),
244                    source,
245                })?;
246        }
247
248        // Phase 2: Group objects into execution batches
249        let mut batches: Vec<ExecutionBatch<'_>> = Vec::new();
250        for phase in &self.phases {
251            for obj in &phase.results {
252                let obj_txn = obj.transaction_group.as_deref();
253                match batches.last_mut() {
254                    Some(batch) if batch.transaction_group == obj_txn && obj_txn.is_some() => {
255                        // Same transaction group — append to current batch
256                        batch.objects.push(obj);
257                    }
258                    _ => {
259                        // New batch (different group, no group, or first object)
260                        batches.push(ExecutionBatch {
261                            transaction_group: obj_txn,
262                            objects: vec![obj],
263                        });
264                    }
265                }
266            }
267        }
268
269        // Phase 3: Execute batches
270        for batch in &batches {
271            let in_txn = batch.transaction_group.is_some();
272            if in_txn {
273                client.execute("BEGIN", &[]).await.map_err(|source| {
274                    CliError::SqlExecutionFailed {
275                        statement: "BEGIN".to_string(),
276                        source,
277                    }
278                })?;
279            }
280
281            for obj in &batch.objects {
282                for sql in obj.redacted_statements.iter().chain(&obj.statements) {
283                    if let Err(e) = client.execute(sql, &[]).await {
284                        if in_txn {
285                            let _ = client.execute("ROLLBACK", &[]).await;
286                        }
287                        return Err(CliError::SqlExecutionFailed {
288                            statement: if obj.redacted_statements.contains(sql) {
289                                "[REDACTED — contains secret value]".to_string()
290                            } else {
291                                sql.clone()
292                            },
293                            source: e,
294                        });
295                    }
296                }
297            }
298
299            if in_txn {
300                client.execute("COMMIT", &[]).await.map_err(|source| {
301                    CliError::SqlExecutionFailed {
302                        statement: "COMMIT".to_string(),
303                        source,
304                    }
305                })?;
306            }
307        }
308
309        Ok(())
310    }
311}
312
313impl fmt::Display for ApplyPlan {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        let mut first = true;
316        for phase in &self.phases {
317            if phase.results.is_empty() {
318                continue;
319            }
320            if !first {
321                writeln!(f)?;
322            }
323            write!(f, "{}", phase)?;
324            first = false;
325        }
326        Ok(())
327    }
328}
329
330/// Collect deployment metadata (user and git commit).
331///
332/// This function retrieves the current database user and git commit hash
333/// for recording deployment provenance. If the current user cannot be
334/// determined, it defaults to "unknown".
335pub async fn collect_deployment_metadata(client: &Client, directory: &Path) -> DeploymentMetadata {
336    let deployed_by = client
337        .introspection()
338        .get_current_user()
339        .await
340        .unwrap_or_else(|e| {
341            info!("warning: failed to get current user: {}", e);
342            "unknown".to_string()
343        });
344
345    let git_commit = get_git_commit(directory);
346
347    DeploymentMetadata {
348        deployed_by,
349        git_commit,
350    }
351}
352
353/// Connect a planning client for apply commands.
354///
355/// Validates the connection and requires the `materialize_deployer` role.
356pub async fn connect_apply_client(settings: &Settings) -> Result<Client, CliError> {
357    let client = Client::connect_with_profile(settings.connection().clone())
358        .await
359        .map_err(CliError::Connection)?;
360    let role =
361        crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
362    crate::cli::commands::setup::require_deployer(role)?;
363    Ok(client)
364}
365
366/// Compile the project (without type checking) and connect a planning client
367/// for database-object apply commands.
368pub async fn compile_apply_project_and_connect(
369    settings: &Settings,
370) -> Result<(Project, Client), CliError> {
371    let planned_project = crate::cli::commands::compile::run_without_typecheck(
372        settings,
373        !crate::log::json_output_enabled(),
374    )
375    .await?;
376    let client = connect_apply_client(settings).await?;
377    Ok((planned_project, client))
378}
379
380/// Generate a unique 7-character hex identifier for deployments when no
381/// explicit environment name is provided.
382pub fn generate_random_env_name() -> String {
383    use sha2::{Digest, Sha256};
384    use std::time::SystemTime;
385
386    let now = SystemTime::now()
387        .duration_since(SystemTime::UNIX_EPOCH)
388        .expect("system time before Unix epoch")
389        .as_nanos();
390
391    let mut hasher = Sha256::new();
392    hasher.update(now.to_le_bytes());
393    let hash = hasher.finalize();
394
395    // Take first 4 bytes of hash and format as 7-char hex
396    format!(
397        "{:07x}",
398        u32::from_le_bytes([hash[0], hash[1], hash[2], hash[3]]) & 0xFFFFFFF
399    )
400}
401
/// Helper for executing database object deployments.
///
/// This struct consolidates the pattern of executing a database object's
/// SQL statements (main statement + indexes + grants + comments) with
/// consistent error handling. Supports dry-run mode where SQL is recorded
/// instead of executed.
///
/// In dry-run mode, statements are always recorded in an internal log
/// that can be drained per-object via `take_statements()`.
/// Outside dry-run mode nothing is recorded.
pub struct DeploymentExecutor<'a> {
    /// Connection used to run SQL when not in dry-run mode.
    client: &'a Client,
    /// When true, statements are logged instead of executed.
    dry_run: bool,
    /// Statements recorded in dry-run mode since the last `take_statements()`.
    /// A `RefCell` so recording works through `&self`.
    statement_log: RefCell<Vec<String>>,
}
416
417impl<'a> DeploymentExecutor<'a> {
418    /// Create a new deployment executor that executes SQL.
419    pub fn new(client: &'a Client) -> Self {
420        Self {
421            client,
422            dry_run: false,
423            statement_log: RefCell::new(Vec::new()),
424        }
425    }
426
427    /// Create a deployment executor that always runs in dry-run mode (for planning).
428    pub fn new_dry_run(client: &'a Client) -> Self {
429        Self {
430            client,
431            dry_run: true,
432            statement_log: RefCell::new(Vec::new()),
433        }
434    }
435
436    /// Create a deployment executor with configurable dry-run mode.
437    pub fn with_dry_run(client: &'a Client, dry_run: bool) -> Self {
438        Self {
439            client,
440            dry_run,
441            statement_log: RefCell::new(Vec::new()),
442        }
443    }
444
445    /// Returns true if this executor is in dry-run mode.
446    pub fn is_dry_run(&self) -> bool {
447        self.dry_run
448    }
449
450    /// Drain and return all statements recorded since the last call.
451    ///
452    /// Use this between objects to capture per-object statement lists
453    /// for `ObjectResult`.
454    pub fn take_statements(&self) -> Vec<String> {
455        self.statement_log.borrow_mut().drain(..).collect()
456    }
457
458    /// Execute all SQL statements for a database object.
459    ///
460    /// This executes the main CREATE statement, followed by any indexes,
461    /// grants, and comments associated with the object.
462    pub async fn execute_object(
463        &self,
464        typed_obj: &compiled::DatabaseObject,
465    ) -> Result<(), CliError> {
466        // Execute main statement
467        self.execute_sql(&typed_obj.stmt).await?;
468
469        // Execute indexes
470        for index in &typed_obj.indexes {
471            self.execute_sql(index).await?;
472        }
473
474        // Execute grants
475        for grant in &typed_obj.grants {
476            self.execute_sql(grant).await?;
477        }
478
479        // Execute comments
480        for comment in &typed_obj.comments {
481            self.execute_sql(comment).await?;
482        }
483
484        Ok(())
485    }
486
487    /// Create databases and schemas for `schema_set`, then execute filtered mod_statements.
488    ///
489    /// When `staging_suffix` is `Some`, schema names are suffixed and mod_statement
490    /// schema references are rewritten at the AST level to target staging schemas.
491    pub async fn prepare_databases_and_schemas(
492        &self,
493        planned_project: &Project,
494        schema_set: &BTreeSet<project::SchemaQualifier>,
495        staging_suffix: Option<&str>,
496    ) -> Result<(), CliError> {
497        if schema_set.is_empty() {
498            return Ok(());
499        }
500
501        // Step 1: Create databases
502        let databases: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
503        for db in &databases {
504            let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(db));
505            self.execute_sql(&sql).await?;
506        }
507
508        // Step 2: Create schemas (with optional staging suffix)
509        for sq in schema_set {
510            let schema_name = match staging_suffix {
511                Some(suffix) => format!("{}{}", sq.schema, suffix),
512                None => sq.schema.clone(),
513            };
514            verbose!(
515                "Creating schema {}.{} if not exists",
516                sq.database,
517                schema_name
518            );
519            let sql = format!(
520                "CREATE SCHEMA IF NOT EXISTS {}.{}",
521                quote_identifier(&sq.database),
522                quote_identifier(&schema_name)
523            );
524            self.execute_sql(&sql).await?;
525        }
526
527        // Step 3: Execute mod_statements filtered by schema_set membership
528        for mod_stmt in planned_project.iter_mod_statements() {
529            match mod_stmt {
530                project::ModStatement::Database {
531                    database,
532                    statement,
533                } => {
534                    let has_schema = schema_set.iter().any(|sq| sq.database == *database);
535                    if has_schema {
536                        verbose!("Applying database setup for: {}", database);
537                        self.execute_sql(statement).await?;
538                    }
539                }
540                project::ModStatement::Schema {
541                    database,
542                    schema,
543                    statement,
544                } => {
545                    if schema_set.contains(&project::SchemaQualifier::new(
546                        database.to_string(),
547                        schema.to_string(),
548                    )) {
549                        if let Some(suffix) = staging_suffix {
550                            let staging_schema = format!("{}{}", schema, suffix);
551                            let mut rewritten = statement.clone();
552                            normalize::rewrite_schema_names(
553                                std::slice::from_mut(&mut rewritten),
554                                schema,
555                                suffix,
556                            );
557                            verbose!("Applying schema setup for: {}.{}", database, staging_schema);
558                            self.execute_sql(&rewritten).await?;
559                        } else {
560                            verbose!("Applying schema setup for: {}.{}", database, schema);
561                            self.execute_sql(statement).await?;
562                        }
563                    }
564                }
565            }
566        }
567
568        Ok(())
569    }
570
571    /// Execute (or print in dry-run mode) a single SQL statement.
572    ///
573    /// In dry-run mode, statements are always logged to the internal
574    /// statement buffer (retrievable via `take_statements()`).
575    pub async fn execute_sql(&self, stmt: &impl ToString) -> Result<(), CliError> {
576        let sql = stmt.to_string();
577
578        if self.dry_run {
579            self.statement_log.borrow_mut().push(sql.clone());
580            return Ok(());
581        }
582
583        self.client
584            .execute(&sql, &[])
585            .await
586            .map_err(|source| CliError::SqlExecutionFailed {
587                statement: sql,
588                source,
589            })?;
590        Ok(())
591    }
592
593    /// Ensure a database exists.
594    ///
595    /// Real mode: delegates to `client.provisioning().create_database()`.
596    /// Dry-run: logs `CREATE DATABASE IF NOT EXISTS ...`.
597    pub async fn ensure_database(&self, name: &str) -> Result<(), CliError> {
598        if self.dry_run {
599            let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(name));
600            self.statement_log.borrow_mut().push(sql);
601        } else {
602            self.client.provisioning().create_database(name).await?;
603        }
604        Ok(())
605    }
606
607    /// Ensure a schema exists in the given database.
608    ///
609    /// Real mode: delegates to `client.provisioning().create_schema()`.
610    /// Dry-run: logs `CREATE SCHEMA IF NOT EXISTS ...`.
611    pub async fn ensure_schema(&self, database: &str, schema: &str) -> Result<(), CliError> {
612        if self.dry_run {
613            let sql = format!(
614                "CREATE SCHEMA IF NOT EXISTS {}.{}",
615                quote_identifier(database),
616                quote_identifier(schema)
617            );
618            self.statement_log.borrow_mut().push(sql);
619        } else {
620            self.client
621                .provisioning()
622                .create_schema(database, schema)
623                .await?;
624        }
625        Ok(())
626    }
627
628    /// Create a staging cluster by cloning a production cluster's configuration.
629    ///
630    /// Real mode: delegates to `client.provisioning().create_cluster_with_config()`.
631    /// Dry-run: logs a placeholder `CREATE CLUSTER ...` statement.
632    pub async fn create_cluster(
633        &self,
634        staging_name: &str,
635        prod_name: &str,
636        config: &ClusterConfig,
637    ) -> Result<(), CliError> {
638        if self.dry_run {
639            let sql = format!(
640                "CREATE CLUSTER {} (SIZE = '<from {}')",
641                quote_identifier(staging_name),
642                prod_name
643            );
644            self.statement_log.borrow_mut().push(sql);
645        } else {
646            self.client
647                .provisioning()
648                .create_cluster_with_config(staging_name, config)
649                .await?;
650        }
651        Ok(())
652    }
653
654    /// Record cluster mappings for a deployment.
655    ///
656    /// Real mode: delegates to `client.deployments().insert_deployment_clusters()`.
657    /// Dry-run: no-op (internal bookkeeping).
658    pub async fn record_deployment_clusters(
659        &self,
660        stage_name: &str,
661        clusters: &[String],
662    ) -> Result<(), CliError> {
663        if !self.dry_run {
664            self.client
665                .deployments()
666                .insert_deployment_clusters(stage_name, clusters)
667                .await?;
668            verbose!("Cluster mappings recorded");
669        }
670        Ok(())
671    }
672}