Skip to main content

mz_deploy/cli/commands/
wait.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Wait command - wait for staging deployment cluster hydration.
11
12use crate::cli::CliError;
13use crate::client::{
14    Client, ClusterDeploymentStatus, ClusterStatusContext, FailureReason, HydrationStatusUpdate,
15};
16use crate::config::Settings;
17use crate::log;
18use crate::{info, info_nonl};
19use crossterm::{
20    cursor::{Hide, MoveToColumn, MoveUp, Show},
21    execute,
22    terminal::{Clear, ClearType},
23};
24use futures::StreamExt;
25use owo_colors::{OwoColorize, Stream, Style};
26use std::collections::BTreeMap;
27use std::io::{self, Write};
28use std::pin::pin;
29use std::time::{Duration, Instant};
30
31/// Wait for a staging deployment to become ready by monitoring hydration status.
32///
33/// This command:
34/// - Validates the staging deployment exists and hasn't been promoted
35/// - Subscribes to cluster hydration status
36/// - Shows a live dashboard tracking hydration, lag, and health for each cluster
37/// - Exits when all clusters are ready or timeout is reached
38///
39/// # Arguments
40/// * `settings` - Resolved CLI settings (profile, project directory, etc.)
41/// * `deploy_id` - Staging deployment ID
42/// * `once` - If true, check once and exit; if false, track continuously
43/// * `timeout` - Optional timeout in seconds
44/// * `allowed_lag_secs` - Maximum allowed lag in seconds before marking as "lagging"
45///
46/// # Returns
47/// `Ok(())` once the deployment is ready.
48///
49/// # Errors
50/// Surfaces connection errors (including unknown or already-promoted
51/// deployments) and `CliError::ReadyTimeout` when the timeout is reached.
52pub async fn run(
53    settings: &Settings,
54    deploy_id: &str,
55    once: bool,
56    timeout: Option<u64>,
57    allowed_lag_secs: i64,
58) -> Result<(), CliError> {
59    let profile = settings.connection();
60    // Connect to database
61    let mut client = Client::connect_with_profile(profile.clone())
62        .await
63        .map_err(CliError::Connection)?;
64    // Validate staging deployment exists and is not promoted
65    client.deployments().validate_staging(deploy_id).await?;
66
67    if log::json_output_enabled() {
68        if once {
69            return run_snapshot_json(deploy_id, &client, allowed_lag_secs).await;
70        } else {
71            return run_continuous_json(deploy_id, &mut client, timeout, allowed_lag_secs).await;
72        }
73    }
74
75    if once {
76        // Snapshot mode: query once and display status
77        run_snapshot(deploy_id, &client, allowed_lag_secs).await
78    } else {
79        // Continuous mode: subscribe and track with live dashboard
80        run_continuous(deploy_id, &mut client, timeout, allowed_lag_secs).await
81    }
82}
83
84/// Run in snapshot mode: query hydration status once and display.
85async fn run_snapshot(
86    deploy_id: &str,
87    client: &Client,
88    allowed_lag_secs: i64,
89) -> Result<(), CliError> {
90    let statuses = client
91        .deployments()
92        .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
93        .await?;
94
95    if statuses.is_empty() {
96        info!("No clusters found in staging deployment '{}'", deploy_id);
97        return Ok(());
98    }
99
100    info!();
101    let style = Style::new().cyan().bold();
102    info!(
103        "{}",
104        format!("  Deployment: {}", deploy_id)
105            .if_supports_color(Stream::Stderr, |t| style.style(t))
106    );
107    info!();
108
109    let mut has_failures = false;
110    let mut has_non_ready = false;
111
112    for ctx in &statuses {
113        print_cluster_status(ctx, allowed_lag_secs);
114
115        match &ctx.status {
116            ClusterDeploymentStatus::Failing { .. } => has_failures = true,
117            ClusterDeploymentStatus::Ready => {}
118            _ => has_non_ready = true,
119        }
120    }
121
122    info!();
123    print_summary(&statuses);
124    info!();
125
126    if has_failures {
127        Err(CliError::DeploymentFailing {
128            name: deploy_id.to_string(),
129        })
130    } else if has_non_ready {
131        Err(CliError::ClustersHydrating)
132    } else {
133        let style = Style::new().green().bold();
134        info!(
135            "{}",
136            "  All clusters are ready!".if_supports_color(Stream::Stderr, |t| style.style(t))
137        );
138        Ok(())
139    }
140}
141
142/// Run in snapshot mode with JSON output.
143async fn run_snapshot_json(
144    deploy_id: &str,
145    client: &Client,
146    allowed_lag_secs: i64,
147) -> Result<(), CliError> {
148    let statuses = client
149        .deployments()
150        .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
151        .await?;
152
153    let all_ready = statuses
154        .iter()
155        .all(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Ready));
156
157    let json = serde_json::json!({
158        "deploy_id": deploy_id,
159        "ready": all_ready,
160        "clusters": statuses,
161    });
162    log::output_json(&json);
163
164    if statuses
165        .iter()
166        .any(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Failing { .. }))
167    {
168        Err(CliError::DeploymentFailing {
169            name: deploy_id.to_string(),
170        })
171    } else if !all_ready {
172        Err(CliError::ClustersHydrating)
173    } else {
174        Ok(())
175    }
176}
177
178/// Run in continuous mode with NDJSON output.
179async fn run_continuous_json(
180    deploy_id: &str,
181    client: &mut Client,
182    timeout: Option<u64>,
183    allowed_lag_secs: i64,
184) -> Result<(), CliError> {
185    let monitor_future = monitor_hydration_ndjson(deploy_id, client, allowed_lag_secs);
186
187    if let Some(secs) = timeout {
188        match tokio::time::timeout(Duration::from_secs(secs), monitor_future).await {
189            Ok(result) => result,
190            Err(_) => Err(CliError::ReadyTimeout {
191                name: deploy_id.to_string(),
192                seconds: secs,
193            }),
194        }
195    } else {
196        monitor_future.await
197    }
198}
199
200/// Monitor hydration via SUBSCRIBE, emitting one NDJSON line per update.
201async fn monitor_hydration_ndjson(
202    deploy_id: &str,
203    client: &mut Client,
204    allowed_lag_secs: i64,
205) -> Result<(), CliError> {
206    let initial_statuses = client
207        .deployments()
208        .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
209        .await?;
210
211    if initial_statuses.is_empty() {
212        let json = serde_json::json!({"deploy_id": deploy_id, "clusters": [], "all_ready": true});
213        log::output_json(&json);
214        return Ok(());
215    }
216
217    let mut cluster_states: BTreeMap<String, ClusterStatusContext> = initial_statuses
218        .into_iter()
219        .map(|ctx| (ctx.cluster_name.clone(), ctx))
220        .collect();
221
222    let mut deployments = client.deployments_mut();
223    let stream = deployments.subscribe_deployment_hydration(deploy_id, allowed_lag_secs);
224    let mut stream = pin!(stream);
225
226    while let Some(result) = stream.next().await {
227        let update = result.map_err(CliError::Connection)?;
228        let status = update_to_status(&update);
229
230        cluster_states.insert(
231            update.cluster_name.clone(),
232            ClusterStatusContext {
233                cluster_name: update.cluster_name,
234                cluster_id: update.cluster_id,
235                status,
236                hydrated_count: update.hydrated_count,
237                total_count: update.total_count,
238                max_lag_secs: update.max_lag_secs,
239                total_replicas: update.total_replicas,
240                problematic_replicas: update.problematic_replicas,
241            },
242        );
243
244        let all_ready = cluster_states
245            .values()
246            .all(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Ready));
247
248        // Emit one NDJSON line per update
249        let statuses: Vec<_> = cluster_states.values().collect();
250        let json = serde_json::json!({
251            "deploy_id": deploy_id,
252            "all_ready": all_ready,
253            "clusters": statuses,
254        });
255        log::output_json(&json);
256
257        if all_ready {
258            return Ok(());
259        }
260    }
261
262    Ok(())
263}
264
265/// Print status for a single cluster with visual formatting.
266fn print_cluster_status(ctx: &ClusterStatusContext, allowed_lag_secs: i64) {
267    // Cluster name header
268    info!(
269        "  {}",
270        ctx.cluster_name
271            .as_str()
272            .if_supports_color(Stream::Stderr, |t| t.bold())
273    );
274
275    // Progress bar
276    let bar = render_progress_bar(ctx.hydrated_count, ctx.total_count, 40);
277    let progress_str = format!("{}/{} objects", ctx.hydrated_count, ctx.total_count);
278
279    info_nonl!("  {} ", bar);
280    match &ctx.status {
281        ClusterDeploymentStatus::Ready => {
282            info_nonl!(
283                "{} {}",
284                "✓".if_supports_color(Stream::Stderr, |t| t.green()),
285                "ready".if_supports_color(Stream::Stderr, |t| t.green())
286            )
287        }
288        ClusterDeploymentStatus::Hydrating { .. } => {
289            info_nonl!(
290                "{} {}",
291                "◐".if_supports_color(Stream::Stderr, |t| t.yellow()),
292                "hydrating".if_supports_color(Stream::Stderr, |t| t.yellow())
293            )
294        }
295        ClusterDeploymentStatus::Lagging { .. } => {
296            info_nonl!(
297                "{} {}",
298                "⚠".if_supports_color(Stream::Stderr, |t| t.yellow()),
299                "lagging".if_supports_color(Stream::Stderr, |t| t.yellow())
300            )
301        }
302        ClusterDeploymentStatus::Failing { .. } => {
303            info_nonl!(
304                "{} {}",
305                "✗".if_supports_color(Stream::Stderr, |t| t.red()),
306                "failing".if_supports_color(Stream::Stderr, |t| t.red())
307            )
308        }
309    }
310    info!(
311        "  {}",
312        progress_str.if_supports_color(Stream::Stderr, |t| t.dimmed())
313    );
314
315    // Additional context based on status
316    match &ctx.status {
317        ClusterDeploymentStatus::Ready => {
318            info!(
319                "  {} lag: {}s",
320                "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
321                ctx.max_lag_secs
322                    .to_string()
323                    .if_supports_color(Stream::Stderr, |t| t.green())
324            );
325        }
326        ClusterDeploymentStatus::Hydrating { hydrated, total } => {
327            #[allow(clippy::as_conversions)]
328            let pct = if *total > 0 {
329                (*hydrated as f64 / *total as f64 * 100.0) as u8
330            } else {
331                0
332            };
333            info!(
334                "  {} {}% complete",
335                "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
336                pct.to_string()
337                    .if_supports_color(Stream::Stderr, |t| t.yellow())
338            );
339        }
340        ClusterDeploymentStatus::Lagging { max_lag_secs } => {
341            let style = Style::new().yellow().bold();
342            info!(
343                "  {} lag: {}s (threshold: {}s)",
344                "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
345                max_lag_secs
346                    .to_string()
347                    .if_supports_color(Stream::Stderr, |t| style.style(t)),
348                allowed_lag_secs
349            );
350        }
351        ClusterDeploymentStatus::Failing { reason } => {
352            info!(
353                "  {} {}",
354                "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
355                reason
356                    .to_string()
357                    .if_supports_color(Stream::Stderr, |t| t.red())
358            );
359        }
360    }
361    info!();
362}
363
364/// Render a Unicode progress bar.
365#[allow(clippy::as_conversions)]
366fn render_progress_bar(current: i64, total: i64, width: usize) -> String {
367    if total == 0 {
368        return format!(
369            "[{}]",
370            "░"
371                .repeat(width)
372                .if_supports_color(Stream::Stderr, |t| t.dimmed())
373        );
374    }
375
376    let filled = ((current as f64 / total as f64) * width as f64) as usize;
377    let empty = width.saturating_sub(filled);
378
379    format!(
380        "[{}{}]",
381        "█"
382            .repeat(filled)
383            .if_supports_color(Stream::Stderr, |t| t.cyan()),
384        "░"
385            .repeat(empty)
386            .if_supports_color(Stream::Stderr, |t| t.dimmed())
387    )
388}
389
390/// Print summary footer with counts.
391fn print_summary(statuses: &[ClusterStatusContext]) {
392    let mut ready = 0;
393    let mut hydrating = 0;
394    let mut lagging = 0;
395    let mut failing = 0;
396
397    for ctx in statuses {
398        match ctx.status {
399            ClusterDeploymentStatus::Ready => ready += 1,
400            ClusterDeploymentStatus::Hydrating { .. } => hydrating += 1,
401            ClusterDeploymentStatus::Lagging { .. } => lagging += 1,
402            ClusterDeploymentStatus::Failing { .. } => failing += 1,
403        }
404    }
405
406    info_nonl!("  ");
407    let mut parts = Vec::new();
408    if ready > 0 {
409        parts.push(
410            format!("{} ready", ready)
411                .if_supports_color(Stream::Stderr, |t| t.green())
412                .to_string(),
413        );
414    }
415    if hydrating > 0 {
416        parts.push(
417            format!("{} hydrating", hydrating)
418                .if_supports_color(Stream::Stderr, |t| t.yellow())
419                .to_string(),
420        );
421    }
422    if lagging > 0 {
423        parts.push(
424            format!("{} lagging", lagging)
425                .if_supports_color(Stream::Stderr, |t| t.yellow())
426                .to_string(),
427        );
428    }
429    if failing > 0 {
430        parts.push(
431            format!("{} failing", failing)
432                .if_supports_color(Stream::Stderr, |t| t.red())
433                .to_string(),
434        );
435    }
436    info!("{}", parts.join(" · "));
437}
438
439/// Run in continuous mode: subscribe to hydration updates and show live dashboard.
440async fn run_continuous(
441    deploy_id: &str,
442    client: &mut Client,
443    timeout: Option<u64>,
444    allowed_lag_secs: i64,
445) -> Result<(), CliError> {
446    // Get initial hydration status
447    let initial_statuses = client
448        .deployments()
449        .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
450        .await?;
451
452    if initial_statuses.is_empty() {
453        info!("No clusters found in staging deployment '{}'", deploy_id);
454        return Ok(());
455    }
456
457    let start_time = Instant::now();
458
459    // Subscribe to hydration updates and monitor
460    let monitor_future = monitor_hydration_live(
461        deploy_id,
462        client,
463        initial_statuses,
464        start_time,
465        allowed_lag_secs,
466    );
467
468    if let Some(secs) = timeout {
469        match tokio::time::timeout(Duration::from_secs(secs), monitor_future).await {
470            Ok(result) => result,
471            Err(_) => Err(CliError::ReadyTimeout {
472                name: deploy_id.to_string(),
473                seconds: secs,
474            }),
475        }
476    } else {
477        monitor_future.await
478    }
479}
480
481/// Monitor hydration status via SUBSCRIBE and update live dashboard.
482async fn monitor_hydration_live(
483    deploy_id: &str,
484    client: &mut Client,
485    initial_statuses: Vec<ClusterStatusContext>,
486    start_time: Instant,
487    allowed_lag_secs: i64,
488) -> Result<(), CliError> {
489    let mut stdout = io::stdout();
490    let num_clusters = initial_statuses.len();
491
492    // Build initial state map
493    let mut cluster_states: BTreeMap<String, ClusterStatusContext> = initial_statuses
494        .into_iter()
495        .map(|ctx| (ctx.cluster_name.clone(), ctx))
496        .collect();
497
498    // Calculate lines per render (header + per-cluster lines + summary)
499    // Header: 3 lines, per cluster: 4 lines, summary: 2 lines
500    let lines_per_render = 3 + (num_clusters * 4) + 2;
501
502    // Initial render
503    render_dashboard(
504        &mut stdout,
505        deploy_id,
506        &cluster_states,
507        start_time,
508        allowed_lag_secs,
509    )?;
510
511    let mut deployments = client.deployments_mut();
512    let stream = deployments.subscribe_deployment_hydration(deploy_id, allowed_lag_secs);
513    let mut stream = pin!(stream);
514
515    // Hide cursor during updates
516    let _ = execute!(stdout, Hide);
517
518    while let Some(result) = stream.next().await {
519        let update = result.map_err(CliError::Connection)?;
520
521        let status = update_to_status(&update);
522
523        cluster_states.insert(
524            update.cluster_name.clone(),
525            ClusterStatusContext {
526                cluster_name: update.cluster_name,
527                cluster_id: update.cluster_id,
528                status,
529                hydrated_count: update.hydrated_count,
530                total_count: update.total_count,
531                max_lag_secs: update.max_lag_secs,
532                total_replicas: update.total_replicas,
533                problematic_replicas: update.problematic_replicas,
534            },
535        );
536
537        // Move cursor up and clear, then re-render
538        #[allow(clippy::as_conversions)]
539        let lines = lines_per_render as u16;
540        let _ = execute!(stdout, MoveUp(lines), MoveToColumn(0));
541        for _ in 0..lines_per_render {
542            let _ = execute!(stdout, Clear(ClearType::CurrentLine));
543            info!();
544        }
545        let _ = execute!(stdout, MoveUp(lines), MoveToColumn(0));
546
547        render_dashboard(
548            &mut stdout,
549            deploy_id,
550            &cluster_states,
551            start_time,
552            allowed_lag_secs,
553        )?;
554
555        let all_ready = cluster_states
556            .values()
557            .all(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Ready));
558
559        if all_ready {
560            let _ = execute!(stdout, Show);
561            info!();
562            let style = Style::new().green().bold();
563            info!(
564                "{}",
565                "  All clusters are ready!".if_supports_color(Stream::Stderr, |t| style.style(t))
566            );
567            return Ok(());
568        }
569    }
570
571    let _ = execute!(stdout, Show);
572    Ok(())
573}
574
575/// Convert a HydrationStatusUpdate to a ClusterDeploymentStatus.
576fn update_to_status(update: &HydrationStatusUpdate) -> ClusterDeploymentStatus {
577    match update.status {
578        ClusterDeploymentStatus::Ready => ClusterDeploymentStatus::Ready,
579        ClusterDeploymentStatus::Hydrating { .. } => ClusterDeploymentStatus::Hydrating {
580            hydrated: update.hydrated_count,
581            total: update.total_count,
582        },
583        ClusterDeploymentStatus::Lagging { .. } => ClusterDeploymentStatus::Lagging {
584            max_lag_secs: update.max_lag_secs,
585        },
586        ClusterDeploymentStatus::Failing { .. } => {
587            let reason = match update.failure_reason {
588                Some(FailureReason::NoReplicas) => FailureReason::NoReplicas,
589                Some(FailureReason::AllReplicasProblematic { .. }) => {
590                    FailureReason::AllReplicasProblematic {
591                        problematic: update.problematic_replicas,
592                        total: update.total_replicas,
593                    }
594                }
595                None => FailureReason::NoReplicas,
596            };
597            ClusterDeploymentStatus::Failing { reason }
598        }
599    }
600}
601
602/// Render the live dashboard.
603fn render_dashboard(
604    stdout: &mut io::Stdout,
605    deploy_id: &str,
606    cluster_states: &BTreeMap<String, ClusterStatusContext>,
607    start_time: Instant,
608    allowed_lag_secs: i64,
609) -> Result<(), CliError> {
610    let elapsed = start_time.elapsed();
611    let elapsed_str = format_duration(elapsed);
612
613    // Header
614    info!();
615    let header_style = Style::new().cyan().bold();
616    info!(
617        "{}",
618        format!("  mz-deploy wait · deployment: {}", deploy_id)
619            .if_supports_color(Stream::Stderr, |t| header_style.style(t))
620    );
621    info!(
622        "  {} {}",
623        "elapsed:".if_supports_color(Stream::Stderr, |t| t.dimmed()),
624        elapsed_str.if_supports_color(Stream::Stderr, |t| t.dimmed())
625    );
626    info!();
627
628    // Sort clusters by name for consistent ordering
629    let mut cluster_names: Vec<_> = cluster_states.keys().collect();
630    cluster_names.sort();
631
632    // Render each cluster
633    for name in cluster_names {
634        if let Some(ctx) = cluster_states.get(name) {
635            print_cluster_status(ctx, allowed_lag_secs);
636        }
637    }
638
639    // Summary
640    let statuses: Vec<_> = cluster_states.values().cloned().collect();
641    print_summary(&statuses);
642
643    let _ = stdout.flush();
644    Ok(())
645}
646
/// Format a duration as a compact human-readable string.
///
/// Sub-second precision is discarded. Durations under a minute render as
/// `"{s}s"`, under an hour as `"{m}m {s}s"`, and anything longer as
/// `"{h}h {m}m"`, with the seconds dropped.
fn format_duration(duration: Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;

    match duration.as_secs() {
        total if total < MINUTE => format!("{}s", total),
        total if total < HOUR => format!("{}m {}s", total / MINUTE, total % MINUTE),
        total => format!("{}h {}m", total / HOUR, (total % HOUR) / MINUTE),
    }
}