Skip to main content

mz_deploy/cli/commands/
apply_tables.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Apply tables command - create tables that don't exist in the database.
11
12use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::{
16    ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult,
17    compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use crate::project::ir::graph::Project;
24use std::collections::BTreeSet;
25
/// Phase label stored in the `phase` field of every `ApplyResult` built by `plan`.
const PHASE_NAME: &str = "tables";
/// Grant object kind for tables. `plan` uses it to pick the catalog table for
/// the existence check and passes it to grant/comment reconciliation.
const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Table;
28
29fn matches(stmt: &Statement) -> bool {
30    matches!(
31        stmt,
32        Statement::CreateTable(_) | Statement::CreateTableFromSource(_)
33    )
34}
35
36/// Plan only table objects (no deployment tracking, no execution).
37pub async fn plan(
38    _settings: &Settings,
39    client: &Client,
40    executor: &DeploymentExecutor<'_>,
41    planned_project: &Project,
42    apply_plan: &mut ApplyPlan,
43) -> Result<ApplyResult, CliError> {
44    let mut target_ids = BTreeSet::new();
45    for obj in planned_project.iter_objects() {
46        if matches(&obj.typed_object.stmt) {
47            target_ids.insert(obj.id.clone());
48        }
49    }
50
51    if target_ids.is_empty() {
52        return Ok(ApplyResult {
53            phase: PHASE_NAME.to_string(),
54            results: vec![],
55        });
56    }
57
58    let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
59    let existing = client
60        .introspection()
61        .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
62        .await
63        .map_err(CliError::Connection)?;
64
65    let schemas: BTreeSet<_> = target_objects
66        .iter()
67        .filter(|(obj_id, _)| !existing.contains(obj_id))
68        .map(|(obj_id, _)| {
69            project::SchemaQualifier::new(
70                obj_id.expect_database().to_string(),
71                obj_id.schema().to_string(),
72            )
73        })
74        .collect();
75    apply_plan
76        .prepare_schemas(executor, planned_project, &schemas)
77        .await?;
78
79    let mut results = Vec::new();
80
81    for (obj_id, typed_obj) in target_objects {
82        executor.take_statements();
83
84        let action = if existing.contains(&obj_id) {
85            apply_objects::reconcile_grants_and_comments(
86                client,
87                executor,
88                &obj_id,
89                typed_obj,
90                &GRANT_KIND,
91            )
92            .await?;
93            ObjectAction::UpToDate
94        } else {
95            executor.execute_sql(&typed_obj.stmt).await?;
96            for index in &typed_obj.indexes {
97                executor.execute_sql(index).await?;
98            }
99            apply_objects::reconcile_grants_and_comments(
100                client,
101                executor,
102                &obj_id,
103                typed_obj,
104                &GRANT_KIND,
105            )
106            .await?;
107            ObjectAction::Created
108        };
109
110        let txn_group = if action == ObjectAction::Created {
111            if let Statement::CreateTableFromSource(s) = &typed_obj.stmt {
112                Some(s.source.to_string())
113            } else {
114                None
115            }
116        } else {
117            None
118        };
119
120        results.push(ObjectResult {
121            object: obj_id.to_string(),
122            action,
123            statements: executor.take_statements(),
124            redacted_statements: vec![],
125            transaction_group: txn_group,
126        });
127    }
128
129    // Reorder: non-grouped objects first, then grouped objects sorted by group key.
130    // stable_sort_by preserves topological order within each group.
131    results.sort_by(|a, b| a.transaction_group.cmp(&b.transaction_group));
132
133    Ok(ApplyResult {
134        phase: PHASE_NAME.to_string(),
135        results,
136    })
137}
138
139/// Run the `apply tables` command: compile, plan, optionally execute, then lock.
140pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
141    let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
142    let mut apply_plan = ApplyPlan::new();
143    let executor = DeploymentExecutor::new_dry_run(&client);
144    let phase = plan(
145        settings,
146        &client,
147        &executor,
148        &planned_project,
149        &mut apply_plan,
150    )
151    .await?;
152    apply_plan.add_phase(phase);
153
154    if !dry_run {
155        apply_plan.execute(&client).await?;
156        super::lock::run(settings).await?;
157    }
158
159    Ok(apply_plan)
160}