Skip to main content

mz_deploy/client/
deployment_ops.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment tracking operations.
11//!
12//! This module contains methods for managing deployment records in the database,
13//! including creating tracking tables, inserting/querying deployment records,
14//! and managing deployment lifecycle (staging, promotion, abort).
15//!
16//! ## Hydration State Machine
17//!
18//! During staging, clusters transition through the following states:
19//!
20//! ```text
21//!   ┌──────────┐    all objects hydrated     ┌───────┐
22//!   │ Hydrating │ ────────────────────────▶  │ Ready │
23//!   └──────────┘    & lag ≤ threshold         └───────┘
24//!        │                                       ▲
25//!        │         lag > threshold          ┌─────────┐
26//!        └────────────────────────────────▶ │ Lagging │
27//!                                           └─────────┘
28//!        │
29//!        ▼ (no replicas OR all replicas OOM-looping)
30//!   ┌─────────┐
31//!   │ Failing  │
32//!   └─────────┘
33//! ```
34//!
35//! - **Hydrating** → objects are being backfilled; progress tracked as
36//!   `hydrated / total` via `mz_internal.mz_hydration_statuses`.
37//! - **Ready** → all objects hydrated and max wallclock lag ≤ threshold
38//!   (default 300s / 5 minutes).
39//! - **Lagging** → all objects hydrated but wallclock lag exceeds threshold.
40//! - **Failing** → no replicas configured, or all replicas are problematic
41//!   (3+ OOM kills within 24 hours per `mz_cluster_replica_status_history`).
42//!
43//! ## Apply State Tracking
44//!
45//! The apply (cutover) process uses a pair of schemas in `_mz_deploy` as a
46//! state marker: `apply_<id>_pre` and `apply_<id>_post`. During the atomic
47//! swap transaction these schemas exchange names, moving the `swapped=true`
48//! comment to the `_pre` schema. This enables crash recovery:
49//! - Schema absent → `NotStarted`
50//! - `_pre` comment = `swapped=false` → `PreSwap` (resume pre-swap work)
51//! - `_pre` comment = `swapped=true` → `PostSwap` (resume post-swap work)
52//!
53//! ## SUBSCRIBE Streaming
54//!
55//! `subscribe_deployment_hydration` opens a `SUBSCRIBE` cursor over the
56//! hydration status query. Retractions (`mz_diff == -1`) are filtered out
57//! before yielding updates — only insertions are surfaced to the caller.
58
59use crate::client::connection::{Client, DeploymentsClient, DeploymentsClientMut};
60use crate::client::errors::ConnectionError;
61use crate::client::models::{
62    ApplyState, ConflictRecord, DeploymentDetails, DeploymentHistoryEntry, DeploymentKind,
63    DeploymentMetadata, DeploymentMode, DeploymentObjectRecord, PendingStatement,
64    ProductionClusterRecord, SchemaDeploymentRecord, StagingDeployment,
65};
66use crate::client::quote_identifier;
67use crate::client::staging_suffix_like_pattern;
68use crate::project::SchemaQualifier;
69use crate::project::analysis::deployment_snapshot::DeploymentSnapshot;
70use crate::project::ir::object_id::ObjectId;
71use async_stream::try_stream;
72use chrono::{DateTime, Utc};
73use futures::Stream;
74use mz_postgres_util::Sql;
75use std::collections::{BTreeMap, BTreeSet};
76use std::fmt;
77use tokio_postgres::types::ToSql;
78
/// Reason why a cluster deployment is failing.
///
/// Serialized through serde with `snake_case` variant names. The
/// human-readable form comes from the type's `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum FailureReason {
    /// Cluster has no replicas configured.
    NoReplicas,
    /// All replicas are experiencing repeated OOM kills (3+ in 24h).
    ///
    /// `problematic` is the count of OOM-looping replicas and `total` the
    /// replica count they are measured against.
    AllReplicasProblematic { problematic: i64, total: i64 },
}
88
89impl fmt::Display for FailureReason {
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        match self {
92            FailureReason::NoReplicas => write!(f, "no replicas configured"),
93            FailureReason::AllReplicasProblematic { problematic, total } => {
94                write!(
95                    f,
96                    "all {} of {} replicas OOM-looping (3+ crashes in 24h)",
97                    problematic, total
98                )
99            }
100        }
101    }
102}
103
/// Status of a cluster in a staging deployment.
///
/// Serialized through serde with `snake_case` variant names.
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ClusterDeploymentStatus {
    /// Cluster is fully hydrated and lag is within threshold.
    Ready,
    /// Cluster is still hydrating; progress is `hydrated` out of `total`
    /// objects.
    Hydrating { hydrated: i64, total: i64 },
    /// Cluster is hydrated but lag exceeds threshold; `max_lag_secs` carries
    /// the observed lag in seconds.
    Lagging { max_lag_secs: i64 },
    /// Cluster is in a failing state; `reason` explains why.
    Failing { reason: FailureReason },
}
117
/// Full status context for a cluster in a staging deployment.
///
/// Bundles the summarized [`ClusterDeploymentStatus`] with the raw counters
/// (hydration progress, lag, replica health) so callers can render details
/// alongside the overall verdict.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ClusterStatusContext {
    /// Cluster name (with deployment suffix).
    pub cluster_name: String,
    /// Cluster ID.
    pub cluster_id: String,
    /// Overall status.
    pub status: ClusterDeploymentStatus,
    /// Number of hydrated objects.
    pub hydrated_count: i64,
    /// Total number of objects.
    pub total_count: i64,
    /// Maximum lag in seconds across all objects.
    pub max_lag_secs: i64,
    /// Total number of replicas.
    pub total_replicas: i64,
    /// Number of problematic (OOM-looping) replicas.
    pub problematic_replicas: i64,
}
138
/// A hydration status update from the SUBSCRIBE stream.
///
/// This represents a single update from the streaming subscription
/// to cluster hydration status. Retractions (mz_diff == -1) are
/// filtered out before yielding these updates.
///
/// Unlike [`ClusterStatusContext`], this type is not serializable and carries
/// the failure reason as a separate optional field.
#[derive(Debug, Clone)]
pub struct HydrationStatusUpdate {
    /// Cluster name (with deployment suffix).
    pub cluster_name: String,
    /// Cluster ID.
    pub cluster_id: String,
    /// Overall status.
    pub status: ClusterDeploymentStatus,
    /// Reason for failure, if status is Failing.
    pub failure_reason: Option<FailureReason>,
    /// Number of hydrated objects.
    pub hydrated_count: i64,
    /// Total number of objects.
    pub total_count: i64,
    /// Maximum lag in seconds across all objects.
    pub max_lag_secs: i64,
    /// Total number of replicas.
    pub total_replicas: i64,
    /// Number of problematic (OOM-looping) replicas.
    pub problematic_replicas: i64,
}
165
166/// Insert schema deployment records (insert-only, no DELETE).
167pub(super) async fn insert_schema_deployments(
168    client: &Client,
169    deployments: &[SchemaDeploymentRecord],
170) -> Result<(), ConnectionError> {
171    if deployments.is_empty() {
172        return Ok(());
173    }
174
175    let insert_sql = r#"
176        INSERT INTO _mz_deploy.tables.deployments
177            (deploy_id, database, schema, deployed_at, deployed_by, promoted_at, commit, kind, mode)
178        VALUES
179            ($1, $2, $3, $4, $5, $6, $7, $8, $9)
180    "#;
181
182    for deployment in deployments {
183        let kind_str = deployment.kind.to_string();
184        let mode_str = deployment.mode.to_string();
185        client
186            .execute(
187                insert_sql,
188                &[
189                    &deployment.deploy_id,
190                    &deployment.database,
191                    &deployment.schema,
192                    &deployment.deployed_at,
193                    &deployment.deployed_by,
194                    &deployment.promoted_at,
195                    &deployment.git_commit,
196                    &kind_str,
197                    &mode_str,
198                ],
199            )
200            .await?;
201    }
202
203    Ok(())
204}
205
206/// Append deployment object records (insert-only, never update or delete).
207pub(super) async fn append_deployment_objects(
208    client: &Client,
209    objects: &[DeploymentObjectRecord],
210) -> Result<(), ConnectionError> {
211    if objects.is_empty() {
212        return Ok(());
213    }
214
215    let insert_sql = r#"
216        INSERT INTO _mz_deploy.tables.objects
217            (deploy_id, database, schema, object, hash)
218        VALUES
219            ($1, $2, $3, $4, $5)
220    "#;
221
222    for obj in objects {
223        client
224            .execute(
225                insert_sql,
226                &[
227                    &obj.deploy_id,
228                    &obj.database,
229                    &obj.schema,
230                    &obj.object,
231                    &obj.object_hash,
232                ],
233            )
234            .await?;
235    }
236
237    Ok(())
238}
239
240/// Insert cluster records for a staging deployment.
241///
242/// Accepts cluster names and resolves them to cluster IDs internally.
243/// Fails if any cluster names cannot be resolved (cluster doesn't exist).
244pub(super) async fn insert_deployment_clusters(
245    client: &Client,
246    deploy_id: &str,
247    clusters: &[String],
248) -> Result<(), ConnectionError> {
249    if clusters.is_empty() {
250        return Ok(());
251    }
252
253    // Step 1: Query mz_catalog to get cluster IDs for the given names
254    let placeholders: Vec<String> = (1..=clusters.len()).map(|i| format!("${}", i)).collect();
255    let placeholders_str = placeholders.join(", ");
256
257    let select_sql = format!(
258        "SELECT name, id FROM mz_catalog.mz_clusters WHERE name IN ({})",
259        placeholders_str
260    );
261
262    #[allow(clippy::as_conversions)]
263    let params: Vec<&(dyn ToSql + Sync)> =
264        clusters.iter().map(|c| c as &(dyn ToSql + Sync)).collect();
265
266    let rows = client.query(&select_sql, &params).await?;
267
268    // Verify all clusters were found
269    if rows.len() != clusters.len() {
270        let found_names: BTreeSet<String> = rows.iter().map(|row| row.get("name")).collect();
271        let missing: Vec<&str> = clusters
272            .iter()
273            .filter(|name| !found_names.contains(*name))
274            .map(|s| s.as_str())
275            .collect();
276
277        return Err(ConnectionError::IntrospectionFailed {
278            object_type: "cluster".to_string(),
279            source: format!(
280                "Failed to resolve cluster names to IDs. The following clusters do not exist: {}",
281                missing.join(", ")
282            )
283            .into(),
284        });
285    }
286
287    // Step 2: Insert the cluster IDs into _mz_deploy.tables.clusters
288    let insert_sql = r#"
289        INSERT INTO _mz_deploy.tables.clusters (deploy_id, cluster_id)
290        VALUES ($1, $2)
291    "#;
292
293    for row in rows {
294        let cluster_id: String = row.get("id");
295        client
296            .execute(insert_sql, &[&deploy_id, &cluster_id])
297            .await?;
298    }
299
300    Ok(())
301}
302
303/// Get cluster names for a staging deployment.
304///
305/// Returns cluster names by resolving cluster IDs via JOIN with mz_catalog.mz_clusters.
306/// If a cluster ID exists in _mz_deploy.public.clusters but the cluster was deleted from the catalog,
307/// that cluster will be silently omitted from results.
308pub(super) async fn get_deployment_clusters(
309    client: &Client,
310    deploy_id: &str,
311) -> Result<Vec<String>, ConnectionError> {
312    let query = r#"
313        SELECT name
314        FROM _mz_deploy.public.deployment_clusters
315        WHERE deploy_id = $1
316        ORDER BY name
317    "#;
318
319    let rows = client.query(query, &[&deploy_id]).await?;
320
321    Ok(rows.iter().map(|row| row.get("name")).collect())
322}
323
324/// Validate that all cluster IDs in a deployment still exist in the catalog.
325///
326/// Returns an error if any cluster IDs in _mz_deploy.public.clusters cannot be resolved
327/// to clusters in mz_catalog.mz_clusters (i.e., clusters were deleted).
328pub(super) async fn validate_deployment_clusters(
329    client: &Client,
330    deploy_id: &str,
331) -> Result<(), ConnectionError> {
332    let query = r#"
333        SELECT cluster_id
334        FROM _mz_deploy.public.missing_clusters
335        WHERE deploy_id = $1
336    "#;
337
338    let rows = client.query(query, &[&deploy_id]).await?;
339
340    if !rows.is_empty() {
341        let missing_ids: Vec<String> = rows.iter().map(|row| row.get("cluster_id")).collect();
342        return Err(ConnectionError::IntrospectionFailed {
343            object_type: "cluster".to_string(),
344            source: format!(
345                "Deployment '{}' references {} cluster(s) that no longer exist: {}. \
346                 These clusters may have been deleted. Run 'mz-deploy abort {}' to clean up.",
347                deploy_id,
348                missing_ids.len(),
349                missing_ids.join(", "),
350                deploy_id
351            )
352            .into(),
353        });
354    }
355
356    Ok(())
357}
358
359/// Delete cluster records for a staging deployment.
360pub(super) async fn delete_deployment_clusters(
361    client: &Client,
362    deploy_id: &str,
363) -> Result<(), ConnectionError> {
364    client
365        .execute(
366            "DELETE FROM _mz_deploy.tables.clusters WHERE deploy_id = $1",
367            &[&deploy_id],
368        )
369        .await?;
370    Ok(())
371}
372
373/// Update promoted_at timestamp for a staging deployment.
374pub(super) async fn update_promoted_at(
375    client: &Client,
376    deploy_id: &str,
377) -> Result<(), ConnectionError> {
378    let update_sql = r#"
379        UPDATE _mz_deploy.tables.deployments
380        SET promoted_at = NOW()
381        WHERE deploy_id = $1
382    "#;
383
384    client.execute(update_sql, &[&deploy_id]).await?;
385    Ok(())
386}
387
388/// Delete all deployment records for a specific deployment.
389pub(super) async fn delete_deployment(
390    client: &Client,
391    deploy_id: &str,
392) -> Result<(), ConnectionError> {
393    client
394        .execute(
395            "DELETE FROM _mz_deploy.tables.deployments WHERE deploy_id = $1",
396            &[&deploy_id],
397        )
398        .await?;
399    client
400        .execute(
401            "DELETE FROM _mz_deploy.tables.objects WHERE deploy_id = $1",
402            &[&deploy_id],
403        )
404        .await?;
405    Ok(())
406}
407
408/// Get schema deployment records from the database for a specific deployment.
409pub(super) async fn get_schema_deployments(
410    client: &Client,
411    deploy_id: Option<&str>,
412) -> Result<Vec<SchemaDeploymentRecord>, ConnectionError> {
413    // The production view does not expose a mode column, so the no-filter
414    // branch hardcodes `'stage'` as a schema-compatibility placeholder. Every
415    // row from the production view is by definition a promoted deployment
416    // (non-null `promoted_at`); no caller of `SchemaDeploymentRecord` branches
417    // on `mode` when the record represents a promotion. The filtered branch
418    // reads `mode` from `_mz_deploy.public.deployments` normally.
419    let query = if deploy_id.is_none() {
420        r#"
421            SELECT deploy_id, database, schema,
422                   promoted_at as deployed_at,
423                   '' as deployed_by,
424                   promoted_at,
425                   commit,
426                   kind,
427                   'stage' as mode
428            FROM _mz_deploy.public.production
429            ORDER BY database, schema
430        "#
431    } else {
432        r#"
433            SELECT deploy_id, database, schema,
434                   deployed_at,
435                   deployed_by,
436                   promoted_at,
437                   commit,
438                   kind,
439                   mode
440            FROM _mz_deploy.public.deployments
441            WHERE deploy_id = $1
442            ORDER BY database, schema
443        "#
444    };
445
446    let rows = if deploy_id.is_none() {
447        client.query(query, &[]).await?
448    } else {
449        client.query(query, &[&deploy_id]).await?
450    };
451
452    let mut records = Vec::new();
453    for row in rows {
454        let deploy_id: String = row.get("deploy_id");
455        let database: String = row.get("database");
456        let schema: String = row.get("schema");
457        let deployed_at: DateTime<Utc> = row.get("deployed_at");
458        let deployed_by: String = row.get("deployed_by");
459        let promoted_at: Option<DateTime<Utc>> = row.get("promoted_at");
460        let git_commit: Option<String> = row.get("commit");
461        let kind_str: String = row.get("kind");
462        let mode_str: String = row.get("mode");
463
464        let kind = kind_str.parse().map_err(|e| {
465            ConnectionError::Message(format!("Failed to parse deployment kind: {}", e))
466        })?;
467        let mode = mode_str.parse::<DeploymentMode>().map_err(|e| {
468            ConnectionError::Message(format!("Failed to parse deployment mode: {}", e))
469        })?;
470
471        records.push(SchemaDeploymentRecord {
472            deploy_id,
473            database,
474            schema,
475            deployed_at,
476            deployed_by,
477            promoted_at,
478            git_commit,
479            kind,
480            mode,
481        });
482    }
483
484    Ok(records)
485}
486
487/// Get deployment object records from the database for a specific deployment.
488pub(super) async fn get_deployment_objects(
489    client: &Client,
490    deploy_id: Option<&str>,
491) -> Result<DeploymentSnapshot, ConnectionError> {
492    let query = if deploy_id.is_none() {
493        r#"
494            SELECT o.database, o.schema, o.object, o.hash, p.kind
495            FROM _mz_deploy.public.objects o
496            JOIN _mz_deploy.public.production p
497              ON o.database = p.database AND o.schema = p.schema
498            WHERE o.deploy_id = p.deploy_id
499        "#
500    } else {
501        r#"
502            SELECT o.database, o.schema, o.object, o.hash, d.kind
503            FROM _mz_deploy.public.objects o
504            JOIN _mz_deploy.public.deployments d
505              ON o.deploy_id = d.deploy_id
506                AND o.database = d.database
507                AND o.schema = d.schema
508            WHERE o.deploy_id = $1
509        "#
510    };
511
512    let rows = if deploy_id.is_none() {
513        client.query(query, &[]).await?
514    } else {
515        client.query(query, &[&deploy_id]).await?
516    };
517
518    let mut objects = BTreeMap::new();
519    let mut schemas = BTreeMap::new();
520    for row in rows {
521        let database: String = row.get("database");
522        let schema: String = row.get("schema");
523        let object: String = row.get("object");
524        let object_hash: String = row.get("hash");
525
526        let kind_str: String = row.get("kind");
527        let kind = kind_str.parse().map_err(|e| {
528            ConnectionError::Message(format!("Failed to parse deployment kind: {}", e))
529        })?;
530
531        let object_id = ObjectId::new(database.clone(), schema.clone(), object);
532        objects.insert(object_id, object_hash);
533        schemas
534            .entry(SchemaQualifier::new(database, schema))
535            .or_insert(kind);
536    }
537
538    Ok(DeploymentSnapshot { objects, schemas })
539}
540
541/// Get metadata about a deployment for validation.
542pub(super) async fn get_deployment_metadata(
543    client: &Client,
544    deploy_id: &str,
545) -> Result<Option<DeploymentMetadata>, ConnectionError> {
546    let query = r#"
547        SELECT deploy_id,
548               promoted_at,
549               mode,
550               database,
551               schema
552        FROM _mz_deploy.public.deployments
553        WHERE deploy_id = $1
554    "#;
555
556    let rows = client.query(query, &[&deploy_id]).await?;
557
558    if rows.is_empty() {
559        return Ok(None);
560    }
561
562    let first_row = &rows[0];
563    let deploy_id: String = first_row.get("deploy_id");
564    let promoted_at: Option<DateTime<Utc>> = first_row.get("promoted_at");
565    let mode_str: String = first_row.get("mode");
566    let mode = mode_str
567        .parse::<DeploymentMode>()
568        .map_err(|e| ConnectionError::Message(format!("Failed to parse deployment mode: {}", e)))?;
569
570    let mut schemas = Vec::new();
571    for row in rows {
572        let database: String = row.get("database");
573        let schema: String = row.get("schema");
574        schemas.push(SchemaQualifier::new(database, schema));
575    }
576
577    Ok(Some(DeploymentMetadata {
578        deploy_id,
579        promoted_at,
580        mode,
581        schemas,
582    }))
583}
584
585/// Get detailed information about a specific deployment.
586///
587/// Returns deployment details if the deployment exists, or None if not found.
588pub(super) async fn get_deployment_details(
589    client: &Client,
590    deploy_id: &str,
591) -> Result<Option<DeploymentDetails>, ConnectionError> {
592    let query = r#"
593        SELECT deploy_id,
594               deployed_at,
595               promoted_at,
596               deployed_by,
597               commit,
598               kind,
599               mode,
600               database,
601               schema
602        FROM _mz_deploy.public.deployments
603        WHERE deploy_id = $1
604        ORDER BY database, schema
605    "#;
606
607    let rows = client.query(query, &[&deploy_id]).await?;
608
609    if rows.is_empty() {
610        return Ok(None);
611    }
612
613    let first_row = &rows[0];
614    let deployed_at: DateTime<Utc> = first_row.get("deployed_at");
615    let promoted_at: Option<DateTime<Utc>> = first_row.get("promoted_at");
616    let deployed_by: String = first_row.get("deployed_by");
617    let git_commit: Option<String> = first_row.get("commit");
618    let kind_str: String = first_row.get("kind");
619    let kind: DeploymentKind = kind_str.parse().map_err(ConnectionError::Message)?;
620    let mode_str: String = first_row.get("mode");
621    let mode = mode_str
622        .parse::<DeploymentMode>()
623        .map_err(|e| ConnectionError::Message(format!("Failed to parse deployment mode: {}", e)))?;
624
625    let mut schemas = Vec::new();
626    for row in rows {
627        let database: String = row.get("database");
628        let schema: String = row.get("schema");
629        schemas.push(SchemaQualifier::new(database, schema));
630    }
631
632    Ok(Some(DeploymentDetails {
633        deployed_at,
634        promoted_at,
635        deployed_by,
636        git_commit,
637        kind,
638        mode,
639        schemas,
640    }))
641}
642
643/// List all staging deployments (promoted_at IS NULL), grouped by deploy_id.
644///
645/// Returns a map from deploy_id to staging deployment details.
646pub(super) async fn list_staging_deployments(
647    client: &Client,
648) -> Result<BTreeMap<String, StagingDeployment>, ConnectionError> {
649    let query = r#"
650        SELECT deploy_id,
651               deployed_at,
652               deployed_by,
653               commit,
654               kind,
655               mode,
656               database,
657               schema
658        FROM _mz_deploy.public.staging_deployments
659        ORDER BY deploy_id, database, schema
660    "#;
661
662    let rows = client.query(query, &[]).await?;
663
664    let mut deployments: BTreeMap<String, StagingDeployment> = BTreeMap::new();
665
666    for row in rows {
667        let deploy_id: String = row.get("deploy_id");
668        let deployed_at: DateTime<Utc> = row.get("deployed_at");
669        let deployed_by: String = row.get("deployed_by");
670        let git_commit: Option<String> = row.get("commit");
671        let kind_str: String = row.get("kind");
672        let mode_str: String = row.get("mode");
673        let database: String = row.get("database");
674        let schema: String = row.get("schema");
675
676        // Parse outside the closure so errors propagate instead of silently
677        // defaulting — a garbled `kind` or `mode` column is data corruption
678        // and should surface loud.
679        let kind: DeploymentKind = kind_str.parse().map_err(|e| {
680            ConnectionError::Message(format!("Failed to parse deployment kind: {}", e))
681        })?;
682        let mode: DeploymentMode = mode_str.parse().map_err(|e| {
683            ConnectionError::Message(format!("Failed to parse deployment mode: {}", e))
684        })?;
685
686        deployments
687            .entry(deploy_id)
688            .or_insert_with(|| StagingDeployment {
689                deployed_at,
690                deployed_by: deployed_by.clone(),
691                git_commit: git_commit.clone(),
692                kind,
693                mode,
694                schemas: Vec::new(),
695            })
696            .schemas
697            .push(SchemaQualifier::new(database, schema));
698    }
699
700    Ok(deployments)
701}
702
703/// List deployment history in chronological order (promoted deployments only).
704///
705/// Returns a vector of deployment history entries ordered by promotion time.
706pub(super) async fn list_deployment_history(
707    client: &Client,
708    limit: Option<usize>,
709) -> Result<Vec<DeploymentHistoryEntry>, ConnectionError> {
710    // We need to limit unique deployments, not individual schema rows
711    // First get distinct deployments, then join with schemas
712    let query = if let Some(limit) = limit {
713        format!(
714            r#"
715            WITH unique_deployments AS (
716                SELECT DISTINCT deploy_id, promoted_at, deployed_by, commit, kind
717                FROM _mz_deploy.public.deployments
718                WHERE promoted_at IS NOT NULL
719                ORDER BY promoted_at DESC
720                LIMIT {}
721            )
722            SELECT d.deploy_id,
723                   d.promoted_at,
724                   d.deployed_by,
725                   d.commit,
726                   d.kind,
727                   d.database,
728                   d.schema
729            FROM _mz_deploy.public.deployments d
730            JOIN unique_deployments u
731              ON d.deploy_id = u.deploy_id
732              AND d.promoted_at = u.promoted_at
733              AND d.deployed_by = u.deployed_by
734            ORDER BY d.promoted_at DESC, d.database, d.schema
735        "#,
736            limit
737        )
738    } else {
739        r#"
740            SELECT deploy_id,
741                   promoted_at,
742                   deployed_by,
743                   commit,
744                   kind,
745                   database,
746                   schema
747            FROM _mz_deploy.public.deployments
748            WHERE promoted_at IS NOT NULL
749            ORDER BY promoted_at DESC, database, schema
750        "#
751        .to_string()
752    };
753
754    let rows = client.query(&query, &[]).await?;
755
756    // Group by (deploy_id, promoted_at, deployed_by, commit, kind)
757    let mut deployments: Vec<DeploymentHistoryEntry> = Vec::new();
758    let mut current_deploy_id: Option<String> = None;
759
760    for row in rows {
761        let deploy_id: String = row.get("deploy_id");
762        let promoted_at: DateTime<Utc> = row.get("promoted_at");
763        let deployed_by: String = row.get("deployed_by");
764        let git_commit: Option<String> = row.get("commit");
765        let kind_str: String = row.get("kind");
766        let database: String = row.get("database");
767        let schema: String = row.get("schema");
768
769        // Check if this is a new deployment or same as current
770        if current_deploy_id.as_ref() != Some(&deploy_id) {
771            // Parse kind - default to Objects if parsing fails (shouldn't happen)
772            let kind = kind_str.parse().unwrap_or(DeploymentKind::Objects);
773            // Start a new deployment group
774            deployments.push(DeploymentHistoryEntry {
775                deploy_id: deploy_id.clone(),
776                promoted_at,
777                deployed_by,
778                git_commit,
779                kind,
780                schemas: vec![SchemaQualifier::new(database, schema)],
781            });
782            current_deploy_id = Some(deploy_id);
783        } else {
784            // Add schema to current deployment
785            if let Some(last) = deployments.last_mut() {
786                last.schemas.push(SchemaQualifier::new(database, schema));
787            }
788        }
789    }
790
791    Ok(deployments)
792}
793
794/// Check for deployment conflicts (schemas updated after deployment started).
795pub(super) async fn check_deployment_conflicts(
796    client: &Client,
797    deploy_id: &str,
798) -> Result<Vec<ConflictRecord>, ConnectionError> {
799    let query = r#"
800        SELECT p.database, p.schema, p.deploy_id, p.promoted_at
801        FROM _mz_deploy.public.production p
802        JOIN _mz_deploy.public.deployments d USING (database, schema)
803        WHERE d.deploy_id = $1 AND p.promoted_at > d.deployed_at
804    "#;
805
806    let rows = client.query(query, &[&deploy_id]).await?;
807
808    let conflicts = rows
809        .iter()
810        .map(|row| ConflictRecord {
811            database: row.get("database"),
812            schema: row.get("schema"),
813            deploy_id: row.get("deploy_id"),
814            promoted_at: row.get("promoted_at"),
815        })
816        .collect();
817
818    Ok(conflicts)
819}
820
821/// List clusters that host at least one promoted deployment.
822///
823/// For each such cluster, returns one representative protected deployment
824/// (the most recently promoted one) so callers can explain *why* the
825/// cluster is classified production. Used by `dev` to refuse overlay
826/// deployments that would land on production compute.
827pub(super) async fn list_production_clusters(
828    client: &Client,
829) -> Result<Vec<ProductionClusterRecord>, ConnectionError> {
830    // Walk the catalog rather than reading `_mz_deploy.tables.clusters`
831    // directly. Two reasons: (1) the promote swap renames the staging
832    // cluster into place, changing the `mz_clusters.id` of the production
833    // cluster — any id we stored at stage time is stale. (2) `mz_catalog`
834    // is readable by every role, while `_mz_deploy.tables.*` is only
835    // readable by `materialize_deployer`, so this query works for the
836    // `dev` guard run as `materialize_developer`.
837    let query = r#"
838        SELECT DISTINCT ON (c.name)
839            c.name         AS cluster_name,
840            p.database     AS database,
841            p.schema       AS schema,
842            p.promoted_at  AS promoted_at
843        FROM _mz_deploy.public.production p
844        JOIN mz_catalog.mz_databases d ON d.name = p.database
845        JOIN mz_catalog.mz_schemas s
846          ON s.name = p.schema AND s.database_id = d.id
847        JOIN mz_catalog.mz_objects o
848          ON o.schema_id = s.id AND o.cluster_id IS NOT NULL
849        JOIN mz_catalog.mz_clusters c ON c.id = o.cluster_id
850        ORDER BY c.name, p.promoted_at DESC
851    "#;
852
853    let rows = client.query(query, &[]).await?;
854    let records = rows
855        .iter()
856        .map(|row| ProductionClusterRecord {
857            cluster_name: row.get("cluster_name"),
858            database: row.get("database"),
859            schema: row.get("schema"),
860            promoted_at: row.get("promoted_at"),
861        })
862        .collect();
863    Ok(records)
864}
865
866/// Check if the deployment tracking table exists.
867pub(super) async fn deployment_table_exists(client: &Client) -> Result<bool, ConnectionError> {
868    let query = r#"
869        SELECT EXISTS(
870            SELECT 1
871            FROM mz_catalog.mz_tables t
872            JOIN mz_catalog.mz_schemas s ON t.schema_id = s.id
873            JOIN mz_catalog.mz_databases d ON s.database_id = d.id
874            WHERE t.name = 'deployments'
875                AND s.name = 'public'
876                AND d.name = '_mz_deploy'
877        )
878    "#;
879
880    let row = client.query_one(query, &[]).await?;
881
882    Ok(row.get(0))
883}
884
/// Default wallclock-lag threshold, in seconds, used when a caller does not
/// supply its own: five minutes (300 seconds).
pub const DEFAULT_ALLOWED_LAG_SECS: i64 = 5 * 60;
887
/// Render the SQL that classifies each matching cluster's hydration status.
///
/// The same text backs both the one-shot `get_deployment_hydration_status`
/// SELECT and the `subscribe_deployment_hydration` SUBSCRIBE, so the two can
/// never disagree on the classification rules.
///
/// The query takes a single bind parameter, `$1`: the `LIKE` pattern for the
/// cluster name. Every `LIKE $1` predicate carries `ESCAPE '\'`.
/// `allowed_lag_secs` is spliced into the text as an integer literal and is
/// the threshold above which a fully hydrated cluster is reported `lagging`.
///
/// CTE overview:
/// - `problematic_replicas`: replicas with 3 or more `oom-killed` status
///   events in the last 24 hours.
/// - `cluster_health`: total vs. problematic replica counts per cluster.
/// - `hydration_counts` / `hydration_best`: per-replica hydrated/total
///   counts, reduced to the per-cluster maximum across replicas.
/// - `cluster_lag`: maximum wallclock lag (seconds) per cluster.
///
/// Output columns, in order: `cluster_name`, `cluster_id`, `status`,
/// `failure_reason`, `hydrated_count`, `total_count`, `max_lag_secs`,
/// `total_replicas`, `problematic_replicas`.
fn hydration_status_query(allowed_lag_secs: i64) -> String {
    // `{allowed_lag_secs}` below is captured directly from the parameter.
    format!(
        r#"
        WITH
        problematic_replicas AS (
            SELECT replica_id
            FROM mz_internal.mz_cluster_replica_status_history
            WHERE occurred_at + INTERVAL '24 hours' > mz_now()
              AND reason = 'oom-killed'
            GROUP BY replica_id
            HAVING COUNT(*) >= 3
        ),
        cluster_health AS (
            SELECT
                c.name AS cluster_name,
                c.id AS cluster_id,
                COUNT(r.id) AS total_replicas,
                COUNT(pr.replica_id) AS problematic_replicas
            FROM mz_clusters c
            LEFT JOIN mz_cluster_replicas r ON c.id = r.cluster_id
            LEFT JOIN problematic_replicas pr ON r.id = pr.replica_id
            WHERE c.name LIKE $1 ESCAPE '\'
            GROUP BY c.name, c.id
        ),
        hydration_counts AS (
            SELECT
                c.name AS cluster_name,
                r.id AS replica_id,
                COUNT(*) FILTER (WHERE mhs.hydrated) AS hydrated,
                COUNT(*) AS total
            FROM mz_clusters c
            JOIN mz_cluster_replicas r ON c.id = r.cluster_id
            LEFT JOIN mz_internal.mz_hydration_statuses mhs ON mhs.replica_id = r.id
            WHERE c.name LIKE $1 ESCAPE '\'
            GROUP BY c.name, r.id
        ),
        hydration_best AS (
            SELECT cluster_name, MAX(hydrated) AS hydrated, MAX(total) AS total
            FROM hydration_counts
            GROUP BY cluster_name
        ),
        cluster_lag AS (
            SELECT
                c.name AS cluster_name,
                MAX(EXTRACT(EPOCH FROM wgl.lag)) AS max_lag_secs
            FROM mz_clusters c
            JOIN mz_cluster_replicas r ON c.id = r.cluster_id
            JOIN mz_internal.mz_hydration_statuses mhs ON mhs.replica_id = r.id
            JOIN mz_internal.mz_wallclock_global_lag wgl ON wgl.object_id = mhs.object_id
            WHERE c.name LIKE $1 ESCAPE '\'
            GROUP BY c.name
        )
        SELECT
            ch.cluster_name,
            ch.cluster_id,
            CASE
                WHEN ch.total_replicas = 0 THEN 'failing'
                WHEN ch.total_replicas = ch.problematic_replicas THEN 'failing'
                WHEN COALESCE(hb.hydrated, 0) < COALESCE(hb.total, 0) THEN 'hydrating'
                WHEN COALESCE(cl.max_lag_secs, 0) > {allowed_lag_secs} THEN 'lagging'
                ELSE 'ready'
            END AS status,
            CASE
                WHEN ch.total_replicas = 0 THEN 'no_replicas'
                WHEN ch.total_replicas = ch.problematic_replicas THEN 'all_replicas_problematic'
                ELSE NULL
            END AS failure_reason,
            COALESCE(hb.hydrated, 0) AS hydrated_count,
            COALESCE(hb.total, 0) AS total_count,
            COALESCE(cl.max_lag_secs, 0)::bigint AS max_lag_secs,
            ch.total_replicas,
            ch.problematic_replicas
        FROM cluster_health ch
        LEFT JOIN hydration_best hb ON ch.cluster_name = hb.cluster_name
        LEFT JOIN cluster_lag cl ON ch.cluster_name = cl.cluster_name
    "#
    )
}
972
973/// Get detailed hydration and health status for clusters in a staging deployment.
974///
975/// This function checks:
976/// - Hydration progress for each cluster
977/// - Wallclock lag to determine if data is fresh
978/// - Replica health (detecting OOM-looping replicas)
979///
980/// # Arguments
981/// * `client` - Database client
982/// * `deploy_id` - Staging deployment ID
983/// * `allowed_lag_secs` - Maximum allowed lag in seconds before marking as "lagging"
984///
985/// # Returns
986/// A vector of `ClusterStatusContext` with full status details for each cluster.
987pub(super) async fn get_deployment_hydration_status(
988    client: &Client,
989    deploy_id: &str,
990    allowed_lag_secs: i64,
991) -> Result<Vec<ClusterStatusContext>, ConnectionError> {
992    let pattern = staging_suffix_like_pattern(deploy_id);
993    let query = hydration_status_query(allowed_lag_secs);
994    let rows = client.query(&query, &[&pattern]).await?;
995
996    let mut results = Vec::new();
997    for row in rows {
998        let cluster_name: String = row.get("cluster_name");
999        let cluster_id: String = row.get("cluster_id");
1000        let status_str: String = row.get("status");
1001        let failure_reason: Option<String> = row.get("failure_reason");
1002        let hydrated_count: i64 = row.get("hydrated_count");
1003        let total_count: i64 = row.get("total_count");
1004        let max_lag_secs: i64 = row.get("max_lag_secs");
1005        let total_replicas: i64 = row.get("total_replicas");
1006        let problematic_replicas: i64 = row.get("problematic_replicas");
1007
1008        let status = match status_str.as_str() {
1009            "ready" => ClusterDeploymentStatus::Ready,
1010            "hydrating" => ClusterDeploymentStatus::Hydrating {
1011                hydrated: hydrated_count,
1012                total: total_count,
1013            },
1014            "lagging" => ClusterDeploymentStatus::Lagging { max_lag_secs },
1015            "failing" => {
1016                let reason = match failure_reason.as_deref() {
1017                    Some("no_replicas") => FailureReason::NoReplicas,
1018                    Some("all_replicas_problematic") => FailureReason::AllReplicasProblematic {
1019                        problematic: problematic_replicas,
1020                        total: total_replicas,
1021                    },
1022                    _ => FailureReason::NoReplicas, // fallback
1023                };
1024                ClusterDeploymentStatus::Failing { reason }
1025            }
1026            _ => ClusterDeploymentStatus::Ready, // fallback
1027        };
1028
1029        results.push(ClusterStatusContext {
1030            cluster_name,
1031            cluster_id,
1032            status,
1033            hydrated_count,
1034            total_count,
1035            max_lag_secs,
1036            total_replicas,
1037            problematic_replicas,
1038        });
1039    }
1040
1041    Ok(results)
1042}
1043
1044/// Create apply state schemas with comments for tracking apply progress.
1045///
1046/// Creates two schemas in `_mz_deploy`:
1047/// - `apply_<deploy_id>_pre` with comment 'swapped=false'
1048/// - `apply_<deploy_id>_post` with comment 'swapped=true'
1049///
1050/// The schemas are created first (if they don't exist), then comments are set
1051/// (if they don't have comments). During the swap transaction, the schemas
1052/// exchange names, which effectively moves the 'swapped=true' comment to the
1053/// `_pre` schema.
1054pub(super) async fn create_apply_state_schemas(
1055    client: &Client,
1056    deploy_id: &str,
1057) -> Result<(), ConnectionError> {
1058    let pre_schema = format!("apply_{}_pre", deploy_id);
1059    let post_schema = format!("apply_{}_post", deploy_id);
1060    let pre_schema_quoted = quote_identifier(&pre_schema);
1061    let post_schema_quoted = quote_identifier(&post_schema);
1062
1063    let create_pre = format!(
1064        "CREATE SCHEMA IF NOT EXISTS _mz_deploy.{}",
1065        pre_schema_quoted
1066    );
1067    client.execute(&create_pre, &[]).await?;
1068
1069    let create_post = format!(
1070        "CREATE SCHEMA IF NOT EXISTS _mz_deploy.{}",
1071        post_schema_quoted
1072    );
1073    client.execute(&create_post, &[]).await?;
1074
1075    let comment_check_query = r#"
1076        SELECT c.comment
1077        FROM mz_catalog.mz_schemas s
1078        JOIN mz_catalog.mz_databases d ON s.database_id = d.id
1079        LEFT JOIN mz_internal.mz_comments c ON s.id = c.id
1080        WHERE s.name = $1 AND d.name = '_mz_deploy'
1081    "#;
1082
1083    let rows = client.query(comment_check_query, &[&pre_schema]).await?;
1084    if !rows.is_empty() {
1085        let comment: Option<String> = rows[0].get("comment");
1086        if comment.is_none() {
1087            let comment_pre = format!(
1088                "COMMENT ON SCHEMA _mz_deploy.{} IS 'swapped=false'",
1089                pre_schema_quoted
1090            );
1091            client.execute(&comment_pre, &[]).await?;
1092        }
1093    }
1094
1095    let rows = client.query(comment_check_query, &[&post_schema]).await?;
1096    if !rows.is_empty() {
1097        let comment: Option<String> = rows[0].get("comment");
1098        if comment.is_none() {
1099            let comment_post = format!(
1100                "COMMENT ON SCHEMA _mz_deploy.{} IS 'swapped=true'",
1101                post_schema_quoted
1102            );
1103            client.execute(&comment_post, &[]).await?;
1104        }
1105    }
1106
1107    Ok(())
1108}
1109
1110/// Get the current apply state for a deployment.
1111///
1112/// Checks for the existence of `_mz_deploy.apply_<deploy_id>_pre` schema
1113/// and its comment to determine the state:
1114/// - Schema doesn't exist → NotStarted
1115/// - Schema exists with comment 'swapped=false' → PreSwap
1116/// - Schema exists with comment 'swapped=true' → PostSwap
1117pub(super) async fn get_apply_state(
1118    client: &Client,
1119    deploy_id: &str,
1120) -> Result<ApplyState, ConnectionError> {
1121    let pre_schema = format!("apply_{}_pre", deploy_id);
1122
1123    // Query schema existence and comment using mz_internal.mz_comments
1124    let query = r#"
1125        SELECT c.comment
1126        FROM mz_catalog.mz_schemas s
1127        JOIN mz_catalog.mz_databases d ON s.database_id = d.id
1128        LEFT JOIN mz_internal.mz_comments c ON s.id = c.id
1129        WHERE s.name = $1 AND d.name = '_mz_deploy'
1130    "#;
1131
1132    let rows = client.query(query, &[&pre_schema]).await?;
1133
1134    if rows.is_empty() {
1135        return Ok(ApplyState::NotStarted);
1136    }
1137
1138    let comment: Option<String> = rows[0].get("comment");
1139    match comment.as_deref() {
1140        Some("swapped=false") => Ok(ApplyState::PreSwap),
1141        Some("swapped=true") => Ok(ApplyState::PostSwap),
1142        _ => {
1143            // Unexpected comment or no comment - treat as not started
1144            Ok(ApplyState::NotStarted)
1145        }
1146    }
1147}
1148
1149/// Delete apply state schemas after successful completion.
1150pub(super) async fn delete_apply_state_schemas(
1151    client: &Client,
1152    deploy_id: &str,
1153) -> Result<(), ConnectionError> {
1154    let pre_schema = format!("apply_{}_pre", deploy_id);
1155    let post_schema = format!("apply_{}_post", deploy_id);
1156
1157    let drop_pre = format!(
1158        "DROP SCHEMA IF EXISTS _mz_deploy.{}",
1159        quote_identifier(&pre_schema)
1160    );
1161    client.execute(&drop_pre, &[]).await?;
1162
1163    let drop_post = format!(
1164        "DROP SCHEMA IF EXISTS _mz_deploy.{}",
1165        quote_identifier(&post_schema)
1166    );
1167    client.execute(&drop_post, &[]).await?;
1168
1169    Ok(())
1170}
1171
1172/// Insert pending statements for deferred execution (e.g., sinks).
1173pub(super) async fn insert_pending_statements(
1174    client: &Client,
1175    statements: &[PendingStatement],
1176) -> Result<(), ConnectionError> {
1177    if statements.is_empty() {
1178        return Ok(());
1179    }
1180
1181    let insert_sql = r#"
1182        INSERT INTO _mz_deploy.tables.pending_statements
1183            (deploy_id, sequence_num, database, schema, object, object_hash, statement_sql, statement_kind, executed_at)
1184        VALUES
1185            ($1, $2, $3, $4, $5, $6, $7, $8, $9)
1186    "#;
1187
1188    for stmt in statements {
1189        client
1190            .execute(
1191                insert_sql,
1192                &[
1193                    &stmt.deploy_id,
1194                    &stmt.sequence_num,
1195                    &stmt.database,
1196                    &stmt.schema,
1197                    &stmt.object,
1198                    &stmt.object_hash,
1199                    &stmt.statement_sql,
1200                    &stmt.statement_kind,
1201                    &stmt.executed_at,
1202                ],
1203            )
1204            .await?;
1205    }
1206
1207    Ok(())
1208}
1209
1210/// Get pending statements for a deployment that haven't been executed yet.
1211pub(super) async fn get_pending_statements(
1212    client: &Client,
1213    deploy_id: &str,
1214) -> Result<Vec<PendingStatement>, ConnectionError> {
1215    let query = r#"
1216        SELECT deploy_id, sequence_num, database, schema, object, object_hash,
1217               statement_sql, statement_kind, executed_at
1218        FROM _mz_deploy.public.pending_statements
1219        WHERE deploy_id = $1 AND executed_at IS NULL
1220        ORDER BY sequence_num
1221    "#;
1222
1223    let rows = client.query(query, &[&deploy_id]).await?;
1224
1225    let mut statements = Vec::new();
1226    for row in rows {
1227        statements.push(PendingStatement {
1228            deploy_id: row.get("deploy_id"),
1229            sequence_num: row.get("sequence_num"),
1230            database: row.get("database"),
1231            schema: row.get("schema"),
1232            object: row.get("object"),
1233            object_hash: row.get("object_hash"),
1234            statement_sql: row.get("statement_sql"),
1235            statement_kind: row.get("statement_kind"),
1236            executed_at: row.get("executed_at"),
1237        });
1238    }
1239
1240    Ok(statements)
1241}
1242
1243/// Mark a pending statement as executed.
1244pub(super) async fn mark_statement_executed(
1245    client: &Client,
1246    deploy_id: &str,
1247    sequence_num: i32,
1248) -> Result<(), ConnectionError> {
1249    let update_sql = r#"
1250        UPDATE _mz_deploy.tables.pending_statements
1251        SET executed_at = NOW()
1252        WHERE deploy_id = $1 AND sequence_num = $2
1253    "#;
1254
1255    client
1256        .execute(update_sql, &[&deploy_id, &sequence_num])
1257        .await?;
1258
1259    Ok(())
1260}
1261
1262/// Delete all pending statements for a deployment.
1263pub(super) async fn delete_pending_statements(
1264    client: &Client,
1265    deploy_id: &str,
1266) -> Result<(), ConnectionError> {
1267    client
1268        .execute(
1269            "DELETE FROM _mz_deploy.tables.pending_statements WHERE deploy_id = $1",
1270            &[&deploy_id],
1271        )
1272        .await?;
1273
1274    Ok(())
1275}
1276
1277/// Insert replacement MV records for a deployment.
1278pub(super) async fn insert_replacement_mvs(
1279    client: &Client,
1280    records: &[super::models::ReplacementMvRecord],
1281) -> Result<(), ConnectionError> {
1282    for record in records {
1283        client
1284            .execute(
1285                r#"INSERT INTO _mz_deploy.tables.replacement_mvs
1286                   (deploy_id, target_database, target_schema, target_name,
1287                    replacement_schema)
1288                   VALUES ($1, $2, $3, $4, $5)"#,
1289                &[
1290                    &record.deploy_id,
1291                    &record.target_database,
1292                    &record.target_schema,
1293                    &record.target_name,
1294                    &record.replacement_schema,
1295                ],
1296            )
1297            .await?;
1298    }
1299
1300    Ok(())
1301}
1302
1303/// Get replacement MV records for a deployment.
1304pub(super) async fn get_replacement_mvs(
1305    client: &Client,
1306    deploy_id: &str,
1307) -> Result<Vec<super::models::ReplacementMvRecord>, ConnectionError> {
1308    let rows = client
1309        .query(
1310            r#"SELECT deploy_id, target_database, target_schema, target_name,
1311                      replacement_schema
1312               FROM _mz_deploy.public.replacement_mvs
1313               WHERE deploy_id = $1
1314               ORDER BY target_database, target_schema, target_name"#,
1315            &[&deploy_id],
1316        )
1317        .await?;
1318
1319    Ok(rows
1320        .iter()
1321        .map(|row| super::models::ReplacementMvRecord {
1322            deploy_id: row.get("deploy_id"),
1323            target_database: row.get("target_database"),
1324            target_schema: row.get("target_schema"),
1325            target_name: row.get("target_name"),
1326            replacement_schema: row.get("replacement_schema"),
1327        })
1328        .collect())
1329}
1330
/// Read/write deployment-tracking operations exposed on the client wrapper.
///
/// Apart from `validate_staging`, every method here is a thin forwarder: it
/// passes `self.client` plus its own arguments to the module-level function
/// of the same name and returns that function's result unchanged.
impl DeploymentsClient<'_> {
    /// Forwards to the module-level `insert_schema_deployments` function
    /// with the given schema deployment records.
    pub async fn insert_schema_deployments(
        &self,
        deployments: &[SchemaDeploymentRecord],
    ) -> Result<(), ConnectionError> {
        insert_schema_deployments(self.client, deployments).await
    }

    /// Forwards to the module-level `append_deployment_objects` function
    /// with the given deployment object records.
    pub async fn append_deployment_objects(
        &self,
        objects: &[DeploymentObjectRecord],
    ) -> Result<(), ConnectionError> {
        append_deployment_objects(self.client, objects).await
    }

    /// Forwards to the module-level `insert_deployment_clusters` function
    /// with the deployment ID and cluster names.
    pub async fn insert_deployment_clusters(
        &self,
        deploy_id: &str,
        clusters: &[String],
    ) -> Result<(), ConnectionError> {
        insert_deployment_clusters(self.client, deploy_id, clusters).await
    }

    /// Forwards to the module-level `get_deployment_clusters` function for
    /// the given deployment ID.
    pub async fn get_deployment_clusters(
        &self,
        deploy_id: &str,
    ) -> Result<Vec<String>, ConnectionError> {
        get_deployment_clusters(self.client, deploy_id).await
    }

    /// Forwards to the module-level `validate_deployment_clusters` function
    /// for the given deployment ID.
    pub async fn validate_deployment_clusters(
        &self,
        deploy_id: &str,
    ) -> Result<(), ConnectionError> {
        validate_deployment_clusters(self.client, deploy_id).await
    }

    /// List clusters that host at least one promoted deployment, with one
    /// representative protected deployment per cluster.
    pub async fn list_production_clusters(
        &self,
    ) -> Result<Vec<ProductionClusterRecord>, ConnectionError> {
        list_production_clusters(self.client).await
    }

    /// Get hydration and health status for the clusters of a staging
    /// deployment, using `DEFAULT_ALLOWED_LAG_SECS` as the lag threshold.
    pub async fn get_deployment_hydration_status(
        &self,
        deploy_id: &str,
    ) -> Result<Vec<ClusterStatusContext>, ConnectionError> {
        get_deployment_hydration_status(self.client, deploy_id, DEFAULT_ALLOWED_LAG_SECS).await
    }

    /// Same as `get_deployment_hydration_status`, but with a caller-supplied
    /// lag threshold (`allowed_lag_secs`, in seconds).
    pub async fn get_deployment_hydration_status_with_lag(
        &self,
        deploy_id: &str,
        allowed_lag_secs: i64,
    ) -> Result<Vec<ClusterStatusContext>, ConnectionError> {
        get_deployment_hydration_status(self.client, deploy_id, allowed_lag_secs).await
    }

    /// Forwards to the module-level `delete_deployment_clusters` function
    /// for the given deployment ID.
    pub async fn delete_deployment_clusters(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        delete_deployment_clusters(self.client, deploy_id).await
    }

    /// Forwards to the module-level `update_promoted_at` function for the
    /// given deployment ID.
    pub async fn update_promoted_at(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        update_promoted_at(self.client, deploy_id).await
    }

    /// Forwards to the module-level `delete_deployment` function for the
    /// given deployment ID.
    pub async fn delete_deployment(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        delete_deployment(self.client, deploy_id).await
    }

    /// Forwards to the module-level `get_schema_deployments` function; the
    /// optional `deploy_id` is passed through as-is.
    pub async fn get_schema_deployments(
        &self,
        deploy_id: Option<&str>,
    ) -> Result<Vec<SchemaDeploymentRecord>, ConnectionError> {
        get_schema_deployments(self.client, deploy_id).await
    }

    /// Forwards to the module-level `get_deployment_objects` function; the
    /// optional `deploy_id` is passed through as-is.
    pub async fn get_deployment_objects(
        &self,
        deploy_id: Option<&str>,
    ) -> Result<DeploymentSnapshot, ConnectionError> {
        get_deployment_objects(self.client, deploy_id).await
    }

    /// Forwards to the module-level `get_deployment_metadata` function for
    /// the given deployment ID.
    pub async fn get_deployment_metadata(
        &self,
        deploy_id: &str,
    ) -> Result<Option<DeploymentMetadata>, ConnectionError> {
        get_deployment_metadata(self.client, deploy_id).await
    }

    /// Validate that a staging deployment exists and has not been promoted.
    ///
    /// # Errors
    /// - `ConnectionError::DeploymentAlreadyPromoted` when metadata exists
    ///   and its `promoted_at` is set.
    /// - `ConnectionError::DeploymentNotFound` when no metadata exists for
    ///   `deploy_id`.
    /// - Any error from the underlying metadata lookup.
    pub async fn validate_staging(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        let metadata = self.get_deployment_metadata(deploy_id).await?;
        match metadata {
            Some(meta) if meta.promoted_at.is_some() => {
                Err(ConnectionError::DeploymentAlreadyPromoted {
                    deploy_id: deploy_id.to_string(),
                })
            }
            Some(_) => Ok(()),
            None => Err(ConnectionError::DeploymentNotFound {
                deploy_id: deploy_id.to_string(),
            }),
        }
    }

    /// Forwards to the module-level `get_deployment_details` function for
    /// the given deployment ID.
    pub async fn get_deployment_details(
        &self,
        deploy_id: &str,
    ) -> Result<Option<DeploymentDetails>, ConnectionError> {
        get_deployment_details(self.client, deploy_id).await
    }

    /// Forwards to the module-level `list_staging_deployments` function.
    pub async fn list_staging_deployments(
        &self,
    ) -> Result<BTreeMap<String, StagingDeployment>, ConnectionError> {
        list_staging_deployments(self.client).await
    }

    /// Forwards to the module-level `list_deployment_history` function; the
    /// optional `limit` is passed through as-is.
    pub async fn list_deployment_history(
        &self,
        limit: Option<usize>,
    ) -> Result<Vec<DeploymentHistoryEntry>, ConnectionError> {
        list_deployment_history(self.client, limit).await
    }

    /// Forwards to the module-level `check_deployment_conflicts` function
    /// for the given deployment ID.
    pub async fn check_deployment_conflicts(
        &self,
        deploy_id: &str,
    ) -> Result<Vec<ConflictRecord>, ConnectionError> {
        check_deployment_conflicts(self.client, deploy_id).await
    }

    /// Check if the deployment tracking table exists.
    pub async fn deployment_table_exists(&self) -> Result<bool, ConnectionError> {
        deployment_table_exists(self.client).await
    }

    /// Create the `apply_<deploy_id>_pre` / `apply_<deploy_id>_post` marker
    /// schemas in `_mz_deploy` used to track apply progress.
    pub async fn create_apply_state_schemas(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        create_apply_state_schemas(self.client, deploy_id).await
    }

    /// Get the current apply state for a deployment, derived from the
    /// `apply_<deploy_id>_pre` schema and its comment.
    pub async fn get_apply_state(&self, deploy_id: &str) -> Result<ApplyState, ConnectionError> {
        get_apply_state(self.client, deploy_id).await
    }

    /// Drop both apply-state marker schemas for a deployment.
    pub async fn delete_apply_state_schemas(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        delete_apply_state_schemas(self.client, deploy_id).await
    }

    /// Insert pending statements for deferred execution (e.g., sinks).
    pub async fn insert_pending_statements(
        &self,
        statements: &[PendingStatement],
    ) -> Result<(), ConnectionError> {
        insert_pending_statements(self.client, statements).await
    }

    /// Get pending statements for a deployment that haven't been executed
    /// yet, ordered by sequence number.
    pub async fn get_pending_statements(
        &self,
        deploy_id: &str,
    ) -> Result<Vec<PendingStatement>, ConnectionError> {
        get_pending_statements(self.client, deploy_id).await
    }

    /// Mark the pending statement identified by (`deploy_id`,
    /// `sequence_num`) as executed.
    pub async fn mark_statement_executed(
        &self,
        deploy_id: &str,
        sequence_num: i32,
    ) -> Result<(), ConnectionError> {
        mark_statement_executed(self.client, deploy_id, sequence_num).await
    }

    /// Delete all pending statements for a deployment.
    pub async fn delete_pending_statements(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        delete_pending_statements(self.client, deploy_id).await
    }

    /// Insert replacement MV records for a deployment.
    pub async fn insert_replacement_mvs(
        &self,
        records: &[super::models::ReplacementMvRecord],
    ) -> Result<(), ConnectionError> {
        insert_replacement_mvs(self.client, records).await
    }

    /// Get replacement MV records for a deployment.
    pub async fn get_replacement_mvs(
        &self,
        deploy_id: &str,
    ) -> Result<Vec<super::models::ReplacementMvRecord>, ConnectionError> {
        get_replacement_mvs(self.client, deploy_id).await
    }

    /// Delete all replacement MV records for a deployment.
    pub async fn delete_replacement_mvs(&self, deploy_id: &str) -> Result<(), ConnectionError> {
        delete_replacement_mvs(self.client, deploy_id).await
    }
}
1527
1528impl DeploymentsClientMut<'_> {
1529    /// Subscribe to hydration status changes for a staging deployment.
1530    pub fn subscribe_deployment_hydration(
1531        &mut self,
1532        deploy_id: &str,
1533        allowed_lag_secs: i64,
1534    ) -> impl Stream<Item = Result<HydrationStatusUpdate, ConnectionError>> + '_ {
1535        let deploy_id = deploy_id.to_string();
1536
1537        try_stream! {
1538                let txn = self.client.begin_transaction().await?;
1539                let pattern = staging_suffix_like_pattern(&deploy_id);
1540                let query = hydration_status_query(allowed_lag_secs);
1541                let subscribe_sql = format!(
1542                    "DECLARE c CURSOR FOR SUBSCRIBE ({query})"
1543                );
1544
1545                let subscribe_sql = Sql::raw_unchecked(subscribe_sql);
1546                mz_postgres_util::execute(&txn, subscribe_sql, &[&pattern]).await?;
1547
1548                loop {
1549                    let rows = mz_postgres_util::query(&txn, Sql::new("FETCH ALL c"), &[]).await?;
1550                    if rows.is_empty() {
1551                        continue;
1552                    }
1553
1554                    for row in rows {
1555                        let mz_diff: i64 = row.get(1);
1556                        if mz_diff == -1 {
1557                            continue;
1558                        }
1559
1560                        let status_str: String = row.get(4);
1561                        let failure_reason_str: Option<String> = row.get(5);
1562                        let hydrated_count: i64 = row.get(6);
1563                        let total_count: i64 = row.get(7);
1564                        let max_lag_secs: i64 = row.get(8);
1565                        let total_replicas: i64 = row.get(9);
1566                        let problematic_replicas: i64 = row.get(10);
1567
1568                        let failure_reason = failure_reason_str.as_deref().map(|s| match s {
1569                            "no_replicas" => FailureReason::NoReplicas,
1570                            "all_replicas_problematic" => FailureReason::AllReplicasProblematic {
1571                                problematic: problematic_replicas,
1572                                total: total_replicas,
1573                            },
1574                            _ => FailureReason::NoReplicas,
1575                        });
1576
1577                        let status = match status_str.as_str() {
1578                            "ready" => ClusterDeploymentStatus::Ready,
1579                            "hydrating" => ClusterDeploymentStatus::Hydrating {
1580                                hydrated: hydrated_count,
1581                                total: total_count,
1582                            },
1583                            "lagging" => ClusterDeploymentStatus::Lagging { max_lag_secs },
1584                            "failing" => ClusterDeploymentStatus::Failing {
1585                                reason: failure_reason.clone().unwrap_or(FailureReason::NoReplicas),
1586                            },
1587                            _ => ClusterDeploymentStatus::Ready,
1588                        };
1589
1590                        yield HydrationStatusUpdate {
1591                            cluster_name: row.get(2),
1592                            cluster_id: row.get(3),
1593                            status,
1594                            failure_reason,
1595                            hydrated_count,
1596                            total_count,
1597                            max_lag_secs,
1598                            total_replicas,
1599                            problematic_replicas,
1600                        };
1601                }
1602            }
1603        }
1604    }
1605}
1606
1607/// Delete all replacement MV records for a deployment.
1608pub(super) async fn delete_replacement_mvs(
1609    client: &Client,
1610    deploy_id: &str,
1611) -> Result<(), ConnectionError> {
1612    client
1613        .execute(
1614            "DELETE FROM _mz_deploy.tables.replacement_mvs WHERE deploy_id = $1",
1615            &[&deploy_id],
1616        )
1617        .await?;
1618
1619    Ok(())
1620}
1621
#[cfg(test)]
mod tests {
    use super::hydration_status_query;

