mz_deploy/cli/commands/
apply_sources.rs1use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::{
16 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult,
17 compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use std::collections::BTreeSet;
24
/// Phase label stored in the `phase` field of every `ApplyResult` returned by `plan`.
const PHASE_NAME: &str = "sources";
/// Grant object kind used for this phase: its `catalog_table()` selects the catalog
/// table consulted for existence checks, and it is handed to
/// `apply_objects::reconcile_grants_and_comments` for each object.
const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Source;
27
28fn matches(stmt: &Statement) -> bool {
29 matches!(stmt, Statement::CreateSource(_))
30}
31
32pub async fn plan(
34 _settings: &Settings,
35 client: &Client,
36 executor: &DeploymentExecutor<'_>,
37 planned_project: &project::ir::graph::Project,
38 apply_plan: &mut ApplyPlan,
39) -> Result<ApplyResult, CliError> {
40 let mut target_ids = BTreeSet::new();
41 for obj in planned_project.iter_objects() {
42 if matches(&obj.typed_object.stmt) {
43 target_ids.insert(obj.id.clone());
44 }
45 }
46
47 if target_ids.is_empty() {
48 return Ok(ApplyResult {
49 phase: PHASE_NAME.to_string(),
50 results: vec![],
51 });
52 }
53
54 let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
55 let existing = client
56 .introspection()
57 .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
58 .await
59 .map_err(CliError::Connection)?;
60
61 let schemas: BTreeSet<_> = target_objects
62 .iter()
63 .filter(|(obj_id, _)| !existing.contains(obj_id))
64 .map(|(obj_id, _)| {
65 project::SchemaQualifier::new(
66 obj_id.expect_database().to_string(),
67 obj_id.schema().to_string(),
68 )
69 })
70 .collect();
71 apply_plan
72 .prepare_schemas(executor, planned_project, &schemas)
73 .await?;
74
75 let mut results = Vec::new();
76
77 for (obj_id, typed_obj) in target_objects {
78 executor.take_statements();
79
80 let action = if existing.contains(&obj_id) {
81 apply_objects::reconcile_grants_and_comments(
82 client,
83 executor,
84 &obj_id,
85 typed_obj,
86 &GRANT_KIND,
87 )
88 .await?;
89 ObjectAction::UpToDate
90 } else {
91 executor.execute_sql(&typed_obj.stmt).await?;
92 for index in &typed_obj.indexes {
93 executor.execute_sql(index).await?;
94 }
95 apply_objects::reconcile_grants_and_comments(
96 client,
97 executor,
98 &obj_id,
99 typed_obj,
100 &GRANT_KIND,
101 )
102 .await?;
103 ObjectAction::Created
104 };
105
106 results.push(ObjectResult {
107 object: obj_id.to_string(),
108 action,
109 statements: executor.take_statements(),
110 redacted_statements: vec![],
111 transaction_group: None,
112 post_statements: vec![],
113 });
114 }
115
116 Ok(ApplyResult {
117 phase: PHASE_NAME.to_string(),
118 results,
119 })
120}
121
122pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
124 let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
125 let mut apply_plan = ApplyPlan::new();
126 let executor = DeploymentExecutor::new_dry_run(&client);
127 let phase = plan(
128 settings,
129 &client,
130 &executor,
131 &planned_project,
132 &mut apply_plan,
133 )
134 .await?;
135 apply_plan.add_phase(phase);
136
137 if !dry_run {
138 apply_plan.execute(&client).await?;
139 }
140
141 Ok(apply_plan)
142}