mz_deploy/cli/commands/
apply_sources.rs1use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::{
16 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult,
17 compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use std::collections::BTreeSet;
24
25const PHASE_NAME: &str = "sources";
26const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Source;
27
28fn matches(stmt: &Statement) -> bool {
29 matches!(stmt, Statement::CreateSource(_))
30}
31
32pub async fn plan(
34 _settings: &Settings,
35 client: &Client,
36 executor: &DeploymentExecutor<'_>,
37 planned_project: &project::ir::graph::Project,
38 apply_plan: &mut ApplyPlan,
39) -> Result<ApplyResult, CliError> {
40 let mut target_ids = BTreeSet::new();
41 for obj in planned_project.iter_objects() {
42 if matches(&obj.typed_object.stmt) {
43 target_ids.insert(obj.id.clone());
44 }
45 }
46
47 if target_ids.is_empty() {
48 return Ok(ApplyResult {
49 phase: PHASE_NAME.to_string(),
50 results: vec![],
51 });
52 }
53
54 let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
55 let existing = client
56 .introspection()
57 .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
58 .await
59 .map_err(CliError::Connection)?;
60
61 let schemas: BTreeSet<_> = target_objects
62 .iter()
63 .filter(|(obj_id, _)| !existing.contains(obj_id))
64 .map(|(obj_id, _)| {
65 project::SchemaQualifier::new(
66 obj_id.expect_database().to_string(),
67 obj_id.schema().to_string(),
68 )
69 })
70 .collect();
71 apply_plan
72 .prepare_schemas(executor, planned_project, &schemas)
73 .await?;
74
75 let mut results = Vec::new();
76
77 for (obj_id, typed_obj) in target_objects {
78 executor.take_statements();
79
80 let action = if existing.contains(&obj_id) {
81 apply_objects::reconcile_grants_and_comments(
82 client,
83 executor,
84 &obj_id,
85 typed_obj,
86 &GRANT_KIND,
87 )
88 .await?;
89 ObjectAction::UpToDate
90 } else {
91 executor.execute_sql(&typed_obj.stmt).await?;
92 for index in &typed_obj.indexes {
93 executor.execute_sql(index).await?;
94 }
95 apply_objects::reconcile_grants_and_comments(
96 client,
97 executor,
98 &obj_id,
99 typed_obj,
100 &GRANT_KIND,
101 )
102 .await?;
103 ObjectAction::Created
104 };
105
106 results.push(ObjectResult {
107 object: obj_id.to_string(),
108 action,
109 statements: executor.take_statements(),
110 redacted_statements: vec![],
111 transaction_group: None,
112 });
113 }
114
115 Ok(ApplyResult {
116 phase: PHASE_NAME.to_string(),
117 results,
118 })
119}
120
121pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
123 let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
124 let mut apply_plan = ApplyPlan::new();
125 let executor = DeploymentExecutor::new_dry_run(&client);
126 let phase = plan(
127 settings,
128 &client,
129 &executor,
130 &planned_project,
131 &mut apply_plan,
132 )
133 .await?;
134 apply_plan.add_phase(phase);
135
136 if !dry_run {
137 apply_plan.execute(&client).await?;
138 }
139
140 Ok(apply_plan)
141}