1use crate::cli::CliError;
35use crate::cli::git::get_git_commit;
36use crate::client::{Client, ClusterConfig, quote_identifier};
37use crate::config::Settings;
38use crate::project::analysis::deployment_snapshot::DeploymentMetadata;
39use crate::project::ir::graph::Project;
40use crate::project::resolve::normalize;
41use crate::project::{self, ir::compiled};
42use crate::{info, verbose};
43use owo_colors::{OwoColorize, Stream};
44use serde::Serialize;
45use std::cell::RefCell;
46use std::collections::BTreeSet;
47use std::fmt;
48use std::path::Path;
49
50#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
52#[serde(rename_all = "snake_case")]
53pub enum ObjectAction {
54 Created,
55 Altered,
56 UpToDate,
57 Skipped,
58}
59
60impl fmt::Display for ObjectAction {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 match self {
63 ObjectAction::Created => write!(f, "created"),
64 ObjectAction::Altered => write!(f, "altered"),
65 ObjectAction::UpToDate => write!(f, "up_to_date"),
66 ObjectAction::Skipped => write!(f, "skipped"),
67 }
68 }
69}
70
71#[derive(Clone, Serialize)]
73pub struct ObjectResult {
74 pub object: String,
76 pub action: ObjectAction,
78 pub statements: Vec<String>,
80 #[serde(skip)]
83 pub redacted_statements: Vec<String>,
84 #[serde(skip_serializing_if = "Option::is_none")]
87 pub transaction_group: Option<String>,
88 #[serde(skip_serializing_if = "Vec::is_empty")]
94 pub post_statements: Vec<String>,
95}
96
97impl fmt::Display for ObjectResult {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 match self.action {
100 ObjectAction::Created | ObjectAction::Altered => {
101 write!(
102 f,
103 " {} {}",
104 "✓".if_supports_color(Stream::Stderr, |t| t.green()),
105 self.object
106 )
107 }
108 ObjectAction::UpToDate => write!(
109 f,
110 " {} {} ({})",
111 "=".if_supports_color(Stream::Stderr, |t| t.dimmed()),
112 self.object,
113 "up to date".if_supports_color(Stream::Stderr, |t| t.dimmed())
114 ),
115 ObjectAction::Skipped => write!(
116 f,
117 " {} {} ({})",
118 "-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
119 self.object,
120 "skipped".if_supports_color(Stream::Stderr, |t| t.dimmed())
121 ),
122 }
123 }
124}
125
126#[derive(Clone, Serialize)]
128pub struct ApplyResult {
129 pub phase: String,
131 pub results: Vec<ObjectResult>,
133}
134
135impl fmt::Display for ApplyResult {
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 if self.results.is_empty() {
138 return Ok(());
139 }
140
141 let created = self
142 .results
143 .iter()
144 .filter(|r| r.action == ObjectAction::Created)
145 .count();
146 let altered = self
147 .results
148 .iter()
149 .filter(|r| r.action == ObjectAction::Altered)
150 .count();
151 let up_to_date = self
152 .results
153 .iter()
154 .filter(|r| r.action == ObjectAction::UpToDate)
155 .count();
156
157 let label = &self.phase;
158 let mut lines = Vec::new();
159
160 if up_to_date > 0 && created == 0 && altered == 0 {
161 lines.push(format!(
162 " {} {} {} up to date",
163 "✓".if_supports_color(Stream::Stderr, |t| t.green()),
164 up_to_date,
165 label
166 ));
167 }
168
169 for r in &self.results {
170 if r.action != ObjectAction::UpToDate {
171 lines.push(format!("{}", r));
172 }
173 }
174
175 write!(f, "{}", lines.join("\n"))
176 }
177}
178
179#[derive(Serialize)]
182pub struct ApplyPlan {
183 pub setup_statements: Vec<String>,
186 pub phases: Vec<ApplyResult>,
188 #[serde(skip)]
190 prepared_schemas: BTreeSet<project::SchemaQualifier>,
191}
192
193struct ExecutionBatch<'a> {
195 transaction_group: Option<&'a str>,
197 objects: Vec<&'a ObjectResult>,
199}
200
201impl ApplyPlan {
202 pub fn new() -> Self {
203 Self {
204 setup_statements: Vec::new(),
205 phases: Vec::new(),
206 prepared_schemas: BTreeSet::new(),
207 }
208 }
209
210 pub async fn prepare_schemas(
213 &mut self,
214 executor: &DeploymentExecutor<'_>,
215 planned_project: &Project,
216 schema_set: &BTreeSet<project::SchemaQualifier>,
217 ) -> Result<(), CliError> {
218 let new_schemas: BTreeSet<_> = schema_set
219 .difference(&self.prepared_schemas)
220 .cloned()
221 .collect();
222 if new_schemas.is_empty() {
223 return Ok(());
224 }
225 executor
226 .prepare_databases_and_schemas(planned_project, &new_schemas, None)
227 .await?;
228 self.setup_statements.extend(executor.take_statements());
229 self.prepared_schemas.extend(new_schemas);
230 Ok(())
231 }
232
233 pub fn add_phase(&mut self, result: ApplyResult) {
235 self.phases.push(result);
236 }
237
238 pub async fn execute(&self, client: &Client) -> Result<(), CliError> {
244 for sql in &self.setup_statements {
246 client
247 .execute(sql, &[])
248 .await
249 .map_err(|source| CliError::SqlExecutionFailed {
250 statement: sql.clone(),
251 source,
252 })?;
253 }
254
255 let mut batches: Vec<ExecutionBatch<'_>> = Vec::new();
257 for phase in &self.phases {
258 for obj in &phase.results {
259 let obj_txn = obj.transaction_group.as_deref();
260 match batches.last_mut() {
261 Some(batch) if batch.transaction_group == obj_txn && obj_txn.is_some() => {
262 batch.objects.push(obj);
264 }
265 _ => {
266 batches.push(ExecutionBatch {
268 transaction_group: obj_txn,
269 objects: vec![obj],
270 });
271 }
272 }
273 }
274 }
275
276 for batch in &batches {
278 let in_txn = batch.transaction_group.is_some();
279 if in_txn {
280 client.execute("BEGIN", &[]).await.map_err(|source| {
281 CliError::SqlExecutionFailed {
282 statement: "BEGIN".to_string(),
283 source,
284 }
285 })?;
286 }
287
288 for obj in &batch.objects {
289 for sql in obj.redacted_statements.iter().chain(&obj.statements) {
290 if let Err(e) = client.execute(sql, &[]).await {
291 if in_txn {
292 let _ = client.execute("ROLLBACK", &[]).await;
293 }
294 return Err(CliError::SqlExecutionFailed {
295 statement: if obj.redacted_statements.contains(sql) {
296 "[REDACTED — contains secret value]".to_string()
297 } else {
298 sql.clone()
299 },
300 source: e,
301 });
302 }
303 }
304 }
305
306 if in_txn {
307 client.execute("COMMIT", &[]).await.map_err(|source| {
308 CliError::SqlExecutionFailed {
309 statement: "COMMIT".to_string(),
310 source,
311 }
312 })?;
313 }
314
315 for obj in &batch.objects {
316 for sql in &obj.post_statements {
317 client.execute(sql, &[]).await.map_err(|source| {
318 CliError::SqlExecutionFailed {
319 statement: sql.clone(),
320 source,
321 }
322 })?;
323 }
324 }
325 }
326
327 Ok(())
328 }
329}
330
331impl fmt::Display for ApplyPlan {
332 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333 let mut first = true;
334 for phase in &self.phases {
335 if phase.results.is_empty() {
336 continue;
337 }
338 if !first {
339 writeln!(f)?;
340 }
341 write!(f, "{}", phase)?;
342 first = false;
343 }
344 Ok(())
345 }
346}
347
348pub async fn collect_deployment_metadata(client: &Client, directory: &Path) -> DeploymentMetadata {
354 let deployed_by = client
355 .introspection()
356 .get_current_user()
357 .await
358 .unwrap_or_else(|e| {
359 info!("warning: failed to get current user: {}", e);
360 "unknown".to_string()
361 });
362
363 let git_commit = get_git_commit(directory);
364
365 DeploymentMetadata {
366 deployed_by,
367 git_commit,
368 }
369}
370
371pub async fn connect_apply_client(settings: &Settings) -> Result<Client, CliError> {
375 let client = Client::connect_with_profile(settings.connection().clone())
376 .await
377 .map_err(CliError::Connection)?;
378 let role =
379 crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
380 crate::cli::commands::setup::require_deployer(role)?;
381 Ok(client)
382}
383
384pub async fn compile_apply_project_and_connect(
387 settings: &Settings,
388) -> Result<(Project, Client), CliError> {
389 let planned_project = crate::cli::commands::compile::run_without_typecheck(
390 settings,
391 !crate::log::json_output_enabled(),
392 )
393 .await?;
394 let client = connect_apply_client(settings).await?;
395 Ok((planned_project, client))
396}
397
398pub fn generate_random_env_name() -> String {
401 use sha2::{Digest, Sha256};
402 use std::time::SystemTime;
403
404 let now = SystemTime::now()
405 .duration_since(SystemTime::UNIX_EPOCH)
406 .expect("system time before Unix epoch")
407 .as_nanos();
408
409 let mut hasher = Sha256::new();
410 hasher.update(now.to_le_bytes());
411 let hash = hasher.finalize();
412
413 format!(
415 "{:07x}",
416 u32::from_le_bytes([hash[0], hash[1], hash[2], hash[3]]) & 0xFFFFFFF
417 )
418}
419
420pub struct DeploymentExecutor<'a> {
430 client: &'a Client,
431 dry_run: bool,
432 statement_log: RefCell<Vec<String>>,
433}
434
435impl<'a> DeploymentExecutor<'a> {
436 pub fn new(client: &'a Client) -> Self {
438 Self {
439 client,
440 dry_run: false,
441 statement_log: RefCell::new(Vec::new()),
442 }
443 }
444
445 pub fn new_dry_run(client: &'a Client) -> Self {
447 Self {
448 client,
449 dry_run: true,
450 statement_log: RefCell::new(Vec::new()),
451 }
452 }
453
454 pub fn with_dry_run(client: &'a Client, dry_run: bool) -> Self {
456 Self {
457 client,
458 dry_run,
459 statement_log: RefCell::new(Vec::new()),
460 }
461 }
462
463 pub fn is_dry_run(&self) -> bool {
465 self.dry_run
466 }
467
468 pub fn take_statements(&self) -> Vec<String> {
473 self.statement_log.borrow_mut().drain(..).collect()
474 }
475
476 pub async fn execute_object(
481 &self,
482 typed_obj: &compiled::DatabaseObject,
483 ) -> Result<(), CliError> {
484 self.execute_sql(&typed_obj.stmt).await?;
486
487 for index in &typed_obj.indexes {
489 self.execute_sql(index).await?;
490 }
491
492 for grant in &typed_obj.grants {
494 self.execute_sql(grant).await?;
495 }
496
497 for comment in &typed_obj.comments {
499 self.execute_sql(comment).await?;
500 }
501
502 Ok(())
503 }
504
505 pub async fn prepare_databases_and_schemas(
510 &self,
511 planned_project: &Project,
512 schema_set: &BTreeSet<project::SchemaQualifier>,
513 staging_suffix: Option<&str>,
514 ) -> Result<(), CliError> {
515 if schema_set.is_empty() {
516 return Ok(());
517 }
518
519 let databases: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
521 for db in &databases {
522 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(db));
523 self.execute_sql(&sql).await?;
524 }
525
526 for sq in schema_set {
528 let schema_name = match staging_suffix {
529 Some(suffix) => format!("{}{}", sq.schema, suffix),
530 None => sq.schema.clone(),
531 };
532 verbose!(
533 "Creating schema {}.{} if not exists",
534 sq.database,
535 schema_name
536 );
537 let sql = format!(
538 "CREATE SCHEMA IF NOT EXISTS {}.{}",
539 quote_identifier(&sq.database),
540 quote_identifier(&schema_name)
541 );
542 self.execute_sql(&sql).await?;
543 }
544
545 for mod_stmt in planned_project.iter_mod_statements() {
547 match mod_stmt {
548 project::ModStatement::Database {
549 database,
550 statement,
551 } => {
552 let has_schema = schema_set.iter().any(|sq| sq.database == *database);
553 if has_schema {
554 verbose!("Applying database setup for: {}", database);
555 self.execute_sql(statement).await?;
556 }
557 }
558 project::ModStatement::Schema {
559 database,
560 schema,
561 statement,
562 } => {
563 if schema_set.contains(&project::SchemaQualifier::new(
564 database.to_string(),
565 schema.to_string(),
566 )) {
567 if let Some(suffix) = staging_suffix {
568 let staging_schema = format!("{}{}", schema, suffix);
569 let mut rewritten = statement.clone();
570 normalize::rewrite_schema_names(
571 std::slice::from_mut(&mut rewritten),
572 schema,
573 suffix,
574 );
575 verbose!("Applying schema setup for: {}.{}", database, staging_schema);
576 self.execute_sql(&rewritten).await?;
577 } else {
578 verbose!("Applying schema setup for: {}.{}", database, schema);
579 self.execute_sql(statement).await?;
580 }
581 }
582 }
583 }
584 }
585
586 Ok(())
587 }
588
589 pub async fn execute_sql(&self, stmt: &impl ToString) -> Result<(), CliError> {
594 let sql = stmt.to_string();
595
596 if self.dry_run {
597 self.statement_log.borrow_mut().push(sql.clone());
598 return Ok(());
599 }
600
601 self.client
602 .execute(&sql, &[])
603 .await
604 .map_err(|source| CliError::SqlExecutionFailed {
605 statement: sql,
606 source,
607 })?;
608 Ok(())
609 }
610
611 pub async fn ensure_database(&self, name: &str) -> Result<(), CliError> {
616 if self.dry_run {
617 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(name));
618 self.statement_log.borrow_mut().push(sql);
619 } else {
620 self.client.provisioning().create_database(name).await?;
621 }
622 Ok(())
623 }
624
625 pub async fn ensure_schema(&self, database: &str, schema: &str) -> Result<(), CliError> {
630 if self.dry_run {
631 let sql = format!(
632 "CREATE SCHEMA IF NOT EXISTS {}.{}",
633 quote_identifier(database),
634 quote_identifier(schema)
635 );
636 self.statement_log.borrow_mut().push(sql);
637 } else {
638 self.client
639 .provisioning()
640 .create_schema(database, schema)
641 .await?;
642 }
643 Ok(())
644 }
645
646 pub async fn create_cluster(
651 &self,
652 staging_name: &str,
653 prod_name: &str,
654 config: &ClusterConfig,
655 ) -> Result<(), CliError> {
656 if self.dry_run {
657 let sql = format!(
658 "CREATE CLUSTER {} (SIZE = '<from {}')",
659 quote_identifier(staging_name),
660 prod_name
661 );
662 self.statement_log.borrow_mut().push(sql);
663 } else {
664 self.client
665 .provisioning()
666 .create_cluster_with_config(staging_name, config)
667 .await?;
668 }
669 Ok(())
670 }
671
672 pub async fn record_deployment_clusters(
677 &self,
678 stage_name: &str,
679 clusters: &[String],
680 ) -> Result<(), CliError> {
681 if !self.dry_run {
682 self.client
683 .deployments()
684 .insert_deployment_clusters(stage_name, clusters)
685 .await?;
686 verbose!("Cluster mappings recorded");
687 }
688 Ok(())
689 }
690}