mz_deploy/cli/commands/
apply_network_policies.rs1use crate::cli::CliError;
13use crate::cli::commands::grants;
14use crate::cli::executor::{
15 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult, connect_apply_client,
16};
17use crate::client::Client;
18use crate::config::Settings;
19use crate::project::network_policies::{self, NetworkPolicyDefinition};
20use mz_sql_parser::ast::AlterNetworkPolicyStatement;
21
22pub async fn plan(
24 settings: &Settings,
25 client: &Client,
26 executor: &DeploymentExecutor<'_>,
27) -> Result<ApplyResult, CliError> {
28 let profile = settings.connection();
29 let directory = &settings.directory;
30
31 let definitions =
32 network_policies::load_network_policies(directory, &profile.name, settings.variables())?;
33
34 if definitions.is_empty() {
35 return Ok(ApplyResult {
36 phase: "network_policies".to_string(),
37 results: vec![],
38 });
39 }
40
41 let mut object_results = Vec::new();
42 for def in &definitions {
43 let obj_result = plan_network_policy(client, executor, def).await?;
44 object_results.push(obj_result);
45 }
46
47 Ok(ApplyResult {
48 phase: "network_policies".to_string(),
49 results: object_results,
50 })
51}
52
53pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
55 let client = connect_apply_client(settings).await?;
56 let executor = DeploymentExecutor::new_dry_run(&client);
57 let mut plan_result = ApplyPlan::new();
58 let phase = plan(settings, &client, &executor).await?;
59 plan_result.add_phase(phase);
60
61 if !dry_run {
62 plan_result.execute(&client).await?;
63 }
64
65 Ok(plan_result)
66}
67
68async fn plan_network_policy(
71 client: &Client,
72 executor: &DeploymentExecutor<'_>,
73 def: &NetworkPolicyDefinition,
74) -> Result<ObjectResult, CliError> {
75 let policy_name = &def.name;
76
77 executor.take_statements();
79
80 let exists = client
82 .introspection()
83 .network_policy_exists(policy_name)
84 .await
85 .map_err(CliError::Connection)?;
86
87 let action = if exists {
88 let alter_stmt = AlterNetworkPolicyStatement {
90 name: def.create_stmt.name.clone(),
91 options: def.create_stmt.options.clone(),
92 };
93 executor.execute_sql(&alter_stmt).await?;
94 ObjectAction::Altered
95 } else {
96 executor.execute_sql(&def.create_stmt).await?;
97 ObjectAction::Created
98 };
99
100 grants::reconcile_named_object(
102 client,
103 executor,
104 policy_name,
105 &def.grants,
106 &grants::GrantNamedObjectKind::NetworkPolicy,
107 )
108 .await?;
109
110 for comment in &def.comments {
112 executor.execute_sql(comment).await?;
113 }
114
115 Ok(ObjectResult {
116 object: policy_name.clone(),
117 action,
118 statements: executor.take_statements(),
119 redacted_statements: vec![],
120 transaction_group: None,
121 })
122}