mz_deploy/cli/commands/
apply_tables.rs1use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::{
16 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult,
17 compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use crate::project::ir::graph::Project;
24use std::collections::BTreeSet;
25
26const PHASE_NAME: &str = "tables";
27const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Table;
28
29fn matches(stmt: &Statement) -> bool {
30 matches!(
31 stmt,
32 Statement::CreateTable(_) | Statement::CreateTableFromSource(_)
33 )
34}
35
36pub async fn plan(
38 _settings: &Settings,
39 client: &Client,
40 executor: &DeploymentExecutor<'_>,
41 planned_project: &Project,
42 apply_plan: &mut ApplyPlan,
43) -> Result<ApplyResult, CliError> {
44 let mut target_ids = BTreeSet::new();
45 for obj in planned_project.iter_objects() {
46 if matches(&obj.typed_object.stmt) {
47 target_ids.insert(obj.id.clone());
48 }
49 }
50
51 if target_ids.is_empty() {
52 return Ok(ApplyResult {
53 phase: PHASE_NAME.to_string(),
54 results: vec![],
55 });
56 }
57
58 let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
59 let existing = client
60 .introspection()
61 .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
62 .await
63 .map_err(CliError::Connection)?;
64
65 let schemas: BTreeSet<_> = target_objects
66 .iter()
67 .filter(|(obj_id, _)| !existing.contains(obj_id))
68 .map(|(obj_id, _)| {
69 project::SchemaQualifier::new(
70 obj_id.expect_database().to_string(),
71 obj_id.schema().to_string(),
72 )
73 })
74 .collect();
75 apply_plan
76 .prepare_schemas(executor, planned_project, &schemas)
77 .await?;
78
79 let mut results = Vec::new();
80
81 for (obj_id, typed_obj) in target_objects {
82 executor.take_statements();
83
84 let action = if existing.contains(&obj_id) {
85 apply_objects::reconcile_grants_and_comments(
86 client,
87 executor,
88 &obj_id,
89 typed_obj,
90 &GRANT_KIND,
91 )
92 .await?;
93 ObjectAction::UpToDate
94 } else {
95 executor.execute_sql(&typed_obj.stmt).await?;
96 for index in &typed_obj.indexes {
97 executor.execute_sql(index).await?;
98 }
99 apply_objects::reconcile_grants_and_comments(
100 client,
101 executor,
102 &obj_id,
103 typed_obj,
104 &GRANT_KIND,
105 )
106 .await?;
107 ObjectAction::Created
108 };
109
110 let txn_group = if action == ObjectAction::Created {
111 if let Statement::CreateTableFromSource(s) = &typed_obj.stmt {
112 Some(s.source.to_string())
113 } else {
114 None
115 }
116 } else {
117 None
118 };
119
120 results.push(ObjectResult {
121 object: obj_id.to_string(),
122 action,
123 statements: executor.take_statements(),
124 redacted_statements: vec![],
125 transaction_group: txn_group,
126 });
127 }
128
129 results.sort_by(|a, b| a.transaction_group.cmp(&b.transaction_group));
132
133 Ok(ApplyResult {
134 phase: PHASE_NAME.to_string(),
135 results,
136 })
137}
138
139pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
141 let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
142 let mut apply_plan = ApplyPlan::new();
143 let executor = DeploymentExecutor::new_dry_run(&client);
144 let phase = plan(
145 settings,
146 &client,
147 &executor,
148 &planned_project,
149 &mut apply_plan,
150 )
151 .await?;
152 apply_plan.add_phase(phase);
153
154 if !dry_run {
155 apply_plan.execute(&client).await?;
156 super::lock::run(settings).await?;
157 }
158
159 Ok(apply_plan)
160}