Skip to main content

mz_deploy/project/analysis/
deployment_snapshot.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment snapshot tracking.
11//!
12//! This module provides functionality for capturing and comparing deployment state snapshots.
13//! Instead of hashing source files, it hashes normalized compiled objects so
14//! formatting and comment changes don't trigger unnecessary redeployments.
15//!
16//! A deployment snapshot captures the state of all deployed objects with their content hashes,
17//! enabling change detection (like git diff but for database objects) and supporting
18//! blue/green deployment workflows.
19//!
20//! ## What's Hashed
21//!
22//! Content hashes are computed by [`compute_typed_hash`] from the **compiled AST
23//! representation** of each object, not the source SQL file contents. The hash
24//! includes:
25//! - The main CREATE statement (via `Hash` on the AST node)
26//! - All indexes (sorted deterministically by cluster, on_name, name, key_parts)
27//!
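//! Hashing uses SHA-256 (via a `Sha256Hasher` bridge to `std::hash::Hash`)
//! so the digest does not depend on `DefaultHasher`, whose algorithm may change
//! between Rust releases. The output format is `sha256:<hex>`. Note that the
//! byte stream that `Hash` impls feed to the hasher is itself not guaranteed to
//! be stable across compiler versions, so a toolchain upgrade can still change
//! hashes.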
30//!
//! **Key Insight:** Because the hash is computed from the parsed AST, edits
//! that do not change the parsed statement (whitespace, comments, casing of
//! unquoted identifiers) produce identical hashes and do not trigger
//! redeployment.
34//!
35//! ## Apply-Managed Exclusions
36//!
37//! Tables, table-from-source, sources, secrets, and connections are excluded
38//! from snapshots by [`build_snapshot_from_planned`] because they are managed
39//! by the `apply` command path and should not participate in deployment hash
40//! change detection.
41//!
42//! ## Snapshot Storage
43//!
44//! Snapshots are persisted in the `_mz_deploy` database:
45//! - `_mz_deploy.public.deployments` — per-schema deployment metadata
46//! - `_mz_deploy.public.objects` — per-object content hashes (append-only)
47
48use std::collections::BTreeMap;
49use std::hash::{Hash, Hasher};
50
51use chrono::{DateTime, Utc};
52
53use crate::project::SchemaQualifier;
54use crate::project::ast::Statement;
55use sha2::{Digest, Sha256};
56
57use crate::client::{
58    Client, ConnectionError, DeploymentKind, DeploymentMode, DeploymentObjectRecord,
59    SchemaDeploymentRecord,
60};
61use crate::project::ir::{compiled, graph, object_id::ObjectId};
62
63/// A wrapper that bridges `std::hash::Hasher` to `sha2::Digest`.
64///
65/// This allows us to use the `Hash` trait on AST nodes while using SHA256 for stability.
66struct Sha256Hasher {
67    digest: Sha256,
68}
69
70impl Sha256Hasher {
71    fn new() -> Self {
72        Self {
73            digest: Sha256::new(),
74        }
75    }
76
77    fn finalize(self) -> String {
78        let result = self.digest.finalize();
79        format!("sha256:{:x}", result)
80    }
81}
82
83impl Hasher for Sha256Hasher {
84    fn write(&mut self, bytes: &[u8]) {
85        self.digest.update(bytes);
86    }
87
88    fn finish(&self) -> u64 {
89        // This is never called when using Hash trait, but required by trait
90        // We use finalize() instead to get the full hash
91        panic!("Sha256Hasher::finish() should not be called, use finalize() instead");
92    }
93}
94
95/// Represents a point-in-time snapshot of deployment state.
96///
97/// Maps object IDs to their content hashes, where the hash is computed from
98/// the normalized compiled representation (not source file contents).
99/// Also tracks which schemas were deployed as atomic units.
100#[derive(Debug, Clone, serde::Serialize)]
101pub struct DeploymentSnapshot {
102    /// Map of ObjectId to content hash
103    pub objects: BTreeMap<ObjectId, String>,
104    /// Map of (database, schema) to deployment kind
105    /// - Objects: Regular schemas containing views/MVs that need swapping
106    /// - Sinks: Schemas containing only sinks (no swap needed, sinks created after swap)
107    /// - Tables: Schemas containing only tables
108    pub schemas: BTreeMap<SchemaQualifier, DeploymentKind>,
109}
110
111/// Metadata collected during deployment.
112#[derive(Debug, Clone)]
113pub struct DeploymentMetadata {
114    /// Materialize user/role that performed the deployment
115    pub deployed_by: String,
116    /// Git commit hash if the project is in a git repository
117    pub git_commit: Option<String>,
118}
119
120/// Error types for deployment snapshot operations.
121#[derive(Debug, thiserror::Error)]
122pub enum DeploymentSnapshotError {
123    #[error("failed to connect to database: {0}")]
124    Connection(#[from] ConnectionError),
125
126    #[error("failed to build snapshot from project graph: {0}")]
127    PlannedAccess(String),
128
129    #[error("invalid object FQN: {0}")]
130    InvalidFqn(String),
131
132    #[error("deployment '{environment}' already exists")]
133    DeploymentAlreadyExists { environment: String },
134
135    #[error("deployment '{environment}' not found")]
136    DeploymentNotFound { environment: String },
137
138    #[error("deployment '{environment}' has already been promoted")]
139    DeploymentAlreadyPromoted { environment: String },
140}
141
142impl Default for DeploymentSnapshot {
143    fn default() -> Self {
144        Self {
145            objects: BTreeMap::new(),
146            schemas: BTreeMap::new(),
147        }
148    }
149}
150
151/// Compute a deterministic hash of a typed DatabaseObject.
152/// The hash includes:
153/// - The main CREATE statement
154/// - All indexes
155///
156/// Uses SHA256 for stable, deterministic hashing across platforms and Rust versions.
157pub(crate) fn compute_typed_hash(db_obj: &compiled::DatabaseObject) -> String {
158    let mut hasher = Sha256Hasher::new();
159
160    // Hash the main statement directly using its Hash implementation
161    db_obj.stmt.hash(&mut hasher);
162
163    let mut indexes = db_obj.indexes.clone();
164
165    // Ensure hash is stable by sorting indexes deterministically
166    indexes.sort_by(|a, b| {
167        a.in_cluster
168            .cmp(&b.in_cluster)
169            .then(a.on_name.cmp(&b.on_name))
170            .then(a.name.cmp(&b.name))
171            .then_with(|| {
172                let key_a = a.key_parts.as_ref().map(|ks| {
173                    ks.iter()
174                        .map(|e| e.to_string())
175                        .collect::<Vec<_>>()
176                        .join(",")
177                });
178                let key_b = b.key_parts.as_ref().map(|ks| {
179                    ks.iter()
180                        .map(|e| e.to_string())
181                        .collect::<Vec<_>>()
182                        .join(",")
183                });
184
185                key_a.cmp(&key_b)
186            })
187    });
188
189    // Hash all indexes directly using their Hash implementation
190    for index in &indexes {
191        index.hash(&mut hasher);
192    }
193
194    hasher.finalize()
195}
196
197/// Build a deployment snapshot from a project graph by hashing all compiled objects.
198///
199/// This iterates through all objects in the project and computes their
200/// content hashes based on the normalized compiled representation.
201pub(crate) fn build_snapshot_from_planned(
202    planned_project: &graph::Project,
203) -> Result<DeploymentSnapshot, DeploymentSnapshotError> {
204    let mut objects = BTreeMap::new();
205    let mut schemas = BTreeMap::new();
206
207    // Get all objects in topological order
208    let sorted_objects = planned_project
209        .get_sorted_objects()
210        .map_err(|e| DeploymentSnapshotError::PlannedAccess(e.to_string()))?;
211
212    // Compute hash for each object and collect schemas
213    // Default to Objects kind - callers can override for specific schemas
214    for (object_id, typed_obj) in sorted_objects {
215        // Skip apply-managed objects — they are never recorded by record_stage_metadata
216        // and should not participate in deployment hash change detection.
217        match &typed_obj.stmt {
218            Statement::CreateTable(_)
219            | Statement::CreateTableFromSource(_)
220            | Statement::CreateSource(_)
221            | Statement::CreateSecret(_)
222            | Statement::CreateConnection(_) => continue,
223            _ => {}
224        }
225
226        let hash = compute_typed_hash(typed_obj);
227        objects.insert(object_id.clone(), hash);
228
229        // Track which schema this object belongs to.
230        // Schemas marked as replacement in the project config get Replacement kind;
231        // all others default to Objects.
232        let sq = SchemaQualifier::new(
233            object_id.expect_database().to_string(),
234            object_id.schema().to_string(),
235        );
236        let kind = if planned_project.replacement_schemas.contains(&sq) {
237            DeploymentKind::Replacement
238        } else {
239            DeploymentKind::Objects
240        };
241        schemas.entry(sq).or_insert(kind);
242    }
243
244    Ok(DeploymentSnapshot { objects, schemas })
245}
246
247/// Load the current deployment state snapshot from the database for a specific environment.
248///
249/// # Arguments
250/// * `client` - Database client connection
251/// * `environment` - None for production, Some("staging") for staging environments
252///
253/// # Returns
254/// DeploymentSnapshot with current deployment state, or empty snapshot if no deployments exist
255pub(crate) async fn load_from_database(
256    client: &Client,
257    environment: Option<&str>,
258) -> Result<DeploymentSnapshot, DeploymentSnapshotError> {
259    let deployment_snapshot = client
260        .deployments()
261        .get_deployment_objects(environment)
262        .await
263        .map_err(DeploymentSnapshotError::Connection)?;
264
265    Ok(deployment_snapshot)
266}
267
268/// Write deployment snapshot to the database using the normalized schema.
269///
270/// This writes to both deployments and objects tables.
271/// Schema deployments are inserted (no delete), while object deployments
272/// are appended (insert-only history).
273///
274/// # Arguments
275/// * `client` - Database client connection
276/// * `snapshot` - The deployment snapshot to write (includes per-schema deployment kind)
277/// * `deploy_id` - Deploy ID (e.g., `"<init>"` for direct deploy, `"staging"` for staged)
278/// * `metadata` - Deployment metadata (user, git commit, etc.)
279/// * `promoted_at` - Optional promoted_at timestamp (Some(now) for direct apply, None for stage)
280/// * `mode` - Deployment mode tag stored on each row
281pub(crate) async fn write_to_database(
282    client: &Client,
283    snapshot: &DeploymentSnapshot,
284    deploy_id: &str,
285    metadata: &DeploymentMetadata,
286    promoted_at: Option<DateTime<Utc>>,
287    mode: DeploymentMode,
288) -> Result<(), DeploymentSnapshotError> {
289    let now = Utc::now();
290
291    // Build schema deployment records (kind is now per-schema from the snapshot)
292    let mut schema_records = Vec::new();
293    for (sq, kind) in &snapshot.schemas {
294        schema_records.push(SchemaDeploymentRecord {
295            deploy_id: deploy_id.to_string(),
296            database: sq.database.clone(),
297            schema: sq.schema.clone(),
298            deployed_at: now,
299            deployed_by: metadata.deployed_by.clone(),
300            promoted_at,
301            git_commit: metadata.git_commit.clone(),
302            kind: *kind,
303            mode,
304        });
305    }
306
307    // Build deployment object records
308    let mut object_records = Vec::new();
309    for (object_id, hash) in &snapshot.objects {
310        object_records.push(DeploymentObjectRecord {
311            deploy_id: deploy_id.to_string(),
312            database: object_id.expect_database().to_string(),
313            schema: object_id.schema().to_string(),
314            object: object_id.object().to_string(),
315            object_hash: hash.clone(),
316            deployed_at: now,
317        });
318    }
319
320    // Write to database
321    client
322        .deployments()
323        .insert_schema_deployments(&schema_records)
324        .await?;
325    client
326        .deployments()
327        .append_deployment_objects(&object_records)
328        .await?;
329
330    Ok(())
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    #[mz_ore::test]
    fn test_empty_snapshot() {
        let snapshot = DeploymentSnapshot::default();
        assert!(snapshot.objects.is_empty());
        assert!(snapshot.schemas.is_empty());
    }

    /// Parse a single SQL statement into a compiled::DatabaseObject.
    ///
    /// Panics if the SQL fails to parse or is not one of the statement kinds
    /// mapped below.
    fn make_typed_object(sql: &str) -> compiled::DatabaseObject {
        let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
        let stmt = match parsed.into_iter().next().unwrap().ast {
            mz_sql_parser::ast::Statement::CreateView(s) => Statement::CreateView(s),
            mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
                Statement::CreateMaterializedView(s)
            }
            mz_sql_parser::ast::Statement::CreateTable(s) => Statement::CreateTable(s),
            mz_sql_parser::ast::Statement::CreateSource(s) => Statement::CreateSource(s),
            mz_sql_parser::ast::Statement::CreateConnection(s) => Statement::CreateConnection(s),
            mz_sql_parser::ast::Statement::CreateSecret(s) => Statement::CreateSecret(s),
            mz_sql_parser::ast::Statement::CreateSink(s) => Statement::CreateSink(s),
            other => panic!("Unexpected statement type: {:?}", other),
        };
        compiled::DatabaseObject {
            path: std::path::PathBuf::from("test.sql"),
            stmt,
            indexes: vec![],
            grants: vec![],
            comments: vec![],
            tests: vec![],
        }
    }

    /// Build a graph::Project from (database, schema, name, typed_obj) tuples.
    ///
    /// The name element is unused; object names come from the parsed SQL.
    fn make_planned_project(
        objects: Vec<(&str, &str, &str, compiled::DatabaseObject)>,
    ) -> graph::Project {
        let mut db_map: BTreeMap<String, BTreeMap<String, Vec<compiled::DatabaseObject>>> =
            BTreeMap::new();
        for (database, schema, _name, typed_obj) in objects {
            db_map
                .entry(database.to_string())
                .or_default()
                .entry(schema.to_string())
                .or_default()
                .push(typed_obj);
        }
        let databases: Vec<compiled::Database> = db_map
            .into_iter()
            .map(|(db_name, schemas)| compiled::Database {
                name: db_name,
                schemas: schemas
                    .into_iter()
                    .map(|(schema_name, objs)| compiled::Schema {
                        name: schema_name,
                        objects: objs,
                        mod_statements: None,
                    })
                    .collect(),
                mod_statements: None,
            })
            .collect();
        let typed_project = compiled::Project {
            databases,
            replacement_schemas: BTreeSet::new(),
        };
        graph::Project::from(typed_project)
    }

    /// Formatting-only edits must not change the hash (the module's key invariant).
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    #[mz_ore::test]
    fn test_hash_ignores_whitespace_and_comments() {
        let compact = compute_typed_hash(&make_typed_object("CREATE VIEW my_view AS SELECT 1"));
        let spaced = compute_typed_hash(&make_typed_object(
            "CREATE VIEW my_view AS\n    -- a comment\n    SELECT   1",
        ));
        assert_eq!(compact, spaced);
        assert!(compact.starts_with("sha256:"), "unexpected format: {}", compact);
        assert_eq!(compact.len(), "sha256:".len() + 64);
    }

    /// A real change to the definition must change the hash.
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    #[mz_ore::test]
    fn test_hash_changes_with_definition() {
        let one = compute_typed_hash(&make_typed_object("CREATE VIEW my_view AS SELECT 1"));
        let two = compute_typed_hash(&make_typed_object("CREATE VIEW my_view AS SELECT 2"));
        assert_ne!(one, two);
    }

    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    #[mz_ore::test]
    fn test_apply_managed_objects_excluded_from_snapshot() {
        let view_obj = make_typed_object("CREATE VIEW my_view AS SELECT 1");
        let mv_obj = make_typed_object("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c AS SELECT 1");
        let table_obj = make_typed_object("CREATE TABLE my_table (id INT)");
        let source_obj = make_typed_object(
            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
        );
        let conn_obj =
            make_typed_object("CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')");
        let secret_obj = make_typed_object("CREATE SECRET my_secret AS 'hunter2'");

        let planned_project = make_planned_project(vec![
            ("db", "public", "my_view", view_obj),
            ("db", "public", "my_mv", mv_obj),
            ("db", "storage", "my_table", table_obj),
            ("db", "storage", "my_source", source_obj),
            ("db", "storage", "my_conn", conn_obj),
            ("db", "storage", "my_secret", secret_obj),
        ]);

        let snapshot = build_snapshot_from_planned(&planned_project).unwrap();

        // Only the view and MV should be in the snapshot
        let object_names: Vec<&str> = snapshot.objects.keys().map(|id| id.object()).collect();
        assert_eq!(
            object_names,
            vec!["my_mv", "my_view"],
            "Only views and MVs should appear in snapshot, got: {:?}",
            object_names
        );

        // Apply-managed objects should NOT be present
        for name in &["my_table", "my_source", "my_conn", "my_secret"] {
            assert!(
                !snapshot.objects.keys().any(|id| id.object() == *name),
                "{} should not be in the snapshot",
                name
            );
        }

        // Schema tracking should still include the public schema (has view/MV),
        // with the default Objects kind since no replacement schemas are set.
        let public = SchemaQualifier::new("db".into(), "public".into());
        assert!(snapshot.schemas.contains_key(&public));
        assert!(
            matches!(snapshot.schemas.get(&public), Some(DeploymentKind::Objects)),
            "public schema should be tracked with the Objects kind"
        );
        // Storage schema should NOT be tracked (all its objects are apply-managed)
        assert!(
            !snapshot
                .schemas
                .contains_key(&SchemaQualifier::new("db".into(), "storage".into())),
            "Schema with only apply-managed objects should not appear in snapshot"
        );
    }
}