Skip to main content

mz_deploy/cli/commands/
list.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployments command - list active staging deployments.
11
12use crate::cli::CliError;
13use crate::client::{Client, ClusterDeploymentStatus, ClusterStatusContext, DeploymentKind};
14use crate::config::Settings;
15use crate::log;
16use crate::project::SchemaQualifier;
17use chrono::{DateTime, Utc};
18use owo_colors::{OwoColorize, Stream, Style};
19use std::fmt;
20
21#[derive(serde::Serialize)]
22#[serde(transparent)]
23struct ListOutput {
24    deployments: Vec<ListDeployment>,
25}
26
27#[derive(serde::Serialize)]
28struct ListDeployment {
29    deploy_id: String,
30    deployed_at: DateTime<Utc>,
31    deployed_by: String,
32    git_commit: Option<String>,
33    kind: DeploymentKind,
34    schemas: Vec<SchemaQualifier>,
35    clusters: Vec<ClusterStatusContext>,
36}
37
38impl fmt::Display for ListOutput {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        if self.deployments.is_empty() {
41            writeln!(f, "No active staging deployments.")?;
42            writeln!(f)?;
43            writeln!(f, "To create a staging deployment, run:")?;
44            writeln!(
45                f,
46                "  {} {} {}",
47                "mz-deploy".if_supports_color(Stream::Stderr, |t| t.cyan()),
48                "stage".if_supports_color(Stream::Stderr, |t| t.cyan()),
49                ".".if_supports_color(Stream::Stderr, |t| t.cyan())
50            )?;
51            return Ok(());
52        }
53
54        writeln!(f, "Active staging deployments:")?;
55        writeln!(f)?;
56
57        for deployment in &self.deployments {
58            // Format timestamp
59            let now = Utc::now();
60            let duration = now.signed_duration_since(deployment.deployed_at);
61            let timestamp = if duration.num_seconds() < 0 {
62                "recently".to_string()
63            } else {
64                let hours = duration.num_hours();
65                if hours < 1 {
66                    let minutes = duration.num_minutes();
67                    format!("{} minutes ago", minutes)
68                } else if hours < 24 {
69                    format!("{} hours ago", hours)
70                } else {
71                    let days = hours / 24;
72                    format!("{} days ago", days)
73                }
74            };
75
76            let deploy_id_style = Style::new().cyan().bold();
77            writeln!(
78                f,
79                "  {} {} by {} {} [{}]",
80                "●".if_supports_color(Stream::Stderr, |t| t.green()),
81                deployment
82                    .deploy_id
83                    .if_supports_color(Stream::Stderr, |t| deploy_id_style.style(t)),
84                deployment
85                    .deployed_by
86                    .if_supports_color(Stream::Stderr, |t| t.yellow()),
87                format!("({})", timestamp).if_supports_color(Stream::Stderr, |t| t.dimmed()),
88                deployment
89                    .kind
90                    .to_string()
91                    .if_supports_color(Stream::Stderr, |t| t.dimmed()),
92            )?;
93
94            // Display commit if available
95            if let Some(commit_sha) = &deployment.git_commit {
96                writeln!(
97                    f,
98                    "    commit: {}",
99                    commit_sha.if_supports_color(Stream::Stderr, |t| t.dimmed())
100                )?;
101            }
102
103            // Display cluster status
104            if !deployment.clusters.is_empty() {
105                let mut ready_count = 0i64;
106                #[allow(clippy::as_conversions)]
107                let total_clusters = deployment.clusters.len() as i64;
108
109                for ctx in &deployment.clusters {
110                    if matches!(ctx.status, ClusterDeploymentStatus::Ready) {
111                        ready_count += 1;
112                    }
113                }
114
115                let text = if ready_count == total_clusters {
116                    "clusters: all ready".to_string()
117                } else {
118                    format!("clusters: {} of {} ready", ready_count, total_clusters)
119                };
120                writeln!(
121                    f,
122                    "    {}\n",
123                    text.if_supports_color(Stream::Stderr, |t| t.blue())
124                )?;
125            }
126
127            for sq in &deployment.schemas {
128                writeln!(
129                    f,
130                    "    {}.{}",
131                    sq.database
132                        .if_supports_color(Stream::Stderr, |t| t.dimmed()),
133                    sq.schema
134                )?;
135            }
136            writeln!(f)?;
137        }
138
139        Ok(())
140    }
141}
142
143/// List all active staging deployments.
144///
145/// This command:
146/// - Queries all deployments where promoted_at IS NULL (staging only)
147/// - Groups results by environment name
148/// - Displays schemas in each staging environment with deployment metadata
149///
150/// Similar to `git branch` - shows active development branches.
151///
152/// # Arguments
153/// * `settings` - Resolved CLI settings (profile, project directory, etc.)
154/// * `allowed_lag_secs` - Maximum allowed lag in seconds before marking as "lagging"
155///
156/// # Returns
157/// Ok(()) if listing succeeds
158///
159/// # Errors
160/// Returns `CliError::Connection` for database errors
161pub async fn run(settings: &Settings, allowed_lag_secs: i64) -> Result<(), CliError> {
162    let profile = settings.connection();
163    let client = Client::connect_with_profile(profile.clone())
164        .await
165        .map_err(CliError::Connection)?;
166
167    super::setup::verify(&client, settings.emulator()).await?;
168    super::setup::validate_connection(&client, settings.emulator()).await?;
169    let deployments = client.deployments().list_staging_deployments().await?;
170
171    let mut env_names: Vec<_> = deployments.keys().collect();
172    env_names.sort();
173
174    let mut list_deployments = Vec::new();
175    for env_name in env_names {
176        let deployment = &deployments[env_name];
177        let clusters = client
178            .deployments()
179            .get_deployment_hydration_status_with_lag(env_name, allowed_lag_secs)
180            .await
181            .unwrap_or_default();
182        list_deployments.push(ListDeployment {
183            deploy_id: env_name.clone(),
184            deployed_at: deployment.deployed_at,
185            deployed_by: deployment.deployed_by.clone(),
186            git_commit: deployment.git_commit.clone(),
187            kind: deployment.kind.clone(),
188            schemas: deployment.schemas.clone(),
189            clusters,
190        });
191    }
192
193    let output = ListOutput {
194        deployments: list_deployments,
195    };
196    log::output(&output);
197
198    Ok(())
199}