Skip to main content

mz_deploy/cli/
executor.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment execution utilities.
11//!
12//! This module contains the `DeploymentExecutor` for running SQL statements
13//! during deployment, along with helper functions for collecting deployment
14//! metadata and generating environment names.
15//!
16//! ## ApplyPlan Lifecycle
17//!
18//! ```text
19//! compile project → connect client → plan phases → execute
20//!                                        │
21//!                     ┌──────────────────┼───────────────────┐
22//!                     ▼                  ▼                   ▼
23//!              prepare_schemas     add_phase(...)      execute(&client)
24//!              (CREATE DB/SCHEMA)  (dry-run SQL)       (run all SQL)
25//! ```
26//!
27//! 1. **Compile** — `compile_apply_project_and_connect` loads and validates the project.
28//! 2. **Plan** — Each apply subcommand calls `DeploymentExecutor::new_dry_run()` to
29//!    collect SQL without executing it, then packages results as `ApplyResult` phases.
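//! 3. **Execute** — `ApplyPlan::execute()` runs setup statements first, then per-phase
//!    object statements. Consecutive objects sharing a `transaction_group` key are
//!    wrapped in a single `BEGIN`/`COMMIT` block; if a statement inside the block
//!    fails, a `ROLLBACK` is issued before the error is returned. Each object's
//!    `post_statements` run after its batch's statements (and after the `COMMIT`,
//!    when there is one), outside any transaction block.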
33
34use crate::cli::CliError;
35use crate::cli::git::get_git_commit;
36use crate::client::{Client, ClusterConfig, quote_identifier};
37use crate::config::Settings;
38use crate::project::analysis::deployment_snapshot::DeploymentMetadata;
39use crate::project::ir::graph::Project;
40use crate::project::resolve::normalize;
41use crate::project::{self, ir::compiled};
42use crate::{info, verbose};
43use owo_colors::{OwoColorize, Stream};
44use serde::Serialize;
45use std::cell::RefCell;
46use std::collections::BTreeSet;
47use std::fmt;
48use std::path::Path;
49
50/// What happened when applying a single object.
51#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
52#[serde(rename_all = "snake_case")]
53pub enum ObjectAction {
54    Created,
55    Altered,
56    UpToDate,
57    Skipped,
58}
59
60impl fmt::Display for ObjectAction {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        match self {
63            ObjectAction::Created => write!(f, "created"),
64            ObjectAction::Altered => write!(f, "altered"),
65            ObjectAction::UpToDate => write!(f, "up_to_date"),
66            ObjectAction::Skipped => write!(f, "skipped"),
67        }
68    }
69}
70
71/// Result of applying a single object (cluster, role, connection, etc.).
72#[derive(Clone, Serialize)]
73pub struct ObjectResult {
74    /// Fully-qualified object name, e.g. "materialize.raw.pgconn".
75    pub object: String,
76    /// What happened: created, altered, up_to_date, skipped.
77    pub action: ObjectAction,
78    /// SQL statements that were (or would be) executed.
79    pub statements: Vec<String>,
80    /// SQL statements that must be executed but contain sensitive values
81    /// (e.g. CREATE SECRET, ALTER SECRET). These are never serialized or displayed.
82    #[serde(skip)]
83    pub redacted_statements: Vec<String>,
84    /// Optional transaction group key. Objects with the same group key are
85    /// executed inside a single BEGIN/COMMIT transaction block.
86    #[serde(skip_serializing_if = "Option::is_none")]
87    pub transaction_group: Option<String>,
88    /// Statements that must run AFTER the object's transaction commits, outside
89    /// any BEGIN/COMMIT (indexes, GRANTs, and COMMENTs). Materialize forbids
90    /// mixing a GRANT or COMMENT with object creation in one transaction, so a
91    /// grouped object keeps only its bare CREATE statements in `statements` and
92    /// defers the rest here.
93    #[serde(skip_serializing_if = "Vec::is_empty")]
94    pub post_statements: Vec<String>,
95}
96
97impl fmt::Display for ObjectResult {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        match self.action {
100            ObjectAction::Created | ObjectAction::Altered => {
101                write!(
102                    f,
103                    "  {} {}",
104                    "✓".if_supports_color(Stream::Stderr, |t| t.green()),
105                    self.object
106                )
107            }
108            ObjectAction::UpToDate => write!(
109                f,
110                "  {} {} ({})",
111                "=".if_supports_color(Stream::Stderr, |t| t.dimmed()),
112                self.object,
113                "up to date".if_supports_color(Stream::Stderr, |t| t.dimmed())
114            ),
115            ObjectAction::Skipped => write!(
116                f,
117                "  {} {} ({})",
118                "-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
119                self.object,
120                "skipped".if_supports_color(Stream::Stderr, |t| t.dimmed())
121            ),
122        }
123    }
124}
125
126/// Result of applying one phase (e.g. clusters, connections, etc.).
127#[derive(Clone, Serialize)]
128pub struct ApplyResult {
129    /// Phase name: "clusters", "roles", "connections", etc.
130    pub phase: String,
131    /// Per-object results.
132    pub results: Vec<ObjectResult>,
133}
134
135impl fmt::Display for ApplyResult {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        if self.results.is_empty() {
138            return Ok(());
139        }
140
141        let created = self
142            .results
143            .iter()
144            .filter(|r| r.action == ObjectAction::Created)
145            .count();
146        let altered = self
147            .results
148            .iter()
149            .filter(|r| r.action == ObjectAction::Altered)
150            .count();
151        let up_to_date = self
152            .results
153            .iter()
154            .filter(|r| r.action == ObjectAction::UpToDate)
155            .count();
156
157        let label = &self.phase;
158        let mut lines = Vec::new();
159
160        if up_to_date > 0 && created == 0 && altered == 0 {
161            lines.push(format!(
162                "  {} {} {} up to date",
163                "✓".if_supports_color(Stream::Stderr, |t| t.green()),
164                up_to_date,
165                label
166            ));
167        }
168
169        for r in &self.results {
170            if r.action != ObjectAction::UpToDate {
171                lines.push(format!("{}", r));
172            }
173        }
174
175        write!(f, "{}", lines.join("\n"))
176    }
177}
178
179/// A complete apply plan: global setup + ordered phase results.
180/// Built incrementally by adding phases, then executed as a unit.
181#[derive(Serialize)]
182pub struct ApplyPlan {
183    /// Global setup SQL (CREATE DATABASE/SCHEMA, mod_statements).
184    /// Deduplicated across all phases.
185    pub setup_statements: Vec<String>,
186    /// Per-phase results in dependency order.
187    pub phases: Vec<ApplyResult>,
188    /// Tracks which schemas have already been prepared (for deduplication).
189    #[serde(skip)]
190    prepared_schemas: BTreeSet<project::SchemaQualifier>,
191}
192
193/// A batch of objects to execute together, optionally inside a transaction.
194struct ExecutionBatch<'a> {
195    /// If Some, these objects are wrapped in BEGIN/COMMIT.
196    transaction_group: Option<&'a str>,
197    /// The objects in this batch.
198    objects: Vec<&'a ObjectResult>,
199}
200
201impl ApplyPlan {
202    pub fn new() -> Self {
203        Self {
204            setup_statements: Vec::new(),
205            phases: Vec::new(),
206            prepared_schemas: BTreeSet::new(),
207        }
208    }
209
210    /// Prepare databases and schemas for the given schema set.
211    /// Deduplicates against previously prepared schemas.
212    pub async fn prepare_schemas(
213        &mut self,
214        executor: &DeploymentExecutor<'_>,
215        planned_project: &Project,
216        schema_set: &BTreeSet<project::SchemaQualifier>,
217    ) -> Result<(), CliError> {
218        let new_schemas: BTreeSet<_> = schema_set
219            .difference(&self.prepared_schemas)
220            .cloned()
221            .collect();
222        if new_schemas.is_empty() {
223            return Ok(());
224        }
225        executor
226            .prepare_databases_and_schemas(planned_project, &new_schemas, None)
227            .await?;
228        self.setup_statements.extend(executor.take_statements());
229        self.prepared_schemas.extend(new_schemas);
230        Ok(())
231    }
232
233    /// Add a completed phase result.
234    pub fn add_phase(&mut self, result: ApplyResult) {
235        self.phases.push(result);
236    }
237
238    /// Execute: run global setup, then each phase's per-object SQL.
239    ///
240    /// Objects that share a `transaction_group` key are wrapped in a single
241    /// BEGIN/COMMIT block. On error inside a transaction, ROLLBACK is issued
242    /// before returning the error.
243    pub async fn execute(&self, client: &Client) -> Result<(), CliError> {
244        // Phase 1: setup statements
245        for sql in &self.setup_statements {
246            client
247                .execute(sql, &[])
248                .await
249                .map_err(|source| CliError::SqlExecutionFailed {
250                    statement: sql.clone(),
251                    source,
252                })?;
253        }
254
255        // Phase 2: Group objects into execution batches
256        let mut batches: Vec<ExecutionBatch<'_>> = Vec::new();
257        for phase in &self.phases {
258            for obj in &phase.results {
259                let obj_txn = obj.transaction_group.as_deref();
260                match batches.last_mut() {
261                    Some(batch) if batch.transaction_group == obj_txn && obj_txn.is_some() => {
262                        // Same transaction group — append to current batch
263                        batch.objects.push(obj);
264                    }
265                    _ => {
266                        // New batch (different group, no group, or first object)
267                        batches.push(ExecutionBatch {
268                            transaction_group: obj_txn,
269                            objects: vec![obj],
270                        });
271                    }
272                }
273            }
274        }
275
276        // Phase 3: Execute batches
277        for batch in &batches {
278            let in_txn = batch.transaction_group.is_some();
279            if in_txn {
280                client.execute("BEGIN", &[]).await.map_err(|source| {
281                    CliError::SqlExecutionFailed {
282                        statement: "BEGIN".to_string(),
283                        source,
284                    }
285                })?;
286            }
287
288            for obj in &batch.objects {
289                for sql in obj.redacted_statements.iter().chain(&obj.statements) {
290                    if let Err(e) = client.execute(sql, &[]).await {
291                        if in_txn {
292                            let _ = client.execute("ROLLBACK", &[]).await;
293                        }
294                        return Err(CliError::SqlExecutionFailed {
295                            statement: if obj.redacted_statements.contains(sql) {
296                                "[REDACTED — contains secret value]".to_string()
297                            } else {
298                                sql.clone()
299                            },
300                            source: e,
301                        });
302                    }
303                }
304            }
305
306            if in_txn {
307                client.execute("COMMIT", &[]).await.map_err(|source| {
308                    CliError::SqlExecutionFailed {
309                        statement: "COMMIT".to_string(),
310                        source,
311                    }
312                })?;
313            }
314
315            for obj in &batch.objects {
316                for sql in &obj.post_statements {
317                    client.execute(sql, &[]).await.map_err(|source| {
318                        CliError::SqlExecutionFailed {
319                            statement: sql.clone(),
320                            source,
321                        }
322                    })?;
323                }
324            }
325        }
326
327        Ok(())
328    }
329}
330
331impl fmt::Display for ApplyPlan {
332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333        let mut first = true;
334        for phase in &self.phases {
335            if phase.results.is_empty() {
336                continue;
337            }
338            if !first {
339                writeln!(f)?;
340            }
341            write!(f, "{}", phase)?;
342            first = false;
343        }
344        Ok(())
345    }
346}
347
348/// Collect deployment metadata (user and git commit).
349///
350/// This function retrieves the current database user and git commit hash
351/// for recording deployment provenance. If the current user cannot be
352/// determined, it defaults to "unknown".
353pub async fn collect_deployment_metadata(client: &Client, directory: &Path) -> DeploymentMetadata {
354    let deployed_by = client
355        .introspection()
356        .get_current_user()
357        .await
358        .unwrap_or_else(|e| {
359            info!("warning: failed to get current user: {}", e);
360            "unknown".to_string()
361        });
362
363    let git_commit = get_git_commit(directory);
364
365    DeploymentMetadata {
366        deployed_by,
367        git_commit,
368    }
369}
370
371/// Connect a planning client for apply commands.
372///
373/// Validates the connection and requires the `materialize_deployer` role.
374pub async fn connect_apply_client(settings: &Settings) -> Result<Client, CliError> {
375    let client = Client::connect_with_profile(settings.connection().clone())
376        .await
377        .map_err(CliError::Connection)?;
378    let role =
379        crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
380    crate::cli::commands::setup::require_deployer(role)?;
381    Ok(client)
382}
383
384/// Compile the project (without type checking) and connect a planning client
385/// for database-object apply commands.
386pub async fn compile_apply_project_and_connect(
387    settings: &Settings,
388) -> Result<(Project, Client), CliError> {
389    let planned_project = crate::cli::commands::compile::run_without_typecheck(
390        settings,
391        !crate::log::json_output_enabled(),
392    )
393    .await?;
394    let client = connect_apply_client(settings).await?;
395    Ok((planned_project, client))
396}
397
398/// Generate a unique 7-character hex identifier for deployments when no
399/// explicit environment name is provided.
400pub fn generate_random_env_name() -> String {
401    use sha2::{Digest, Sha256};
402    use std::time::SystemTime;
403
404    let now = SystemTime::now()
405        .duration_since(SystemTime::UNIX_EPOCH)
406        .expect("system time before Unix epoch")
407        .as_nanos();
408
409    let mut hasher = Sha256::new();
410    hasher.update(now.to_le_bytes());
411    let hash = hasher.finalize();
412
413    // Take first 4 bytes of hash and format as 7-char hex
414    format!(
415        "{:07x}",
416        u32::from_le_bytes([hash[0], hash[1], hash[2], hash[3]]) & 0xFFFFFFF
417    )
418}
419
420/// Helper for executing database object deployments.
421///
422/// This struct consolidates the pattern of executing a database object's
423/// SQL statements (main statement + indexes + grants + comments) with
424/// consistent error handling. Supports dry-run mode where SQL is printed
425/// instead of executed.
426///
427/// In dry-run mode, statements are always recorded in an internal log
428/// that can be drained per-object via `take_statements()`.
429pub struct DeploymentExecutor<'a> {
430    client: &'a Client,
431    dry_run: bool,
432    statement_log: RefCell<Vec<String>>,
433}
434
435impl<'a> DeploymentExecutor<'a> {
436    /// Create a new deployment executor that executes SQL.
437    pub fn new(client: &'a Client) -> Self {
438        Self {
439            client,
440            dry_run: false,
441            statement_log: RefCell::new(Vec::new()),
442        }
443    }
444
445    /// Create a deployment executor that always runs in dry-run mode (for planning).
446    pub fn new_dry_run(client: &'a Client) -> Self {
447        Self {
448            client,
449            dry_run: true,
450            statement_log: RefCell::new(Vec::new()),
451        }
452    }
453
454    /// Create a deployment executor with configurable dry-run mode.
455    pub fn with_dry_run(client: &'a Client, dry_run: bool) -> Self {
456        Self {
457            client,
458            dry_run,
459            statement_log: RefCell::new(Vec::new()),
460        }
461    }
462
463    /// Returns true if this executor is in dry-run mode.
464    pub fn is_dry_run(&self) -> bool {
465        self.dry_run
466    }
467
468    /// Drain and return all statements recorded since the last call.
469    ///
470    /// Use this between objects to capture per-object statement lists
471    /// for `ObjectResult`.
472    pub fn take_statements(&self) -> Vec<String> {
473        self.statement_log.borrow_mut().drain(..).collect()
474    }
475
476    /// Execute all SQL statements for a database object.
477    ///
478    /// This executes the main CREATE statement, followed by any indexes,
479    /// grants, and comments associated with the object.
480    pub async fn execute_object(
481        &self,
482        typed_obj: &compiled::DatabaseObject,
483    ) -> Result<(), CliError> {
484        // Execute main statement
485        self.execute_sql(&typed_obj.stmt).await?;
486
487        // Execute indexes
488        for index in &typed_obj.indexes {
489            self.execute_sql(index).await?;
490        }
491
492        // Execute grants
493        for grant in &typed_obj.grants {
494            self.execute_sql(grant).await?;
495        }
496
497        // Execute comments
498        for comment in &typed_obj.comments {
499            self.execute_sql(comment).await?;
500        }
501
502        Ok(())
503    }
504
505    /// Create databases and schemas for `schema_set`, then execute filtered mod_statements.
506    ///
507    /// When `staging_suffix` is `Some`, schema names are suffixed and mod_statement
508    /// schema references are rewritten at the AST level to target staging schemas.
509    pub async fn prepare_databases_and_schemas(
510        &self,
511        planned_project: &Project,
512        schema_set: &BTreeSet<project::SchemaQualifier>,
513        staging_suffix: Option<&str>,
514    ) -> Result<(), CliError> {
515        if schema_set.is_empty() {
516            return Ok(());
517        }
518
519        // Step 1: Create databases
520        let databases: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
521        for db in &databases {
522            let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(db));
523            self.execute_sql(&sql).await?;
524        }
525
526        // Step 2: Create schemas (with optional staging suffix)
527        for sq in schema_set {
528            let schema_name = match staging_suffix {
529                Some(suffix) => format!("{}{}", sq.schema, suffix),
530                None => sq.schema.clone(),
531            };
532            verbose!(
533                "Creating schema {}.{} if not exists",
534                sq.database,
535                schema_name
536            );
537            let sql = format!(
538                "CREATE SCHEMA IF NOT EXISTS {}.{}",
539                quote_identifier(&sq.database),
540                quote_identifier(&schema_name)
541            );
542            self.execute_sql(&sql).await?;
543        }
544
545        // Step 3: Execute mod_statements filtered by schema_set membership
546        for mod_stmt in planned_project.iter_mod_statements() {
547            match mod_stmt {
548                project::ModStatement::Database {
549                    database,
550                    statement,
551                } => {
552                    let has_schema = schema_set.iter().any(|sq| sq.database == *database);
553                    if has_schema {
554                        verbose!("Applying database setup for: {}", database);
555                        self.execute_sql(statement).await?;
556                    }
557                }
558                project::ModStatement::Schema {
559                    database,
560                    schema,
561                    statement,
562                } => {
563                    if schema_set.contains(&project::SchemaQualifier::new(
564                        database.to_string(),
565                        schema.to_string(),
566                    )) {
567                        if let Some(suffix) = staging_suffix {
568                            let staging_schema = format!("{}{}", schema, suffix);
569                            let mut rewritten = statement.clone();
570                            normalize::rewrite_schema_names(
571                                std::slice::from_mut(&mut rewritten),
572                                schema,
573                                suffix,
574                            );
575                            verbose!("Applying schema setup for: {}.{}", database, staging_schema);
576                            self.execute_sql(&rewritten).await?;
577                        } else {
578                            verbose!("Applying schema setup for: {}.{}", database, schema);
579                            self.execute_sql(statement).await?;
580                        }
581                    }
582                }
583            }
584        }
585
586        Ok(())
587    }
588
589    /// Execute (or print in dry-run mode) a single SQL statement.
590    ///
591    /// In dry-run mode, statements are always logged to the internal
592    /// statement buffer (retrievable via `take_statements()`).
593    pub async fn execute_sql(&self, stmt: &impl ToString) -> Result<(), CliError> {
594        let sql = stmt.to_string();
595
596        if self.dry_run {
597            self.statement_log.borrow_mut().push(sql.clone());
598            return Ok(());
599        }
600
601        self.client
602            .execute(&sql, &[])
603            .await
604            .map_err(|source| CliError::SqlExecutionFailed {
605                statement: sql,
606                source,
607            })?;
608        Ok(())
609    }
610
611    /// Ensure a database exists.
612    ///
613    /// Real mode: delegates to `client.provisioning().create_database()`.
614    /// Dry-run: logs `CREATE DATABASE IF NOT EXISTS ...`.
615    pub async fn ensure_database(&self, name: &str) -> Result<(), CliError> {
616        if self.dry_run {
617            let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(name));
618            self.statement_log.borrow_mut().push(sql);
619        } else {
620            self.client.provisioning().create_database(name).await?;
621        }
622        Ok(())
623    }
624
625    /// Ensure a schema exists in the given database.
626    ///
627    /// Real mode: delegates to `client.provisioning().create_schema()`.
628    /// Dry-run: logs `CREATE SCHEMA IF NOT EXISTS ...`.
629    pub async fn ensure_schema(&self, database: &str, schema: &str) -> Result<(), CliError> {
630        if self.dry_run {
631            let sql = format!(
632                "CREATE SCHEMA IF NOT EXISTS {}.{}",
633                quote_identifier(database),
634                quote_identifier(schema)
635            );
636            self.statement_log.borrow_mut().push(sql);
637        } else {
638            self.client
639                .provisioning()
640                .create_schema(database, schema)
641                .await?;
642        }
643        Ok(())
644    }
645
646    /// Create a staging cluster by cloning a production cluster's configuration.
647    ///
648    /// Real mode: delegates to `client.provisioning().create_cluster_with_config()`.
649    /// Dry-run: logs a placeholder `CREATE CLUSTER ...` statement.
650    pub async fn create_cluster(
651        &self,
652        staging_name: &str,
653        prod_name: &str,
654        config: &ClusterConfig,
655    ) -> Result<(), CliError> {
656        if self.dry_run {
657            let sql = format!(
658                "CREATE CLUSTER {} (SIZE = '<from {}')",
659                quote_identifier(staging_name),
660                prod_name
661            );
662            self.statement_log.borrow_mut().push(sql);
663        } else {
664            self.client
665                .provisioning()
666                .create_cluster_with_config(staging_name, config)
667                .await?;
668        }
669        Ok(())
670    }
671
672    /// Record cluster mappings for a deployment.
673    ///
674    /// Real mode: delegates to `client.deployments().insert_deployment_clusters()`.
675    /// Dry-run: no-op (internal bookkeeping).
676    pub async fn record_deployment_clusters(
677        &self,
678        stage_name: &str,
679        clusters: &[String],
680    ) -> Result<(), CliError> {
681        if !self.dry_run {
682            self.client
683                .deployments()
684                .insert_deployment_clusters(stage_name, clusters)
685                .await?;
686            verbose!("Cluster mappings recorded");
687        }
688        Ok(())
689    }
690}