mz_deploy/project/analysis/
deployment_snapshot.rs1use std::collections::BTreeMap;
49use std::hash::{Hash, Hasher};
50
51use chrono::{DateTime, Utc};
52
53use crate::project::SchemaQualifier;
54use crate::project::ast::Statement;
55use sha2::{Digest, Sha256};
56
57use crate::client::{
58 Client, ConnectionError, DeploymentKind, DeploymentMode, DeploymentObjectRecord,
59 SchemaDeploymentRecord,
60};
61use crate::project::ir::{compiled, graph, object_id::ObjectId};
62
63struct Sha256Hasher {
67 digest: Sha256,
68}
69
70impl Sha256Hasher {
71 fn new() -> Self {
72 Self {
73 digest: Sha256::new(),
74 }
75 }
76
77 fn finalize(self) -> String {
78 let result = self.digest.finalize();
79 format!("sha256:{:x}", result)
80 }
81}
82
83impl Hasher for Sha256Hasher {
84 fn write(&mut self, bytes: &[u8]) {
85 self.digest.update(bytes);
86 }
87
88 fn finish(&self) -> u64 {
89 panic!("Sha256Hasher::finish() should not be called, use finalize() instead");
92 }
93}
94
95#[derive(Debug, Clone, serde::Serialize)]
101pub struct DeploymentSnapshot {
102 pub objects: BTreeMap<ObjectId, String>,
104 pub schemas: BTreeMap<SchemaQualifier, DeploymentKind>,
109}
110
/// Metadata attached to the schema records written for a deployment.
#[derive(Debug, Clone)]
pub struct DeploymentMetadata {
    /// Value copied into the `deployed_by` field of every schema deployment record.
    pub deployed_by: String,
    /// Optional value copied into the `git_commit` field of every schema
    /// deployment record.
    pub git_commit: Option<String>,
}
119
120#[derive(Debug, thiserror::Error)]
122pub enum DeploymentSnapshotError {
123 #[error("failed to connect to database: {0}")]
124 Connection(#[from] ConnectionError),
125
126 #[error("failed to build snapshot from project graph: {0}")]
127 PlannedAccess(String),
128
129 #[error("invalid object FQN: {0}")]
130 InvalidFqn(String),
131
132 #[error("deployment '{environment}' already exists")]
133 DeploymentAlreadyExists { environment: String },
134
135 #[error("deployment '{environment}' not found")]
136 DeploymentNotFound { environment: String },
137
138 #[error("deployment '{environment}' has already been promoted")]
139 DeploymentAlreadyPromoted { environment: String },
140}
141
142impl Default for DeploymentSnapshot {
143 fn default() -> Self {
144 Self {
145 objects: BTreeMap::new(),
146 schemas: BTreeMap::new(),
147 }
148 }
149}
150
151pub(crate) fn compute_typed_hash(db_obj: &compiled::DatabaseObject) -> String {
158 let mut hasher = Sha256Hasher::new();
159
160 db_obj.stmt.hash(&mut hasher);
162
163 let mut indexes = db_obj.indexes.clone();
164
165 indexes.sort_by(|a, b| {
167 a.in_cluster
168 .cmp(&b.in_cluster)
169 .then(a.on_name.cmp(&b.on_name))
170 .then(a.name.cmp(&b.name))
171 .then_with(|| {
172 let key_a = a.key_parts.as_ref().map(|ks| {
173 ks.iter()
174 .map(|e| e.to_string())
175 .collect::<Vec<_>>()
176 .join(",")
177 });
178 let key_b = b.key_parts.as_ref().map(|ks| {
179 ks.iter()
180 .map(|e| e.to_string())
181 .collect::<Vec<_>>()
182 .join(",")
183 });
184
185 key_a.cmp(&key_b)
186 })
187 });
188
189 for index in &indexes {
191 index.hash(&mut hasher);
192 }
193
194 hasher.finalize()
195}
196
197pub(crate) fn build_snapshot_from_planned(
202 planned_project: &graph::Project,
203) -> Result<DeploymentSnapshot, DeploymentSnapshotError> {
204 let mut objects = BTreeMap::new();
205 let mut schemas = BTreeMap::new();
206
207 let sorted_objects = planned_project
209 .get_sorted_objects()
210 .map_err(|e| DeploymentSnapshotError::PlannedAccess(e.to_string()))?;
211
212 for (object_id, typed_obj) in sorted_objects {
215 match &typed_obj.stmt {
218 Statement::CreateTable(_)
219 | Statement::CreateTableFromSource(_)
220 | Statement::CreateSource(_)
221 | Statement::CreateSecret(_)
222 | Statement::CreateConnection(_) => continue,
223 _ => {}
224 }
225
226 let hash = compute_typed_hash(typed_obj);
227 objects.insert(object_id.clone(), hash);
228
229 let sq = SchemaQualifier::new(
233 object_id.expect_database().to_string(),
234 object_id.schema().to_string(),
235 );
236 let kind = if planned_project.replacement_schemas.contains(&sq) {
237 DeploymentKind::Replacement
238 } else {
239 DeploymentKind::Objects
240 };
241 schemas.entry(sq).or_insert(kind);
242 }
243
244 Ok(DeploymentSnapshot { objects, schemas })
245}
246
247pub(crate) async fn load_from_database(
256 client: &Client,
257 environment: Option<&str>,
258) -> Result<DeploymentSnapshot, DeploymentSnapshotError> {
259 let deployment_snapshot = client
260 .deployments()
261 .get_deployment_objects(environment)
262 .await
263 .map_err(DeploymentSnapshotError::Connection)?;
264
265 Ok(deployment_snapshot)
266}
267
268pub(crate) async fn write_to_database(
282 client: &Client,
283 snapshot: &DeploymentSnapshot,
284 deploy_id: &str,
285 metadata: &DeploymentMetadata,
286 promoted_at: Option<DateTime<Utc>>,
287 mode: DeploymentMode,
288) -> Result<(), DeploymentSnapshotError> {
289 let now = Utc::now();
290
291 let mut schema_records = Vec::new();
293 for (sq, kind) in &snapshot.schemas {
294 schema_records.push(SchemaDeploymentRecord {
295 deploy_id: deploy_id.to_string(),
296 database: sq.database.clone(),
297 schema: sq.schema.clone(),
298 deployed_at: now,
299 deployed_by: metadata.deployed_by.clone(),
300 promoted_at,
301 git_commit: metadata.git_commit.clone(),
302 kind: *kind,
303 mode,
304 });
305 }
306
307 let mut object_records = Vec::new();
309 for (object_id, hash) in &snapshot.objects {
310 object_records.push(DeploymentObjectRecord {
311 deploy_id: deploy_id.to_string(),
312 database: object_id.expect_database().to_string(),
313 schema: object_id.schema().to_string(),
314 object: object_id.object().to_string(),
315 object_hash: hash.clone(),
316 deployed_at: now,
317 });
318 }
319
320 client
322 .deployments()
323 .insert_schema_deployments(&schema_records)
324 .await?;
325 client
326 .deployments()
327 .append_deployment_objects(&object_records)
328 .await?;
329
330 Ok(())
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    /// A default snapshot contains no objects.
    #[mz_ore::test]
    fn test_empty_snapshot() {
        let empty = DeploymentSnapshot::default();
        assert!(empty.objects.is_empty());
    }

    /// Parses `sql` and wraps its first statement in a `compiled::DatabaseObject`
    /// with no indexes, grants, comments, or tests.
    ///
    /// Panics if parsing fails, if no statement is produced, or if the statement
    /// is not one of the kinds handled below.
    fn make_typed_object(sql: &str) -> compiled::DatabaseObject {
        use mz_sql_parser::ast::Statement as Parsed;

        let first = mz_sql_parser::parser::parse_statements(sql)
            .unwrap()
            .into_iter()
            .next()
            .unwrap();
        let stmt = match first.ast {
            Parsed::CreateView(s) => Statement::CreateView(s),
            Parsed::CreateMaterializedView(s) => Statement::CreateMaterializedView(s),
            Parsed::CreateTable(s) => Statement::CreateTable(s),
            Parsed::CreateSource(s) => Statement::CreateSource(s),
            Parsed::CreateConnection(s) => Statement::CreateConnection(s),
            Parsed::CreateSecret(s) => Statement::CreateSecret(s),
            Parsed::CreateSink(s) => Statement::CreateSink(s),
            other => panic!("Unexpected statement type: {:?}", other),
        };

        compiled::DatabaseObject {
            path: std::path::PathBuf::from("test.sql"),
            stmt,
            indexes: vec![],
            grants: vec![],
            comments: vec![],
            tests: vec![],
        }
    }

    /// Groups `(database, schema, name, object)` tuples into a compiled project
    /// with no replacement schemas and converts it into a `graph::Project`.
    ///
    /// The name component of each tuple is ignored.
    fn make_planned_project(
        objects: Vec<(&str, &str, &str, compiled::DatabaseObject)>,
    ) -> graph::Project {
        let mut grouped: BTreeMap<String, BTreeMap<String, Vec<compiled::DatabaseObject>>> =
            BTreeMap::new();
        for (database, schema, _name, typed_obj) in objects {
            let per_schema = grouped.entry(database.to_string()).or_default();
            per_schema
                .entry(schema.to_string())
                .or_default()
                .push(typed_obj);
        }

        let mut databases: Vec<compiled::Database> = Vec::new();
        for (name, per_schema) in grouped {
            let schemas = per_schema
                .into_iter()
                .map(|(name, objects)| compiled::Schema {
                    name,
                    objects,
                    mod_statements: None,
                })
                .collect();
            databases.push(compiled::Database {
                name,
                schemas,
                mod_statements: None,
            });
        }

        graph::Project::from(compiled::Project {
            databases,
            replacement_schemas: BTreeSet::new(),
        })
    }

    /// Tables, sources, connections, and secrets must be left out of the
    /// snapshot, and a schema holding only such objects must not be recorded.
    #[cfg_attr(miri, ignore)]
    #[mz_ore::test]
    fn test_apply_managed_objects_excluded_from_snapshot() {
        let view = make_typed_object("CREATE VIEW my_view AS SELECT 1");
        let mv = make_typed_object("CREATE MATERIALIZED VIEW my_mv IN CLUSTER c AS SELECT 1");
        let table = make_typed_object("CREATE TABLE my_table (id INT)");
        let source = make_typed_object(
            "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
        );
        let conn =
            make_typed_object("CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')");
        let secret = make_typed_object("CREATE SECRET my_secret AS 'hunter2'");

        let project = make_planned_project(vec![
            ("db", "public", "my_view", view),
            ("db", "public", "my_mv", mv),
            ("db", "storage", "my_table", table),
            ("db", "storage", "my_source", source),
            ("db", "storage", "my_conn", conn),
            ("db", "storage", "my_secret", secret),
        ]);

        let snapshot = build_snapshot_from_planned(&project).unwrap();

        // Keys come out of the map in sorted order, hence "my_mv" before "my_view".
        let object_names: Vec<&str> = snapshot.objects.keys().map(|id| id.object()).collect();
        assert_eq!(
            object_names,
            vec!["my_mv", "my_view"],
            "Only views and MVs should appear in snapshot, got: {:?}",
            object_names
        );

        let excluded = ["my_table", "my_source", "my_conn", "my_secret"];
        for name in excluded {
            assert!(
                snapshot.objects.keys().all(|id| id.object() != name),
                "{} should not be in the snapshot",
                name
            );
        }

        let public = SchemaQualifier::new("db".into(), "public".into());
        assert!(snapshot.schemas.contains_key(&public));

        let storage = SchemaQualifier::new("db".into(), "storage".into());
        assert!(
            !snapshot.schemas.contains_key(&storage),
            "Schema with only apply-managed objects should not appear in snapshot"
        );
    }
}