1use crate::client::connection::{Client, DeploymentsClient, DeploymentsClientMut};
60use crate::client::errors::ConnectionError;
61use crate::client::models::{
62 ApplyState, ConflictRecord, DeploymentDetails, DeploymentHistoryEntry, DeploymentKind,
63 DeploymentMetadata, DeploymentMode, DeploymentObjectRecord, PendingStatement,
64 ProductionClusterRecord, SchemaDeploymentRecord, StagingDeployment,
65};
66use crate::client::quote_identifier;
67use crate::client::staging_suffix_like_pattern;
68use crate::project::SchemaQualifier;
69use crate::project::analysis::deployment_snapshot::DeploymentSnapshot;
70use crate::project::ir::object_id::ObjectId;
71use async_stream::try_stream;
72use chrono::{DateTime, Utc};
73use futures::Stream;
74use mz_postgres_util::Sql;
75use std::collections::{BTreeMap, BTreeSet};
76use std::fmt;
77use tokio_postgres::types::ToSql;
78
79#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
81#[serde(rename_all = "snake_case")]
82pub enum FailureReason {
83 NoReplicas,
85 AllReplicasProblematic { problematic: i64, total: i64 },
87}
88
89impl fmt::Display for FailureReason {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 match self {
92 FailureReason::NoReplicas => write!(f, "no replicas configured"),
93 FailureReason::AllReplicasProblematic { problematic, total } => {
94 write!(
95 f,
96 "all {} of {} replicas OOM-looping (3+ crashes in 24h)",
97 problematic, total
98 )
99 }
100 }
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, serde::Serialize)]
106#[serde(rename_all = "snake_case")]
107pub enum ClusterDeploymentStatus {
108 Ready,
110 Hydrating { hydrated: i64, total: i64 },
112 Lagging { max_lag_secs: i64 },
114 Failing { reason: FailureReason },
116}
117
118#[derive(Debug, Clone, serde::Serialize)]
120pub struct ClusterStatusContext {
121 pub cluster_name: String,
123 pub cluster_id: String,
125 pub status: ClusterDeploymentStatus,
127 pub hydrated_count: i64,
129 pub total_count: i64,
131 pub max_lag_secs: i64,
133 pub total_replicas: i64,
135 pub problematic_replicas: i64,
137}
138
139#[derive(Debug, Clone)]
145pub struct HydrationStatusUpdate {
146 pub cluster_name: String,
148 pub cluster_id: String,
150 pub status: ClusterDeploymentStatus,
152 pub failure_reason: Option<FailureReason>,
154 pub hydrated_count: i64,
156 pub total_count: i64,
158 pub max_lag_secs: i64,
160 pub total_replicas: i64,
162 pub problematic_replicas: i64,
164}
165
166pub(super) async fn insert_schema_deployments(
168 client: &Client,
169 deployments: &[SchemaDeploymentRecord],
170) -> Result<(), ConnectionError> {
171 if deployments.is_empty() {
172 return Ok(());
173 }
174
175 let insert_sql = r#"
176 INSERT INTO _mz_deploy.tables.deployments
177 (deploy_id, database, schema, deployed_at, deployed_by, promoted_at, commit, kind, mode)
178 VALUES
179 ($1, $2, $3, $4, $5, $6, $7, $8, $9)
180 "#;
181
182 for deployment in deployments {
183 let kind_str = deployment.kind.to_string();
184 let mode_str = deployment.mode.to_string();
185 client
186 .execute(
187 insert_sql,
188 &[
189 &deployment.deploy_id,
190 &deployment.database,
191 &deployment.schema,
192 &deployment.deployed_at,
193 &deployment.deployed_by,
194 &deployment.promoted_at,
195 &deployment.git_commit,
196 &kind_str,
197 &mode_str,
198 ],
199 )
200 .await?;
201 }
202
203 Ok(())
204}
205
206pub(super) async fn append_deployment_objects(
208 client: &Client,
209 objects: &[DeploymentObjectRecord],
210) -> Result<(), ConnectionError> {
211 if objects.is_empty() {
212 return Ok(());
213 }
214
215 let insert_sql = r#"
216 INSERT INTO _mz_deploy.tables.objects
217 (deploy_id, database, schema, object, hash)
218 VALUES
219 ($1, $2, $3, $4, $5)
220 "#;
221
222 for obj in objects {
223 client
224 .execute(
225 insert_sql,
226 &[
227 &obj.deploy_id,
228 &obj.database,
229 &obj.schema,
230 &obj.object,
231 &obj.object_hash,
232 ],
233 )
234 .await?;
235 }
236
237 Ok(())
238}
239
240pub(super) async fn insert_deployment_clusters(
245 client: &Client,
246 deploy_id: &str,
247 clusters: &[String],
248) -> Result<(), ConnectionError> {
249 if clusters.is_empty() {
250 return Ok(());
251 }
252
253 let placeholders: Vec<String> = (1..=clusters.len()).map(|i| format!("${}", i)).collect();
255 let placeholders_str = placeholders.join(", ");
256
257 let select_sql = format!(
258 "SELECT name, id FROM mz_catalog.mz_clusters WHERE name IN ({})",
259 placeholders_str
260 );
261
262 #[allow(clippy::as_conversions)]
263 let params: Vec<&(dyn ToSql + Sync)> =
264 clusters.iter().map(|c| c as &(dyn ToSql + Sync)).collect();
265
266 let rows = client.query(&select_sql, ¶ms).await?;
267
268 if rows.len() != clusters.len() {
270 let found_names: BTreeSet<String> = rows.iter().map(|row| row.get("name")).collect();
271 let missing: Vec<&str> = clusters
272 .iter()
273 .filter(|name| !found_names.contains(*name))
274 .map(|s| s.as_str())
275 .collect();
276
277 return Err(ConnectionError::IntrospectionFailed {
278 object_type: "cluster".to_string(),
279 source: format!(
280 "Failed to resolve cluster names to IDs. The following clusters do not exist: {}",
281 missing.join(", ")
282 )
283 .into(),
284 });
285 }
286
287 let insert_sql = r#"
289 INSERT INTO _mz_deploy.tables.clusters (deploy_id, cluster_id)
290 VALUES ($1, $2)
291 "#;
292
293 for row in rows {
294 let cluster_id: String = row.get("id");
295 client
296 .execute(insert_sql, &[&deploy_id, &cluster_id])
297 .await?;
298 }
299
300 Ok(())
301}
302
303pub(super) async fn get_deployment_clusters(
309 client: &Client,
310 deploy_id: &str,
311) -> Result<Vec<String>, ConnectionError> {
312 let query = r#"
313 SELECT name
314 FROM _mz_deploy.public.deployment_clusters
315 WHERE deploy_id = $1
316 ORDER BY name
317 "#;
318
319 let rows = client.query(query, &[&deploy_id]).await?;
320
321 Ok(rows.iter().map(|row| row.get("name")).collect())
322}
323
324pub(super) async fn validate_deployment_clusters(
329 client: &Client,
330 deploy_id: &str,
331) -> Result<(), ConnectionError> {
332 let query = r#"
333 SELECT cluster_id
334 FROM _mz_deploy.public.missing_clusters
335 WHERE deploy_id = $1
336 "#;
337
338 let rows = client.query(query, &[&deploy_id]).await?;
339
340 if !rows.is_empty() {
341 let missing_ids: Vec<String> = rows.iter().map(|row| row.get("cluster_id")).collect();
342 return Err(ConnectionError::IntrospectionFailed {
343 object_type: "cluster".to_string(),
344 source: format!(
345 "Deployment '{}' references {} cluster(s) that no longer exist: {}. \
346 These clusters may have been deleted. Run 'mz-deploy abort {}' to clean up.",
347 deploy_id,
348 missing_ids.len(),
349 missing_ids.join(", "),
350 deploy_id
351 )
352 .into(),
353 });
354 }
355
356 Ok(())
357}
358
359pub(super) async fn delete_deployment_clusters(
361 client: &Client,
362 deploy_id: &str,
363) -> Result<(), ConnectionError> {
364 client
365 .execute(
366 "DELETE FROM _mz_deploy.tables.clusters WHERE deploy_id = $1",
367 &[&deploy_id],
368 )
369 .await?;
370 Ok(())
371}
372
373pub(super) async fn update_promoted_at(
375 client: &Client,
376 deploy_id: &str,
377) -> Result<(), ConnectionError> {
378 let update_sql = r#"
379 UPDATE _mz_deploy.tables.deployments
380 SET promoted_at = NOW()
381 WHERE deploy_id = $1
382 "#;
383
384 client.execute(update_sql, &[&deploy_id]).await?;
385 Ok(())
386}
387
388pub(super) async fn delete_deployment(
390 client: &Client,
391 deploy_id: &str,
392) -> Result<(), ConnectionError> {
393 client
394 .execute(
395 "DELETE FROM _mz_deploy.tables.deployments WHERE deploy_id = $1",
396 &[&deploy_id],
397 )
398 .await?;
399 client
400 .execute(
401 "DELETE FROM _mz_deploy.tables.objects WHERE deploy_id = $1",
402 &[&deploy_id],
403 )
404 .await?;
405 Ok(())
406}
407
408pub(super) async fn get_schema_deployments(
410 client: &Client,
411 deploy_id: Option<&str>,
412) -> Result<Vec<SchemaDeploymentRecord>, ConnectionError> {
413 let query = if deploy_id.is_none() {
420 r#"
421 SELECT deploy_id, database, schema,
422 promoted_at as deployed_at,
423 '' as deployed_by,
424 promoted_at,
425 commit,
426 kind,
427 'stage' as mode
428 FROM _mz_deploy.public.production
429 ORDER BY database, schema
430 "#
431 } else {
432 r#"
433 SELECT deploy_id, database, schema,
434 deployed_at,
435 deployed_by,
436 promoted_at,
437 commit,
438 kind,
439 mode
440 FROM _mz_deploy.public.deployments
441 WHERE deploy_id = $1
442 ORDER BY database, schema
443 "#
444 };
445
446 let rows = if deploy_id.is_none() {
447 client.query(query, &[]).await?
448 } else {
449 client.query(query, &[&deploy_id]).await?
450 };
451
452 let mut records = Vec::new();
453 for row in rows {
454 let deploy_id: String = row.get("deploy_id");
455 let database: String = row.get("database");
456 let schema: String = row.get("schema");
457 let deployed_at: DateTime<Utc> = row.get("deployed_at");
458 let deployed_by: String = row.get("deployed_by");
459 let promoted_at: Option<DateTime<Utc>> = row.get("promoted_at");
460 let git_commit: Option<String> = row.get("commit");
461 let kind_str: String = row.get("kind");
462 let mode_str: String = row.get("mode");
463
464 let kind = kind_str.parse().map_err(|e| {
465 ConnectionError::Message(format!("Failed to parse deployment kind: {}", e))
466 })?;
467 let mode = mode_str.parse::<DeploymentMode>().map_err(|e| {
468 ConnectionError::Message(format!("Failed to parse deployment mode: {}", e))
469 })?;
470
471 records.push(SchemaDeploymentRecord {
472 deploy_id,
473 database,
474 schema,
475 deployed_at,
476 deployed_by,
477 promoted_at,
478 git_commit,
479 kind,
480 mode,
481 });
482 }
483
484 Ok(records)
485}
486
487pub(super) async fn get_deployment_objects(
489 client: &Client,
490 deploy_id: Option<&str>,
491) -> Result<DeploymentSnapshot, ConnectionError> {
492 let query = if deploy_id.is_none() {
493 r#"
494 SELECT o.database, o.schema, o.object, o.hash, p.kind
495 FROM _mz_deploy.public.objects o
496 JOIN _mz_deploy.public.production p
497 ON o.database = p.database AND o.schema = p.schema
498 WHERE o.deploy_id = p.deploy_id
499 "#
500 } else {
501 r#"
502 SELECT o.database, o.schema, o.object, o.hash, d.kind
503 FROM _mz_deploy.public.objects o
504 JOIN _mz_deploy.public.deployments d
505 ON o.deploy_id = d.deploy_id
506 AND o.database = d.database
507 AND o.schema = d.schema
508 WHERE o.deploy_id = $1
509 "#
510 };
511
512 let rows = if deploy_id.is_none() {
513 client.query(query, &[]).await?
514 } else {
515 client.query(query, &[&deploy_id]).await?
516 };
517
518 let mut objects = BTreeMap::new();
519 let mut schemas = BTreeMap::new();
520 for row in rows {
521 let database: String = row.get("database");
522 let schema: String = row.get("schema");
523 let object: String = row.get("object");
524 let object_hash: String = row.get("hash");
525
526 let kind_str: String = row.get("kind");
527 let kind = kind_str.parse().map_err(|e| {
528 ConnectionError::Message(format!("Failed to parse deployment kind: {}", e))
529 })?;
530
531 let object_id = ObjectId::new(database.clone(), schema.clone(), object);
532 objects.insert(object_id, object_hash);
533 schemas
534 .entry(SchemaQualifier::new(database, schema))
535 .or_insert(kind);
536 }
537
538 Ok(DeploymentSnapshot { objects, schemas })
539}
540
541pub(super) async fn get_deployment_metadata(
543 client: &Client,
544 deploy_id: &str,
545) -> Result<Option<DeploymentMetadata>, ConnectionError> {
546 let query = r#"
547 SELECT deploy_id,
548 promoted_at,
549 mode,
550 database,
551 schema
552 FROM _mz_deploy.public.deployments
553 WHERE deploy_id = $1
554 "#;
555
556 let rows = client.query(query, &[&deploy_id]).await?;
557
558 if rows.is_empty() {
559 return Ok(None);
560 }
561
562 let first_row = &rows[0];
563 let deploy_id: String = first_row.get("deploy_id");
564 let promoted_at: Option<DateTime<Utc>> = first_row.get("promoted_at");
565 let mode_str: String = first_row.get("mode");
566 let mode = mode_str
567 .parse::<DeploymentMode>()
568 .map_err(|e| ConnectionError::Message(format!("Failed to parse deployment mode: {}", e)))?;
569
570 let mut schemas = Vec::new();
571 for row in rows {
572 let database: String = row.get("database");
573 let schema: String = row.get("schema");
574 schemas.push(SchemaQualifier::new(database, schema));
575 }
576
577 Ok(Some(DeploymentMetadata {
578 deploy_id,
579 promoted_at,
580 mode,
581 schemas,
582 }))
583}
584
585pub(super) async fn get_deployment_details(
589 client: &Client,
590 deploy_id: &str,
591) -> Result<Option<DeploymentDetails>, ConnectionError> {
592 let query = r#"
593 SELECT deploy_id,
594 deployed_at,
595 promoted_at,
596 deployed_by,
597 commit,
598 kind,
599 mode,
600 database,
601 schema
602 FROM _mz_deploy.public.deployments
603 WHERE deploy_id = $1
604 ORDER BY database, schema
605 "#;
606
607 let rows = client.query(query, &[&deploy_id]).await?;
608
609 if rows.is_empty() {
610 return Ok(None);
611 }
612
613 let first_row = &rows[0];
614 let deployed_at: DateTime<Utc> = first_row.get("deployed_at");
615 let promoted_at: Option<DateTime<Utc>> = first_row.get("promoted_at");
616 let deployed_by: String = first_row.get("deployed_by");
617 let git_commit: Option<String> = first_row.get("commit");
618 let kind_str: String = first_row.get("kind");
619 let kind: DeploymentKind = kind_str.parse().map_err(ConnectionError::Message)?;
620 let mode_str: String = first_row.get("mode");
621 let mode = mode_str
622 .parse::<DeploymentMode>()
623 .map_err(|e| ConnectionError::Message(format!("Failed to parse deployment mode: {}", e)))?;
624
625 let mut schemas = Vec::new();
626 for row in rows {
627 let database: String = row.get("database");
628 let schema: String = row.get("schema");
629 schemas.push(SchemaQualifier::new(database, schema));
630 }
631
632 Ok(Some(DeploymentDetails {
633 deployed_at,
634 promoted_at,
635 deployed_by,
636 git_commit,
637 kind,
638 mode,
639 schemas,
640 }))
641}
642
643pub(super) async fn list_staging_deployments(
647 client: &Client,
648) -> Result<BTreeMap<String, StagingDeployment>, ConnectionError> {
649 let query = r#"
650 SELECT deploy_id,
651 deployed_at,
652 deployed_by,
653 commit,
654 kind,
655 mode,
656 database,
657 schema
658 FROM _mz_deploy.public.staging_deployments
659 ORDER BY deploy_id, database, schema
660 "#;
661
662 let rows = client.query(query, &[]).await?;
663
664 let mut deployments: BTreeMap<String, StagingDeployment> = BTreeMap::new();
665
666 for row in rows {
667 let deploy_id: String = row.get("deploy_id");
668 let deployed_at: DateTime<Utc> = row.get("deployed_at");
669 let deployed_by: String = row.get("deployed_by");
670 let git_commit: Option<String> = row.get("commit");
671 let kind_str: String = row.get("kind");
672 let mode_str: String = row.get("mode");
673 let database: String = row.get("database");
674 let schema: String = row.get("schema");
675
676 let kind: DeploymentKind = kind_str.parse().map_err(|e| {
680 ConnectionError::Message(format!("Failed to parse deployment kind: {}", e))
681 })?;
682 let mode: DeploymentMode = mode_str.parse().map_err(|e| {
683 ConnectionError::Message(format!("Failed to parse deployment mode: {}", e))
684 })?;
685
686 deployments
687 .entry(deploy_id)
688 .or_insert_with(|| StagingDeployment {
689 deployed_at,
690 deployed_by: deployed_by.clone(),
691 git_commit: git_commit.clone(),
692 kind,
693 mode,
694 schemas: Vec::new(),
695 })
696 .schemas
697 .push(SchemaQualifier::new(database, schema));
698 }
699
700 Ok(deployments)
701}
702
703pub(super) async fn list_deployment_history(
707 client: &Client,
708 limit: Option<usize>,
709) -> Result<Vec<DeploymentHistoryEntry>, ConnectionError> {
710 let query = if let Some(limit) = limit {
713 format!(
714 r#"
715 WITH unique_deployments AS (
716 SELECT DISTINCT deploy_id, promoted_at, deployed_by, commit, kind
717 FROM _mz_deploy.public.deployments
718 WHERE promoted_at IS NOT NULL
719 ORDER BY promoted_at DESC
720 LIMIT {}
721 )
722 SELECT d.deploy_id,
723 d.promoted_at,
724 d.deployed_by,
725 d.commit,
726 d.kind,
727 d.database,
728 d.schema
729 FROM _mz_deploy.public.deployments d
730 JOIN unique_deployments u
731 ON d.deploy_id = u.deploy_id
732 AND d.promoted_at = u.promoted_at
733 AND d.deployed_by = u.deployed_by
734 ORDER BY d.promoted_at DESC, d.database, d.schema
735 "#,
736 limit
737 )
738 } else {
739 r#"
740 SELECT deploy_id,
741 promoted_at,
742 deployed_by,
743 commit,
744 kind,
745 database,
746 schema
747 FROM _mz_deploy.public.deployments
748 WHERE promoted_at IS NOT NULL
749 ORDER BY promoted_at DESC, database, schema
750 "#
751 .to_string()
752 };
753
754 let rows = client.query(&query, &[]).await?;
755
756 let mut deployments: Vec<DeploymentHistoryEntry> = Vec::new();
758 let mut current_deploy_id: Option<String> = None;
759
760 for row in rows {
761 let deploy_id: String = row.get("deploy_id");
762 let promoted_at: DateTime<Utc> = row.get("promoted_at");
763 let deployed_by: String = row.get("deployed_by");
764 let git_commit: Option<String> = row.get("commit");
765 let kind_str: String = row.get("kind");
766 let database: String = row.get("database");
767 let schema: String = row.get("schema");
768
769 if current_deploy_id.as_ref() != Some(&deploy_id) {
771 let kind = kind_str.parse().unwrap_or(DeploymentKind::Objects);
773 deployments.push(DeploymentHistoryEntry {
775 deploy_id: deploy_id.clone(),
776 promoted_at,
777 deployed_by,
778 git_commit,
779 kind,
780 schemas: vec![SchemaQualifier::new(database, schema)],
781 });
782 current_deploy_id = Some(deploy_id);
783 } else {
784 if let Some(last) = deployments.last_mut() {
786 last.schemas.push(SchemaQualifier::new(database, schema));
787 }
788 }
789 }
790
791 Ok(deployments)
792}
793
794pub(super) async fn check_deployment_conflicts(
796 client: &Client,
797 deploy_id: &str,
798) -> Result<Vec<ConflictRecord>, ConnectionError> {
799 let query = r#"
800 SELECT p.database, p.schema, p.deploy_id, p.promoted_at
801 FROM _mz_deploy.public.production p
802 JOIN _mz_deploy.public.deployments d USING (database, schema)
803 WHERE d.deploy_id = $1 AND p.promoted_at > d.deployed_at
804 "#;
805
806 let rows = client.query(query, &[&deploy_id]).await?;
807
808 let conflicts = rows
809 .iter()
810 .map(|row| ConflictRecord {
811 database: row.get("database"),
812 schema: row.get("schema"),
813 deploy_id: row.get("deploy_id"),
814 promoted_at: row.get("promoted_at"),
815 })
816 .collect();
817
818 Ok(conflicts)
819}
820
821pub(super) async fn list_production_clusters(
828 client: &Client,
829) -> Result<Vec<ProductionClusterRecord>, ConnectionError> {
830 let query = r#"
838 SELECT DISTINCT ON (c.name)
839 c.name AS cluster_name,
840 p.database AS database,
841 p.schema AS schema,
842 p.promoted_at AS promoted_at
843 FROM _mz_deploy.public.production p
844 JOIN mz_catalog.mz_databases d ON d.name = p.database
845 JOIN mz_catalog.mz_schemas s
846 ON s.name = p.schema AND s.database_id = d.id
847 JOIN mz_catalog.mz_objects o
848 ON o.schema_id = s.id AND o.cluster_id IS NOT NULL
849 JOIN mz_catalog.mz_clusters c ON c.id = o.cluster_id
850 ORDER BY c.name, p.promoted_at DESC
851 "#;
852
853 let rows = client.query(query, &[]).await?;
854 let records = rows
855 .iter()
856 .map(|row| ProductionClusterRecord {
857 cluster_name: row.get("cluster_name"),
858 database: row.get("database"),
859 schema: row.get("schema"),
860 promoted_at: row.get("promoted_at"),
861 })
862 .collect();
863 Ok(records)
864}
865
866pub(super) async fn deployment_table_exists(client: &Client) -> Result<bool, ConnectionError> {
868 let query = r#"
869 SELECT EXISTS(
870 SELECT 1
871 FROM mz_catalog.mz_tables t
872 JOIN mz_catalog.mz_schemas s ON t.schema_id = s.id
873 JOIN mz_catalog.mz_databases d ON s.database_id = d.id
874 WHERE t.name = 'deployments'
875 AND s.name = 'public'
876 AND d.name = '_mz_deploy'
877 )
878 "#;
879
880 let row = client.query_one(query, &[]).await?;
881
882 Ok(row.get(0))
883}
884
/// Default wallclock-lag threshold, in seconds (five minutes). A hydrated
/// staging cluster whose lag exceeds it is reported as lagging, not ready.
pub const DEFAULT_ALLOWED_LAG_SECS: i64 = 5 * 60;
887
/// Builds the SQL that classifies every staging cluster matching `$1`.
///
/// `$1` must be bound to a `LIKE` pattern (escape character `\`). The query
/// yields one row per matching cluster, with these columns:
///
/// - `cluster_name`, `cluster_id`
/// - `status`: one of `failing`, `hydrating`, `lagging`, `ready`, checked in
///   that order
/// - `failure_reason`: `no_replicas`, `all_replicas_problematic`, or NULL
/// - `hydrated_count`, `total_count`: the maximum per-replica counts
/// - `max_lag_secs`: the largest wallclock lag, cast to bigint
/// - `total_replicas`, `problematic_replicas`
///
/// A replica is problematic when it recorded at least three `oom-killed`
/// events in the last 24 hours. `allowed_lag_secs` is interpolated as an
/// integer literal; a lag strictly greater than it makes a hydrated cluster
/// `lagging`.
fn hydration_status_query(allowed_lag_secs: i64) -> String {
    // `total` counts matched hydration-status rows (`COUNT(mhs.object_id)`),
    // not joined rows. With the LEFT JOIN, `COUNT(*)` also counted the
    // NULL-extended row of a replica that reports no objects. That gave
    // total = 1, hydrated = 0, and a cluster stuck in `hydrating` forever.
    format!(
        r#"
        WITH
        problematic_replicas AS (
            SELECT replica_id
            FROM mz_internal.mz_cluster_replica_status_history
            WHERE occurred_at + INTERVAL '24 hours' > mz_now()
              AND reason = 'oom-killed'
            GROUP BY replica_id
            HAVING COUNT(*) >= 3
        ),
        cluster_health AS (
            SELECT
                c.name AS cluster_name,
                c.id AS cluster_id,
                COUNT(r.id) AS total_replicas,
                COUNT(pr.replica_id) AS problematic_replicas
            FROM mz_clusters c
            LEFT JOIN mz_cluster_replicas r ON c.id = r.cluster_id
            LEFT JOIN problematic_replicas pr ON r.id = pr.replica_id
            WHERE c.name LIKE $1 ESCAPE '\'
            GROUP BY c.name, c.id
        ),
        hydration_counts AS (
            SELECT
                c.name AS cluster_name,
                r.id AS replica_id,
                COUNT(*) FILTER (WHERE mhs.hydrated) AS hydrated,
                COUNT(mhs.object_id) AS total
            FROM mz_clusters c
            JOIN mz_cluster_replicas r ON c.id = r.cluster_id
            LEFT JOIN mz_internal.mz_hydration_statuses mhs ON mhs.replica_id = r.id
            WHERE c.name LIKE $1 ESCAPE '\'
            GROUP BY c.name, r.id
        ),
        hydration_best AS (
            SELECT cluster_name, MAX(hydrated) AS hydrated, MAX(total) AS total
            FROM hydration_counts
            GROUP BY cluster_name
        ),
        cluster_lag AS (
            SELECT
                c.name AS cluster_name,
                MAX(EXTRACT(EPOCH FROM wgl.lag)) AS max_lag_secs
            FROM mz_clusters c
            JOIN mz_cluster_replicas r ON c.id = r.cluster_id
            JOIN mz_internal.mz_hydration_statuses mhs ON mhs.replica_id = r.id
            JOIN mz_internal.mz_wallclock_global_lag wgl ON wgl.object_id = mhs.object_id
            WHERE c.name LIKE $1 ESCAPE '\'
            GROUP BY c.name
        )
        SELECT
            ch.cluster_name,
            ch.cluster_id,
            CASE
                WHEN ch.total_replicas = 0 THEN 'failing'
                WHEN ch.total_replicas = ch.problematic_replicas THEN 'failing'
                WHEN COALESCE(hb.hydrated, 0) < COALESCE(hb.total, 0) THEN 'hydrating'
                WHEN COALESCE(cl.max_lag_secs, 0) > {allowed_lag_secs} THEN 'lagging'
                ELSE 'ready'
            END AS status,
            CASE
                WHEN ch.total_replicas = 0 THEN 'no_replicas'
                WHEN ch.total_replicas = ch.problematic_replicas THEN 'all_replicas_problematic'
                ELSE NULL
            END AS failure_reason,
            COALESCE(hb.hydrated, 0) AS hydrated_count,
            COALESCE(hb.total, 0) AS total_count,
            COALESCE(cl.max_lag_secs, 0)::bigint AS max_lag_secs,
            ch.total_replicas,
            ch.problematic_replicas
        FROM cluster_health ch
        LEFT JOIN hydration_best hb ON ch.cluster_name = hb.cluster_name
        LEFT JOIN cluster_lag cl ON ch.cluster_name = cl.cluster_name
        "#,
        allowed_lag_secs = allowed_lag_secs
    )
}
972
973pub(super) async fn get_deployment_hydration_status(
988 client: &Client,
989 deploy_id: &str,
990 allowed_lag_secs: i64,
991) -> Result<Vec<ClusterStatusContext>, ConnectionError> {
992 let pattern = staging_suffix_like_pattern(deploy_id);
993 let query = hydration_status_query(allowed_lag_secs);
994 let rows = client.query(&query, &[&pattern]).await?;
995
996 let mut results = Vec::new();
997 for row in rows {
998 let cluster_name: String = row.get("cluster_name");
999 let cluster_id: String = row.get("cluster_id");
1000 let status_str: String = row.get("status");
1001 let failure_reason: Option<String> = row.get("failure_reason");
1002 let hydrated_count: i64 = row.get("hydrated_count");
1003 let total_count: i64 = row.get("total_count");
1004 let max_lag_secs: i64 = row.get("max_lag_secs");
1005 let total_replicas: i64 = row.get("total_replicas");
1006 let problematic_replicas: i64 = row.get("problematic_replicas");
1007
1008 let status = match status_str.as_str() {
1009 "ready" => ClusterDeploymentStatus::Ready,
1010 "hydrating" => ClusterDeploymentStatus::Hydrating {
1011 hydrated: hydrated_count,
1012 total: total_count,
1013 },
1014 "lagging" => ClusterDeploymentStatus::Lagging { max_lag_secs },
1015 "failing" => {
1016 let reason = match failure_reason.as_deref() {
1017 Some("no_replicas") => FailureReason::NoReplicas,
1018 Some("all_replicas_problematic") => FailureReason::AllReplicasProblematic {
1019 problematic: problematic_replicas,
1020 total: total_replicas,
1021 },
1022 _ => FailureReason::NoReplicas, };
1024 ClusterDeploymentStatus::Failing { reason }
1025 }
1026 _ => ClusterDeploymentStatus::Ready, };
1028
1029 results.push(ClusterStatusContext {
1030 cluster_name,
1031 cluster_id,
1032 status,
1033 hydrated_count,
1034 total_count,
1035 max_lag_secs,
1036 total_replicas,
1037 problematic_replicas,
1038 });
1039 }
1040
1041 Ok(results)
1042}
1043
1044pub(super) async fn create_apply_state_schemas(
1055 client: &Client,
1056 deploy_id: &str,
1057) -> Result<(), ConnectionError> {
1058 let pre_schema = format!("apply_{}_pre", deploy_id);
1059 let post_schema = format!("apply_{}_post", deploy_id);
1060 let pre_schema_quoted = quote_identifier(&pre_schema);
1061 let post_schema_quoted = quote_identifier(&post_schema);
1062
1063 let create_pre = format!(
1064 "CREATE SCHEMA IF NOT EXISTS _mz_deploy.{}",
1065 pre_schema_quoted
1066 );
1067 client.execute(&create_pre, &[]).await?;
1068
1069 let create_post = format!(
1070 "CREATE SCHEMA IF NOT EXISTS _mz_deploy.{}",
1071 post_schema_quoted
1072 );
1073 client.execute(&create_post, &[]).await?;
1074
1075 let comment_check_query = r#"
1076 SELECT c.comment
1077 FROM mz_catalog.mz_schemas s
1078 JOIN mz_catalog.mz_databases d ON s.database_id = d.id
1079 LEFT JOIN mz_internal.mz_comments c ON s.id = c.id
1080 WHERE s.name = $1 AND d.name = '_mz_deploy'
1081 "#;
1082
1083 let rows = client.query(comment_check_query, &[&pre_schema]).await?;
1084 if !rows.is_empty() {
1085 let comment: Option<String> = rows[0].get("comment");
1086 if comment.is_none() {
1087 let comment_pre = format!(
1088 "COMMENT ON SCHEMA _mz_deploy.{} IS 'swapped=false'",
1089 pre_schema_quoted
1090 );
1091 client.execute(&comment_pre, &[]).await?;
1092 }
1093 }
1094
1095 let rows = client.query(comment_check_query, &[&post_schema]).await?;
1096 if !rows.is_empty() {
1097 let comment: Option<String> = rows[0].get("comment");
1098 if comment.is_none() {
1099 let comment_post = format!(
1100 "COMMENT ON SCHEMA _mz_deploy.{} IS 'swapped=true'",
1101 post_schema_quoted
1102 );
1103 client.execute(&comment_post, &[]).await?;
1104 }
1105 }
1106
1107 Ok(())
1108}
1109
1110pub(super) async fn get_apply_state(
1118 client: &Client,
1119 deploy_id: &str,
1120) -> Result<ApplyState, ConnectionError> {
1121 let pre_schema = format!("apply_{}_pre", deploy_id);
1122
1123 let query = r#"
1125 SELECT c.comment
1126 FROM mz_catalog.mz_schemas s
1127 JOIN mz_catalog.mz_databases d ON s.database_id = d.id
1128 LEFT JOIN mz_internal.mz_comments c ON s.id = c.id
1129 WHERE s.name = $1 AND d.name = '_mz_deploy'
1130 "#;
1131
1132 let rows = client.query(query, &[&pre_schema]).await?;
1133
1134 if rows.is_empty() {
1135 return Ok(ApplyState::NotStarted);
1136 }
1137
1138 let comment: Option<String> = rows[0].get("comment");
1139 match comment.as_deref() {
1140 Some("swapped=false") => Ok(ApplyState::PreSwap),
1141 Some("swapped=true") => Ok(ApplyState::PostSwap),
1142 _ => {
1143 Ok(ApplyState::NotStarted)
1145 }
1146 }
1147}
1148
1149pub(super) async fn delete_apply_state_schemas(
1151 client: &Client,
1152 deploy_id: &str,
1153) -> Result<(), ConnectionError> {
1154 let pre_schema = format!("apply_{}_pre", deploy_id);
1155 let post_schema = format!("apply_{}_post", deploy_id);
1156
1157 let drop_pre = format!(
1158 "DROP SCHEMA IF EXISTS _mz_deploy.{}",
1159 quote_identifier(&pre_schema)
1160 );
1161 client.execute(&drop_pre, &[]).await?;
1162
1163 let drop_post = format!(
1164 "DROP SCHEMA IF EXISTS _mz_deploy.{}",
1165 quote_identifier(&post_schema)
1166 );
1167 client.execute(&drop_post, &[]).await?;
1168
1169 Ok(())
1170}
1171
1172pub(super) async fn insert_pending_statements(
1174 client: &Client,
1175 statements: &[PendingStatement],
1176) -> Result<(), ConnectionError> {
1177 if statements.is_empty() {
1178 return Ok(());
1179 }
1180
1181 let insert_sql = r#"
1182 INSERT INTO _mz_deploy.tables.pending_statements
1183 (deploy_id, sequence_num, database, schema, object, object_hash, statement_sql, statement_kind, executed_at)
1184 VALUES
1185 ($1, $2, $3, $4, $5, $6, $7, $8, $9)
1186 "#;
1187
1188 for stmt in statements {
1189 client
1190 .execute(
1191 insert_sql,
1192 &[
1193 &stmt.deploy_id,
1194 &stmt.sequence_num,
1195 &stmt.database,
1196 &stmt.schema,
1197 &stmt.object,
1198 &stmt.object_hash,
1199 &stmt.statement_sql,
1200 &stmt.statement_kind,
1201 &stmt.executed_at,
1202 ],
1203 )
1204 .await?;
1205 }
1206
1207 Ok(())
1208}
1209
1210pub(super) async fn get_pending_statements(
1212 client: &Client,
1213 deploy_id: &str,
1214) -> Result<Vec<PendingStatement>, ConnectionError> {
1215 let query = r#"
1216 SELECT deploy_id, sequence_num, database, schema, object, object_hash,
1217 statement_sql, statement_kind, executed_at
1218 FROM _mz_deploy.public.pending_statements
1219 WHERE deploy_id = $1 AND executed_at IS NULL
1220 ORDER BY sequence_num
1221 "#;
1222
1223 let rows = client.query(query, &[&deploy_id]).await?;
1224
1225 let mut statements = Vec::new();
1226 for row in rows {
1227 statements.push(PendingStatement {
1228 deploy_id: row.get("deploy_id"),
1229 sequence_num: row.get("sequence_num"),
1230 database: row.get("database"),
1231 schema: row.get("schema"),
1232 object: row.get("object"),
1233 object_hash: row.get("object_hash"),
1234 statement_sql: row.get("statement_sql"),
1235 statement_kind: row.get("statement_kind"),
1236 executed_at: row.get("executed_at"),
1237 });
1238 }
1239
1240 Ok(statements)
1241}
1242
1243pub(super) async fn mark_statement_executed(
1245 client: &Client,
1246 deploy_id: &str,
1247 sequence_num: i32,
1248) -> Result<(), ConnectionError> {
1249 let update_sql = r#"
1250 UPDATE _mz_deploy.tables.pending_statements
1251 SET executed_at = NOW()
1252 WHERE deploy_id = $1 AND sequence_num = $2
1253 "#;
1254
1255 client
1256 .execute(update_sql, &[&deploy_id, &sequence_num])
1257 .await?;
1258
1259 Ok(())
1260}
1261
1262pub(super) async fn delete_pending_statements(
1264 client: &Client,
1265 deploy_id: &str,
1266) -> Result<(), ConnectionError> {
1267 client
1268 .execute(
1269 "DELETE FROM _mz_deploy.tables.pending_statements WHERE deploy_id = $1",
1270 &[&deploy_id],
1271 )
1272 .await?;
1273
1274 Ok(())
1275}
1276
1277pub(super) async fn insert_replacement_mvs(
1279 client: &Client,
1280 records: &[super::models::ReplacementMvRecord],
1281) -> Result<(), ConnectionError> {
1282 for record in records {
1283 client
1284 .execute(
1285 r#"INSERT INTO _mz_deploy.tables.replacement_mvs
1286 (deploy_id, target_database, target_schema, target_name,
1287 replacement_schema)
1288 VALUES ($1, $2, $3, $4, $5)"#,
1289 &[
1290 &record.deploy_id,
1291 &record.target_database,
1292 &record.target_schema,
1293 &record.target_name,
1294 &record.replacement_schema,
1295 ],
1296 )
1297 .await?;
1298 }
1299
1300 Ok(())
1301}
1302
1303pub(super) async fn get_replacement_mvs(
1305 client: &Client,
1306 deploy_id: &str,
1307) -> Result<Vec<super::models::ReplacementMvRecord>, ConnectionError> {
1308 let rows = client
1309 .query(
1310 r#"SELECT deploy_id, target_database, target_schema, target_name,
1311 replacement_schema
1312 FROM _mz_deploy.public.replacement_mvs
1313 WHERE deploy_id = $1
1314 ORDER BY target_database, target_schema, target_name"#,
1315 &[&deploy_id],
1316 )
1317 .await?;
1318
1319 Ok(rows
1320 .iter()
1321 .map(|row| super::models::ReplacementMvRecord {
1322 deploy_id: row.get("deploy_id"),
1323 target_database: row.get("target_database"),
1324 target_schema: row.get("target_schema"),
1325 target_name: row.get("target_name"),
1326 replacement_schema: row.get("replacement_schema"),
1327 })
1328 .collect())
1329}
1330
1331impl DeploymentsClient<'_> {
1332 pub async fn insert_schema_deployments(
1333 &self,
1334 deployments: &[SchemaDeploymentRecord],
1335 ) -> Result<(), ConnectionError> {
1336 insert_schema_deployments(self.client, deployments).await
1337 }
1338
1339 pub async fn append_deployment_objects(
1340 &self,
1341 objects: &[DeploymentObjectRecord],
1342 ) -> Result<(), ConnectionError> {
1343 append_deployment_objects(self.client, objects).await
1344 }
1345
1346 pub async fn insert_deployment_clusters(
1347 &self,
1348 deploy_id: &str,
1349 clusters: &[String],
1350 ) -> Result<(), ConnectionError> {
1351 insert_deployment_clusters(self.client, deploy_id, clusters).await
1352 }
1353
1354 pub async fn get_deployment_clusters(
1355 &self,
1356 deploy_id: &str,
1357 ) -> Result<Vec<String>, ConnectionError> {
1358 get_deployment_clusters(self.client, deploy_id).await
1359 }
1360
1361 pub async fn validate_deployment_clusters(
1362 &self,
1363 deploy_id: &str,
1364 ) -> Result<(), ConnectionError> {
1365 validate_deployment_clusters(self.client, deploy_id).await
1366 }
1367
1368 pub async fn list_production_clusters(
1371 &self,
1372 ) -> Result<Vec<ProductionClusterRecord>, ConnectionError> {
1373 list_production_clusters(self.client).await
1374 }
1375
1376 pub async fn get_deployment_hydration_status(
1377 &self,
1378 deploy_id: &str,
1379 ) -> Result<Vec<ClusterStatusContext>, ConnectionError> {
1380 get_deployment_hydration_status(self.client, deploy_id, DEFAULT_ALLOWED_LAG_SECS).await
1381 }
1382
1383 pub async fn get_deployment_hydration_status_with_lag(
1384 &self,
1385 deploy_id: &str,
1386 allowed_lag_secs: i64,
1387 ) -> Result<Vec<ClusterStatusContext>, ConnectionError> {
1388 get_deployment_hydration_status(self.client, deploy_id, allowed_lag_secs).await
1389 }
1390
1391 pub async fn delete_deployment_clusters(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1392 delete_deployment_clusters(self.client, deploy_id).await
1393 }
1394
1395 pub async fn update_promoted_at(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1396 update_promoted_at(self.client, deploy_id).await
1397 }
1398
1399 pub async fn delete_deployment(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1400 delete_deployment(self.client, deploy_id).await
1401 }
1402
1403 pub async fn get_schema_deployments(
1404 &self,
1405 deploy_id: Option<&str>,
1406 ) -> Result<Vec<SchemaDeploymentRecord>, ConnectionError> {
1407 get_schema_deployments(self.client, deploy_id).await
1408 }
1409
1410 pub async fn get_deployment_objects(
1411 &self,
1412 deploy_id: Option<&str>,
1413 ) -> Result<DeploymentSnapshot, ConnectionError> {
1414 get_deployment_objects(self.client, deploy_id).await
1415 }
1416
1417 pub async fn get_deployment_metadata(
1418 &self,
1419 deploy_id: &str,
1420 ) -> Result<Option<DeploymentMetadata>, ConnectionError> {
1421 get_deployment_metadata(self.client, deploy_id).await
1422 }
1423
1424 pub async fn validate_staging(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1426 let metadata = self.get_deployment_metadata(deploy_id).await?;
1427 match metadata {
1428 Some(meta) if meta.promoted_at.is_some() => {
1429 Err(ConnectionError::DeploymentAlreadyPromoted {
1430 deploy_id: deploy_id.to_string(),
1431 })
1432 }
1433 Some(_) => Ok(()),
1434 None => Err(ConnectionError::DeploymentNotFound {
1435 deploy_id: deploy_id.to_string(),
1436 }),
1437 }
1438 }
1439
1440 pub async fn get_deployment_details(
1441 &self,
1442 deploy_id: &str,
1443 ) -> Result<Option<DeploymentDetails>, ConnectionError> {
1444 get_deployment_details(self.client, deploy_id).await
1445 }
1446
1447 pub async fn list_staging_deployments(
1448 &self,
1449 ) -> Result<BTreeMap<String, StagingDeployment>, ConnectionError> {
1450 list_staging_deployments(self.client).await
1451 }
1452
1453 pub async fn list_deployment_history(
1454 &self,
1455 limit: Option<usize>,
1456 ) -> Result<Vec<DeploymentHistoryEntry>, ConnectionError> {
1457 list_deployment_history(self.client, limit).await
1458 }
1459
1460 pub async fn check_deployment_conflicts(
1461 &self,
1462 deploy_id: &str,
1463 ) -> Result<Vec<ConflictRecord>, ConnectionError> {
1464 check_deployment_conflicts(self.client, deploy_id).await
1465 }
1466
1467 pub async fn deployment_table_exists(&self) -> Result<bool, ConnectionError> {
1468 deployment_table_exists(self.client).await
1469 }
1470
1471 pub async fn create_apply_state_schemas(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1472 create_apply_state_schemas(self.client, deploy_id).await
1473 }
1474
1475 pub async fn get_apply_state(&self, deploy_id: &str) -> Result<ApplyState, ConnectionError> {
1476 get_apply_state(self.client, deploy_id).await
1477 }
1478
1479 pub async fn delete_apply_state_schemas(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1480 delete_apply_state_schemas(self.client, deploy_id).await
1481 }
1482
1483 pub async fn insert_pending_statements(
1484 &self,
1485 statements: &[PendingStatement],
1486 ) -> Result<(), ConnectionError> {
1487 insert_pending_statements(self.client, statements).await
1488 }
1489
1490 pub async fn get_pending_statements(
1491 &self,
1492 deploy_id: &str,
1493 ) -> Result<Vec<PendingStatement>, ConnectionError> {
1494 get_pending_statements(self.client, deploy_id).await
1495 }
1496
1497 pub async fn mark_statement_executed(
1498 &self,
1499 deploy_id: &str,
1500 sequence_num: i32,
1501 ) -> Result<(), ConnectionError> {
1502 mark_statement_executed(self.client, deploy_id, sequence_num).await
1503 }
1504
1505 pub async fn delete_pending_statements(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1506 delete_pending_statements(self.client, deploy_id).await
1507 }
1508
1509 pub async fn insert_replacement_mvs(
1510 &self,
1511 records: &[super::models::ReplacementMvRecord],
1512 ) -> Result<(), ConnectionError> {
1513 insert_replacement_mvs(self.client, records).await
1514 }
1515
1516 pub async fn get_replacement_mvs(
1517 &self,
1518 deploy_id: &str,
1519 ) -> Result<Vec<super::models::ReplacementMvRecord>, ConnectionError> {
1520 get_replacement_mvs(self.client, deploy_id).await
1521 }
1522
1523 pub async fn delete_replacement_mvs(&self, deploy_id: &str) -> Result<(), ConnectionError> {
1524 delete_replacement_mvs(self.client, deploy_id).await
1525 }
1526}
1527
1528impl DeploymentsClientMut<'_> {
1529 pub fn subscribe_deployment_hydration(
1531 &mut self,
1532 deploy_id: &str,
1533 allowed_lag_secs: i64,
1534 ) -> impl Stream<Item = Result<HydrationStatusUpdate, ConnectionError>> + '_ {
1535 let deploy_id = deploy_id.to_string();
1536
1537 try_stream! {
1538 let txn = self.client.begin_transaction().await?;
1539 let pattern = staging_suffix_like_pattern(&deploy_id);
1540 let query = hydration_status_query(allowed_lag_secs);
1541 let subscribe_sql = format!(
1542 "DECLARE c CURSOR FOR SUBSCRIBE ({query})"
1543 );
1544
1545 let subscribe_sql = Sql::raw_unchecked(subscribe_sql);
1546 mz_postgres_util::execute(&txn, subscribe_sql, &[&pattern]).await?;
1547
1548 loop {
1549 let rows = mz_postgres_util::query(&txn, Sql::new("FETCH ALL c"), &[]).await?;
1550 if rows.is_empty() {
1551 continue;
1552 }
1553
1554 for row in rows {
1555 let mz_diff: i64 = row.get(1);
1556 if mz_diff == -1 {
1557 continue;
1558 }
1559
1560 let status_str: String = row.get(4);
1561 let failure_reason_str: Option<String> = row.get(5);
1562 let hydrated_count: i64 = row.get(6);
1563 let total_count: i64 = row.get(7);
1564 let max_lag_secs: i64 = row.get(8);
1565 let total_replicas: i64 = row.get(9);
1566 let problematic_replicas: i64 = row.get(10);
1567
1568 let failure_reason = failure_reason_str.as_deref().map(|s| match s {
1569 "no_replicas" => FailureReason::NoReplicas,
1570 "all_replicas_problematic" => FailureReason::AllReplicasProblematic {
1571 problematic: problematic_replicas,
1572 total: total_replicas,
1573 },
1574 _ => FailureReason::NoReplicas,
1575 });
1576
1577 let status = match status_str.as_str() {
1578 "ready" => ClusterDeploymentStatus::Ready,
1579 "hydrating" => ClusterDeploymentStatus::Hydrating {
1580 hydrated: hydrated_count,
1581 total: total_count,
1582 },
1583 "lagging" => ClusterDeploymentStatus::Lagging { max_lag_secs },
1584 "failing" => ClusterDeploymentStatus::Failing {
1585 reason: failure_reason.clone().unwrap_or(FailureReason::NoReplicas),
1586 },
1587 _ => ClusterDeploymentStatus::Ready,
1588 };
1589
1590 yield HydrationStatusUpdate {
1591 cluster_name: row.get(2),
1592 cluster_id: row.get(3),
1593 status,
1594 failure_reason,
1595 hydrated_count,
1596 total_count,
1597 max_lag_secs,
1598 total_replicas,
1599 problematic_replicas,
1600 };
1601 }
1602 }
1603 }
1604 }
1605}
1606
1607pub(super) async fn delete_replacement_mvs(
1609 client: &Client,
1610 deploy_id: &str,
1611) -> Result<(), ConnectionError> {
1612 client
1613 .execute(
1614 "DELETE FROM _mz_deploy.tables.replacement_mvs WHERE deploy_id = $1",
1615 &[&deploy_id],
1616 )
1617 .await?;
1618
1619 Ok(())
1620}
1621
#[cfg(test)]
mod tests {
    use super::hydration_status_query;

    /// The hydration status query must contain exactly three `LIKE $1`
    /// predicates, and each of them must declare `ESCAPE '\'`.
    /// Presumably this is so wildcard characters in the bound pattern are
    /// matched literally; confirm against `staging_suffix_like_pattern`.
    #[mz_ore::test]
    fn test_hydration_status_query_escapes_like_patterns() {
        let query = hydration_status_query(60);
        let like_predicates = query.matches("LIKE $1").count();
        let escaped_like_predicates = query.matches(r"LIKE $1 ESCAPE '\'").count();

        assert_eq!(
            like_predicates, 3,
            "expected exactly three `LIKE $1` predicates, query: {}",
            query
        );
        assert_eq!(
            escaped_like_predicates, 3,
            "every `LIKE $1` predicate must use `ESCAPE '\\'`, query: {}",
            query
        );
    }
}