    /// Regression test for QA round-2 Finding 1.
    ///
    /// The hydration-status query selects clusters with `c.name LIKE $1`,
    /// where `$1` is the `_<deploy_id>` staging suffix pattern. Each of those
    /// predicates has to carry `ESCAPE '\'` so that the `_` separator (and
    /// any `_`/`%` inside the deploy id) matches literally instead of acting
    /// as a wildcard. Without it, `wait`/`list` would sweep in unrelated
    /// clusters ending in `<any char><deploy_id>` (e.g. `dataprod` for deploy
    /// id `prod`).
    #[mz_ore::test]
    fn test_hydration_status_query_escapes_like_patterns() {
        let query = hydration_status_query(60);

        // One cluster-name predicate each in `cluster_health`,
        // `hydration_counts`, and `cluster_lag`.
        let like_predicates = query.matches("LIKE $1").count();
        // ...and every one of them must be escaped.
        let escaped_predicates = query.matches(r"LIKE $1 ESCAPE '\'").count();

        assert_eq!(
            like_predicates, 3,
            "expected exactly three `LIKE $1` predicates, query: {}",
            query
        );
        assert_eq!(
            escaped_predicates, 3,
            "every `LIKE $1` predicate must use `ESCAPE '\\'`, query: {}",
            query
        );
    }
}