1use crate::cli::CliError;
35use crate::cli::git::get_git_commit;
36use crate::client::{Client, ClusterConfig, quote_identifier};
37use crate::config::Settings;
38use crate::project::analysis::deployment_snapshot::DeploymentMetadata;
39use crate::project::ir::graph::Project;
40use crate::project::resolve::normalize;
41use crate::project::{self, ir::compiled};
42use crate::{info, verbose};
43use owo_colors::{OwoColorize, Stream};
44use serde::Serialize;
45use std::cell::RefCell;
46use std::collections::BTreeSet;
47use std::fmt;
48use std::path::Path;
49
50#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
52#[serde(rename_all = "snake_case")]
53pub enum ObjectAction {
54 Created,
55 Altered,
56 UpToDate,
57 Skipped,
58}
59
60impl fmt::Display for ObjectAction {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 match self {
63 ObjectAction::Created => write!(f, "created"),
64 ObjectAction::Altered => write!(f, "altered"),
65 ObjectAction::UpToDate => write!(f, "up_to_date"),
66 ObjectAction::Skipped => write!(f, "skipped"),
67 }
68 }
69}
70
71#[derive(Clone, Serialize)]
73pub struct ObjectResult {
74 pub object: String,
76 pub action: ObjectAction,
78 pub statements: Vec<String>,
80 #[serde(skip)]
83 pub redacted_statements: Vec<String>,
84 #[serde(skip_serializing_if = "Option::is_none")]
87 pub transaction_group: Option<String>,
88}
89
90impl fmt::Display for ObjectResult {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 match self.action {
93 ObjectAction::Created | ObjectAction::Altered => {
94 write!(
95 f,
96 " {} {}",
97 "✓".if_supports_color(Stream::Stderr, |t| t.green()),
98 self.object
99 )
100 }
101 ObjectAction::UpToDate => write!(
102 f,
103 " {} {} ({})",
104 "=".if_supports_color(Stream::Stderr, |t| t.dimmed()),
105 self.object,
106 "up to date".if_supports_color(Stream::Stderr, |t| t.dimmed())
107 ),
108 ObjectAction::Skipped => write!(
109 f,
110 " {} {} ({})",
111 "-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
112 self.object,
113 "skipped".if_supports_color(Stream::Stderr, |t| t.dimmed())
114 ),
115 }
116 }
117}
118
119#[derive(Clone, Serialize)]
121pub struct ApplyResult {
122 pub phase: String,
124 pub results: Vec<ObjectResult>,
126}
127
128impl fmt::Display for ApplyResult {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 if self.results.is_empty() {
131 return Ok(());
132 }
133
134 let created = self
135 .results
136 .iter()
137 .filter(|r| r.action == ObjectAction::Created)
138 .count();
139 let altered = self
140 .results
141 .iter()
142 .filter(|r| r.action == ObjectAction::Altered)
143 .count();
144 let up_to_date = self
145 .results
146 .iter()
147 .filter(|r| r.action == ObjectAction::UpToDate)
148 .count();
149
150 let label = &self.phase;
151 let mut lines = Vec::new();
152
153 if up_to_date > 0 && created == 0 && altered == 0 {
154 lines.push(format!(
155 " {} {} {} up to date",
156 "✓".if_supports_color(Stream::Stderr, |t| t.green()),
157 up_to_date,
158 label
159 ));
160 }
161
162 for r in &self.results {
163 if r.action != ObjectAction::UpToDate {
164 lines.push(format!("{}", r));
165 }
166 }
167
168 write!(f, "{}", lines.join("\n"))
169 }
170}
171
172#[derive(Serialize)]
175pub struct ApplyPlan {
176 pub setup_statements: Vec<String>,
179 pub phases: Vec<ApplyResult>,
181 #[serde(skip)]
183 prepared_schemas: BTreeSet<project::SchemaQualifier>,
184}
185
186struct ExecutionBatch<'a> {
188 transaction_group: Option<&'a str>,
190 objects: Vec<&'a ObjectResult>,
192}
193
194impl ApplyPlan {
195 pub fn new() -> Self {
196 Self {
197 setup_statements: Vec::new(),
198 phases: Vec::new(),
199 prepared_schemas: BTreeSet::new(),
200 }
201 }
202
203 pub async fn prepare_schemas(
206 &mut self,
207 executor: &DeploymentExecutor<'_>,
208 planned_project: &Project,
209 schema_set: &BTreeSet<project::SchemaQualifier>,
210 ) -> Result<(), CliError> {
211 let new_schemas: BTreeSet<_> = schema_set
212 .difference(&self.prepared_schemas)
213 .cloned()
214 .collect();
215 if new_schemas.is_empty() {
216 return Ok(());
217 }
218 executor
219 .prepare_databases_and_schemas(planned_project, &new_schemas, None)
220 .await?;
221 self.setup_statements.extend(executor.take_statements());
222 self.prepared_schemas.extend(new_schemas);
223 Ok(())
224 }
225
226 pub fn add_phase(&mut self, result: ApplyResult) {
228 self.phases.push(result);
229 }
230
231 pub async fn execute(&self, client: &Client) -> Result<(), CliError> {
237 for sql in &self.setup_statements {
239 client
240 .execute(sql, &[])
241 .await
242 .map_err(|source| CliError::SqlExecutionFailed {
243 statement: sql.clone(),
244 source,
245 })?;
246 }
247
248 let mut batches: Vec<ExecutionBatch<'_>> = Vec::new();
250 for phase in &self.phases {
251 for obj in &phase.results {
252 let obj_txn = obj.transaction_group.as_deref();
253 match batches.last_mut() {
254 Some(batch) if batch.transaction_group == obj_txn && obj_txn.is_some() => {
255 batch.objects.push(obj);
257 }
258 _ => {
259 batches.push(ExecutionBatch {
261 transaction_group: obj_txn,
262 objects: vec![obj],
263 });
264 }
265 }
266 }
267 }
268
269 for batch in &batches {
271 let in_txn = batch.transaction_group.is_some();
272 if in_txn {
273 client.execute("BEGIN", &[]).await.map_err(|source| {
274 CliError::SqlExecutionFailed {
275 statement: "BEGIN".to_string(),
276 source,
277 }
278 })?;
279 }
280
281 for obj in &batch.objects {
282 for sql in obj.redacted_statements.iter().chain(&obj.statements) {
283 if let Err(e) = client.execute(sql, &[]).await {
284 if in_txn {
285 let _ = client.execute("ROLLBACK", &[]).await;
286 }
287 return Err(CliError::SqlExecutionFailed {
288 statement: if obj.redacted_statements.contains(sql) {
289 "[REDACTED — contains secret value]".to_string()
290 } else {
291 sql.clone()
292 },
293 source: e,
294 });
295 }
296 }
297 }
298
299 if in_txn {
300 client.execute("COMMIT", &[]).await.map_err(|source| {
301 CliError::SqlExecutionFailed {
302 statement: "COMMIT".to_string(),
303 source,
304 }
305 })?;
306 }
307 }
308
309 Ok(())
310 }
311}
312
313impl fmt::Display for ApplyPlan {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 let mut first = true;
316 for phase in &self.phases {
317 if phase.results.is_empty() {
318 continue;
319 }
320 if !first {
321 writeln!(f)?;
322 }
323 write!(f, "{}", phase)?;
324 first = false;
325 }
326 Ok(())
327 }
328}
329
330pub async fn collect_deployment_metadata(client: &Client, directory: &Path) -> DeploymentMetadata {
336 let deployed_by = client
337 .introspection()
338 .get_current_user()
339 .await
340 .unwrap_or_else(|e| {
341 info!("warning: failed to get current user: {}", e);
342 "unknown".to_string()
343 });
344
345 let git_commit = get_git_commit(directory);
346
347 DeploymentMetadata {
348 deployed_by,
349 git_commit,
350 }
351}
352
353pub async fn connect_apply_client(settings: &Settings) -> Result<Client, CliError> {
357 let client = Client::connect_with_profile(settings.connection().clone())
358 .await
359 .map_err(CliError::Connection)?;
360 let role =
361 crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
362 crate::cli::commands::setup::require_deployer(role)?;
363 Ok(client)
364}
365
366pub async fn compile_apply_project_and_connect(
369 settings: &Settings,
370) -> Result<(Project, Client), CliError> {
371 let planned_project = crate::cli::commands::compile::run_without_typecheck(
372 settings,
373 !crate::log::json_output_enabled(),
374 )
375 .await?;
376 let client = connect_apply_client(settings).await?;
377 Ok((planned_project, client))
378}
379
380pub fn generate_random_env_name() -> String {
383 use sha2::{Digest, Sha256};
384 use std::time::SystemTime;
385
386 let now = SystemTime::now()
387 .duration_since(SystemTime::UNIX_EPOCH)
388 .expect("system time before Unix epoch")
389 .as_nanos();
390
391 let mut hasher = Sha256::new();
392 hasher.update(now.to_le_bytes());
393 let hash = hasher.finalize();
394
395 format!(
397 "{:07x}",
398 u32::from_le_bytes([hash[0], hash[1], hash[2], hash[3]]) & 0xFFFFFFF
399 )
400}
401
402pub struct DeploymentExecutor<'a> {
412 client: &'a Client,
413 dry_run: bool,
414 statement_log: RefCell<Vec<String>>,
415}
416
417impl<'a> DeploymentExecutor<'a> {
418 pub fn new(client: &'a Client) -> Self {
420 Self {
421 client,
422 dry_run: false,
423 statement_log: RefCell::new(Vec::new()),
424 }
425 }
426
427 pub fn new_dry_run(client: &'a Client) -> Self {
429 Self {
430 client,
431 dry_run: true,
432 statement_log: RefCell::new(Vec::new()),
433 }
434 }
435
436 pub fn with_dry_run(client: &'a Client, dry_run: bool) -> Self {
438 Self {
439 client,
440 dry_run,
441 statement_log: RefCell::new(Vec::new()),
442 }
443 }
444
445 pub fn is_dry_run(&self) -> bool {
447 self.dry_run
448 }
449
450 pub fn take_statements(&self) -> Vec<String> {
455 self.statement_log.borrow_mut().drain(..).collect()
456 }
457
458 pub async fn execute_object(
463 &self,
464 typed_obj: &compiled::DatabaseObject,
465 ) -> Result<(), CliError> {
466 self.execute_sql(&typed_obj.stmt).await?;
468
469 for index in &typed_obj.indexes {
471 self.execute_sql(index).await?;
472 }
473
474 for grant in &typed_obj.grants {
476 self.execute_sql(grant).await?;
477 }
478
479 for comment in &typed_obj.comments {
481 self.execute_sql(comment).await?;
482 }
483
484 Ok(())
485 }
486
487 pub async fn prepare_databases_and_schemas(
492 &self,
493 planned_project: &Project,
494 schema_set: &BTreeSet<project::SchemaQualifier>,
495 staging_suffix: Option<&str>,
496 ) -> Result<(), CliError> {
497 if schema_set.is_empty() {
498 return Ok(());
499 }
500
501 let databases: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
503 for db in &databases {
504 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(db));
505 self.execute_sql(&sql).await?;
506 }
507
508 for sq in schema_set {
510 let schema_name = match staging_suffix {
511 Some(suffix) => format!("{}{}", sq.schema, suffix),
512 None => sq.schema.clone(),
513 };
514 verbose!(
515 "Creating schema {}.{} if not exists",
516 sq.database,
517 schema_name
518 );
519 let sql = format!(
520 "CREATE SCHEMA IF NOT EXISTS {}.{}",
521 quote_identifier(&sq.database),
522 quote_identifier(&schema_name)
523 );
524 self.execute_sql(&sql).await?;
525 }
526
527 for mod_stmt in planned_project.iter_mod_statements() {
529 match mod_stmt {
530 project::ModStatement::Database {
531 database,
532 statement,
533 } => {
534 let has_schema = schema_set.iter().any(|sq| sq.database == *database);
535 if has_schema {
536 verbose!("Applying database setup for: {}", database);
537 self.execute_sql(statement).await?;
538 }
539 }
540 project::ModStatement::Schema {
541 database,
542 schema,
543 statement,
544 } => {
545 if schema_set.contains(&project::SchemaQualifier::new(
546 database.to_string(),
547 schema.to_string(),
548 )) {
549 if let Some(suffix) = staging_suffix {
550 let staging_schema = format!("{}{}", schema, suffix);
551 let mut rewritten = statement.clone();
552 normalize::rewrite_schema_names(
553 std::slice::from_mut(&mut rewritten),
554 schema,
555 suffix,
556 );
557 verbose!("Applying schema setup for: {}.{}", database, staging_schema);
558 self.execute_sql(&rewritten).await?;
559 } else {
560 verbose!("Applying schema setup for: {}.{}", database, schema);
561 self.execute_sql(statement).await?;
562 }
563 }
564 }
565 }
566 }
567
568 Ok(())
569 }
570
571 pub async fn execute_sql(&self, stmt: &impl ToString) -> Result<(), CliError> {
576 let sql = stmt.to_string();
577
578 if self.dry_run {
579 self.statement_log.borrow_mut().push(sql.clone());
580 return Ok(());
581 }
582
583 self.client
584 .execute(&sql, &[])
585 .await
586 .map_err(|source| CliError::SqlExecutionFailed {
587 statement: sql,
588 source,
589 })?;
590 Ok(())
591 }
592
593 pub async fn ensure_database(&self, name: &str) -> Result<(), CliError> {
598 if self.dry_run {
599 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", quote_identifier(name));
600 self.statement_log.borrow_mut().push(sql);
601 } else {
602 self.client.provisioning().create_database(name).await?;
603 }
604 Ok(())
605 }
606
607 pub async fn ensure_schema(&self, database: &str, schema: &str) -> Result<(), CliError> {
612 if self.dry_run {
613 let sql = format!(
614 "CREATE SCHEMA IF NOT EXISTS {}.{}",
615 quote_identifier(database),
616 quote_identifier(schema)
617 );
618 self.statement_log.borrow_mut().push(sql);
619 } else {
620 self.client
621 .provisioning()
622 .create_schema(database, schema)
623 .await?;
624 }
625 Ok(())
626 }
627
628 pub async fn create_cluster(
633 &self,
634 staging_name: &str,
635 prod_name: &str,
636 config: &ClusterConfig,
637 ) -> Result<(), CliError> {
638 if self.dry_run {
639 let sql = format!(
640 "CREATE CLUSTER {} (SIZE = '<from {}')",
641 quote_identifier(staging_name),
642 prod_name
643 );
644 self.statement_log.borrow_mut().push(sql);
645 } else {
646 self.client
647 .provisioning()
648 .create_cluster_with_config(staging_name, config)
649 .await?;
650 }
651 Ok(())
652 }
653
654 pub async fn record_deployment_clusters(
659 &self,
660 stage_name: &str,
661 clusters: &[String],
662 ) -> Result<(), CliError> {
663 if !self.dry_run {
664 self.client
665 .deployments()
666 .insert_deployment_clusters(stage_name, clusters)
667 .await?;
668 verbose!("Cluster mappings recorded");
669 }
670 Ok(())
671 }
672}