Skip to main content

mz_deploy/cli/commands/
apply_sources.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Apply sources command - create sources that don't exist in the database.
11
12use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::{
16    ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult,
17    compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use std::collections::BTreeSet;
24
/// Phase label stored in the `phase` field of every `ApplyResult` built in this module.
const PHASE_NAME: &str = "sources";
/// Grant object kind for sources. It supplies the catalog table used by the existence check
/// and is passed along to grant/comment reconciliation.
const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Source;
27
28fn matches(stmt: &Statement) -> bool {
29    matches!(stmt, Statement::CreateSource(_))
30}
31
32/// Plan only source objects (no deployment tracking, no execution).
33pub async fn plan(
34    _settings: &Settings,
35    client: &Client,
36    executor: &DeploymentExecutor<'_>,
37    planned_project: &project::ir::graph::Project,
38    apply_plan: &mut ApplyPlan,
39) -> Result<ApplyResult, CliError> {
40    let mut target_ids = BTreeSet::new();
41    for obj in planned_project.iter_objects() {
42        if matches(&obj.typed_object.stmt) {
43            target_ids.insert(obj.id.clone());
44        }
45    }
46
47    if target_ids.is_empty() {
48        return Ok(ApplyResult {
49            phase: PHASE_NAME.to_string(),
50            results: vec![],
51        });
52    }
53
54    let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
55    let existing = client
56        .introspection()
57        .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
58        .await
59        .map_err(CliError::Connection)?;
60
61    let schemas: BTreeSet<_> = target_objects
62        .iter()
63        .filter(|(obj_id, _)| !existing.contains(obj_id))
64        .map(|(obj_id, _)| {
65            project::SchemaQualifier::new(
66                obj_id.expect_database().to_string(),
67                obj_id.schema().to_string(),
68            )
69        })
70        .collect();
71    apply_plan
72        .prepare_schemas(executor, planned_project, &schemas)
73        .await?;
74
75    let mut results = Vec::new();
76
77    for (obj_id, typed_obj) in target_objects {
78        executor.take_statements();
79
80        let action = if existing.contains(&obj_id) {
81            apply_objects::reconcile_grants_and_comments(
82                client,
83                executor,
84                &obj_id,
85                typed_obj,
86                &GRANT_KIND,
87            )
88            .await?;
89            ObjectAction::UpToDate
90        } else {
91            executor.execute_sql(&typed_obj.stmt).await?;
92            for index in &typed_obj.indexes {
93                executor.execute_sql(index).await?;
94            }
95            apply_objects::reconcile_grants_and_comments(
96                client,
97                executor,
98                &obj_id,
99                typed_obj,
100                &GRANT_KIND,
101            )
102            .await?;
103            ObjectAction::Created
104        };
105
106        results.push(ObjectResult {
107            object: obj_id.to_string(),
108            action,
109            statements: executor.take_statements(),
110            redacted_statements: vec![],
111            transaction_group: None,
112        });
113    }
114
115    Ok(ApplyResult {
116        phase: PHASE_NAME.to_string(),
117        results,
118    })
119}
120
121/// Run the `apply sources` command: compile, plan, optionally execute.
122pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
123    let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
124    let mut apply_plan = ApplyPlan::new();
125    let executor = DeploymentExecutor::new_dry_run(&client);
126    let phase = plan(
127        settings,
128        &client,
129        &executor,
130        &planned_project,
131        &mut apply_plan,
132    )
133    .await?;
134    apply_plan.add_phase(phase);
135
136    if !dry_run {
137        apply_plan.execute(&client).await?;
138    }
139
140    Ok(apply_plan)
141}