Skip to main content

mz_deploy/cli/commands/
apply_tables.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Apply tables command - create tables that don't exist in the database.
11
12use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::{
16    ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult,
17    compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use crate::project::ir::graph::Project;
24use std::collections::BTreeSet;
25
/// Label stored in the `phase` field of every `ApplyResult` this module returns.
const PHASE_NAME: &str = "tables";
/// Grant object kind for tables: handed to grant/comment reconciliation, and its
/// `catalog_table()` value is what the catalog existence check is run against.
const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Table;
28
29fn matches(stmt: &Statement) -> bool {
30    matches!(
31        stmt,
32        Statement::CreateTable(_) | Statement::CreateTableFromSource(_)
33    )
34}
35
36/// Plan only table objects (no deployment tracking, no execution).
37pub async fn plan(
38    _settings: &Settings,
39    client: &Client,
40    executor: &DeploymentExecutor<'_>,
41    planned_project: &Project,
42    apply_plan: &mut ApplyPlan,
43) -> Result<ApplyResult, CliError> {
44    let mut target_ids = BTreeSet::new();
45    for obj in planned_project.iter_objects() {
46        if matches(&obj.typed_object.stmt) {
47            target_ids.insert(obj.id.clone());
48        }
49    }
50
51    if target_ids.is_empty() {
52        return Ok(ApplyResult {
53            phase: PHASE_NAME.to_string(),
54            results: vec![],
55        });
56    }
57
58    let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
59    let existing = client
60        .introspection()
61        .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
62        .await
63        .map_err(CliError::Connection)?;
64
65    let schemas: BTreeSet<_> = target_objects
66        .iter()
67        .filter(|(obj_id, _)| !existing.contains(obj_id))
68        .map(|(obj_id, _)| {
69            project::SchemaQualifier::new(
70                obj_id.expect_database().to_string(),
71                obj_id.schema().to_string(),
72            )
73        })
74        .collect();
75    apply_plan
76        .prepare_schemas(executor, planned_project, &schemas)
77        .await?;
78
79    let mut results = Vec::new();
80
81    for (obj_id, typed_obj) in target_objects {
82        executor.take_statements();
83
84        if existing.contains(&obj_id) {
85            apply_objects::reconcile_grants_and_comments(
86                client,
87                executor,
88                &obj_id,
89                typed_obj,
90                &GRANT_KIND,
91            )
92            .await?;
93            results.push(ObjectResult {
94                object: obj_id.to_string(),
95                action: ObjectAction::UpToDate,
96                statements: executor.take_statements(),
97                redacted_statements: vec![],
98                transaction_group: None,
99                post_statements: vec![],
100            });
101            continue;
102        }
103
104        executor.execute_sql(&typed_obj.stmt).await?;
105        let statements = executor.take_statements();
106
107        for index in &typed_obj.indexes {
108            executor.execute_sql(index).await?;
109        }
110        apply_objects::reconcile_grants_and_comments(
111            client,
112            executor,
113            &obj_id,
114            typed_obj,
115            &GRANT_KIND,
116        )
117        .await?;
118        let post_statements = executor.take_statements();
119
120        let transaction_group = match &typed_obj.stmt {
121            Statement::CreateTableFromSource(s) => Some(s.source.to_string()),
122            _ => None,
123        };
124
125        results.push(ObjectResult {
126            object: obj_id.to_string(),
127            action: ObjectAction::Created,
128            statements,
129            redacted_statements: vec![],
130            transaction_group,
131            post_statements,
132        });
133    }
134
135    // Reorder: non-grouped objects first, then grouped objects sorted by group key.
136    // stable_sort_by preserves topological order within each group.
137    results.sort_by(|a, b| a.transaction_group.cmp(&b.transaction_group));
138
139    Ok(ApplyResult {
140        phase: PHASE_NAME.to_string(),
141        results,
142    })
143}
144
145/// Run the `apply tables` command: compile, plan, optionally execute, then lock.
146pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
147    let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
148    let mut apply_plan = ApplyPlan::new();
149    let executor = DeploymentExecutor::new_dry_run(&client);
150    let phase = plan(
151        settings,
152        &client,
153        &executor,
154        &planned_project,
155        &mut apply_plan,
156    )
157    .await?;
158    apply_plan.add_phase(phase);
159
160    if !dry_run {
161        apply_plan.execute(&client).await?;
162        super::lock::run(settings).await?;
163    }
164
165    Ok(apply_plan)
166}