1use crate::cli::CliError;
13use crate::client::{
14 Client, ClusterDeploymentStatus, ClusterStatusContext, FailureReason, HydrationStatusUpdate,
15};
16use crate::config::Settings;
17use crate::log;
18use crate::{info, info_nonl};
19use crossterm::{
20 cursor::{Hide, MoveToColumn, MoveUp, Show},
21 execute,
22 terminal::{Clear, ClearType},
23};
24use futures::StreamExt;
25use owo_colors::{OwoColorize, Stream, Style};
26use std::collections::BTreeMap;
27use std::io::{self, Write};
28use std::pin::pin;
29use std::time::{Duration, Instant};
30
31pub async fn run(
53 settings: &Settings,
54 deploy_id: &str,
55 once: bool,
56 timeout: Option<u64>,
57 allowed_lag_secs: i64,
58) -> Result<(), CliError> {
59 let profile = settings.connection();
60 let mut client = Client::connect_with_profile(profile.clone())
62 .await
63 .map_err(CliError::Connection)?;
64 client.deployments().validate_staging(deploy_id).await?;
66
67 if log::json_output_enabled() {
68 if once {
69 return run_snapshot_json(deploy_id, &client, allowed_lag_secs).await;
70 } else {
71 return run_continuous_json(deploy_id, &mut client, timeout, allowed_lag_secs).await;
72 }
73 }
74
75 if once {
76 run_snapshot(deploy_id, &client, allowed_lag_secs).await
78 } else {
79 run_continuous(deploy_id, &mut client, timeout, allowed_lag_secs).await
81 }
82}
83
84async fn run_snapshot(
86 deploy_id: &str,
87 client: &Client,
88 allowed_lag_secs: i64,
89) -> Result<(), CliError> {
90 let statuses = client
91 .deployments()
92 .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
93 .await?;
94
95 if statuses.is_empty() {
96 info!("No clusters found in staging deployment '{}'", deploy_id);
97 return Ok(());
98 }
99
100 info!();
101 let style = Style::new().cyan().bold();
102 info!(
103 "{}",
104 format!(" Deployment: {}", deploy_id)
105 .if_supports_color(Stream::Stderr, |t| style.style(t))
106 );
107 info!();
108
109 let mut has_failures = false;
110 let mut has_non_ready = false;
111
112 for ctx in &statuses {
113 print_cluster_status(ctx, allowed_lag_secs);
114
115 match &ctx.status {
116 ClusterDeploymentStatus::Failing { .. } => has_failures = true,
117 ClusterDeploymentStatus::Ready => {}
118 _ => has_non_ready = true,
119 }
120 }
121
122 info!();
123 print_summary(&statuses);
124 info!();
125
126 if has_failures {
127 Err(CliError::DeploymentFailing {
128 name: deploy_id.to_string(),
129 })
130 } else if has_non_ready {
131 Err(CliError::ClustersHydrating)
132 } else {
133 let style = Style::new().green().bold();
134 info!(
135 "{}",
136 " All clusters are ready!".if_supports_color(Stream::Stderr, |t| style.style(t))
137 );
138 Ok(())
139 }
140}
141
142async fn run_snapshot_json(
144 deploy_id: &str,
145 client: &Client,
146 allowed_lag_secs: i64,
147) -> Result<(), CliError> {
148 let statuses = client
149 .deployments()
150 .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
151 .await?;
152
153 let all_ready = statuses
154 .iter()
155 .all(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Ready));
156
157 let json = serde_json::json!({
158 "deploy_id": deploy_id,
159 "ready": all_ready,
160 "clusters": statuses,
161 });
162 log::output_json(&json);
163
164 if statuses
165 .iter()
166 .any(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Failing { .. }))
167 {
168 Err(CliError::DeploymentFailing {
169 name: deploy_id.to_string(),
170 })
171 } else if !all_ready {
172 Err(CliError::ClustersHydrating)
173 } else {
174 Ok(())
175 }
176}
177
178async fn run_continuous_json(
180 deploy_id: &str,
181 client: &mut Client,
182 timeout: Option<u64>,
183 allowed_lag_secs: i64,
184) -> Result<(), CliError> {
185 let monitor_future = monitor_hydration_ndjson(deploy_id, client, allowed_lag_secs);
186
187 if let Some(secs) = timeout {
188 match tokio::time::timeout(Duration::from_secs(secs), monitor_future).await {
189 Ok(result) => result,
190 Err(_) => Err(CliError::ReadyTimeout {
191 name: deploy_id.to_string(),
192 seconds: secs,
193 }),
194 }
195 } else {
196 monitor_future.await
197 }
198}
199
200async fn monitor_hydration_ndjson(
202 deploy_id: &str,
203 client: &mut Client,
204 allowed_lag_secs: i64,
205) -> Result<(), CliError> {
206 let initial_statuses = client
207 .deployments()
208 .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
209 .await?;
210
211 if initial_statuses.is_empty() {
212 let json = serde_json::json!({"deploy_id": deploy_id, "clusters": [], "all_ready": true});
213 log::output_json(&json);
214 return Ok(());
215 }
216
217 let mut cluster_states: BTreeMap<String, ClusterStatusContext> = initial_statuses
218 .into_iter()
219 .map(|ctx| (ctx.cluster_name.clone(), ctx))
220 .collect();
221
222 let mut deployments = client.deployments_mut();
223 let stream = deployments.subscribe_deployment_hydration(deploy_id, allowed_lag_secs);
224 let mut stream = pin!(stream);
225
226 while let Some(result) = stream.next().await {
227 let update = result.map_err(CliError::Connection)?;
228 let status = update_to_status(&update);
229
230 cluster_states.insert(
231 update.cluster_name.clone(),
232 ClusterStatusContext {
233 cluster_name: update.cluster_name,
234 cluster_id: update.cluster_id,
235 status,
236 hydrated_count: update.hydrated_count,
237 total_count: update.total_count,
238 max_lag_secs: update.max_lag_secs,
239 total_replicas: update.total_replicas,
240 problematic_replicas: update.problematic_replicas,
241 },
242 );
243
244 let all_ready = cluster_states
245 .values()
246 .all(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Ready));
247
248 let statuses: Vec<_> = cluster_states.values().collect();
250 let json = serde_json::json!({
251 "deploy_id": deploy_id,
252 "all_ready": all_ready,
253 "clusters": statuses,
254 });
255 log::output_json(&json);
256
257 if all_ready {
258 return Ok(());
259 }
260 }
261
262 Ok(())
263}
264
265fn print_cluster_status(ctx: &ClusterStatusContext, allowed_lag_secs: i64) {
267 info!(
269 " {}",
270 ctx.cluster_name
271 .as_str()
272 .if_supports_color(Stream::Stderr, |t| t.bold())
273 );
274
275 let bar = render_progress_bar(ctx.hydrated_count, ctx.total_count, 40);
277 let progress_str = format!("{}/{} objects", ctx.hydrated_count, ctx.total_count);
278
279 info_nonl!(" {} ", bar);
280 match &ctx.status {
281 ClusterDeploymentStatus::Ready => {
282 info_nonl!(
283 "{} {}",
284 "✓".if_supports_color(Stream::Stderr, |t| t.green()),
285 "ready".if_supports_color(Stream::Stderr, |t| t.green())
286 )
287 }
288 ClusterDeploymentStatus::Hydrating { .. } => {
289 info_nonl!(
290 "{} {}",
291 "◐".if_supports_color(Stream::Stderr, |t| t.yellow()),
292 "hydrating".if_supports_color(Stream::Stderr, |t| t.yellow())
293 )
294 }
295 ClusterDeploymentStatus::Lagging { .. } => {
296 info_nonl!(
297 "{} {}",
298 "⚠".if_supports_color(Stream::Stderr, |t| t.yellow()),
299 "lagging".if_supports_color(Stream::Stderr, |t| t.yellow())
300 )
301 }
302 ClusterDeploymentStatus::Failing { .. } => {
303 info_nonl!(
304 "{} {}",
305 "✗".if_supports_color(Stream::Stderr, |t| t.red()),
306 "failing".if_supports_color(Stream::Stderr, |t| t.red())
307 )
308 }
309 }
310 info!(
311 " {}",
312 progress_str.if_supports_color(Stream::Stderr, |t| t.dimmed())
313 );
314
315 match &ctx.status {
317 ClusterDeploymentStatus::Ready => {
318 info!(
319 " {} lag: {}s",
320 "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
321 ctx.max_lag_secs
322 .to_string()
323 .if_supports_color(Stream::Stderr, |t| t.green())
324 );
325 }
326 ClusterDeploymentStatus::Hydrating { hydrated, total } => {
327 #[allow(clippy::as_conversions)]
328 let pct = if *total > 0 {
329 (*hydrated as f64 / *total as f64 * 100.0) as u8
330 } else {
331 0
332 };
333 info!(
334 " {} {}% complete",
335 "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
336 pct.to_string()
337 .if_supports_color(Stream::Stderr, |t| t.yellow())
338 );
339 }
340 ClusterDeploymentStatus::Lagging { max_lag_secs } => {
341 let style = Style::new().yellow().bold();
342 info!(
343 " {} lag: {}s (threshold: {}s)",
344 "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
345 max_lag_secs
346 .to_string()
347 .if_supports_color(Stream::Stderr, |t| style.style(t)),
348 allowed_lag_secs
349 );
350 }
351 ClusterDeploymentStatus::Failing { reason } => {
352 info!(
353 " {} {}",
354 "└".if_supports_color(Stream::Stderr, |t| t.dimmed()),
355 reason
356 .to_string()
357 .if_supports_color(Stream::Stderr, |t| t.red())
358 );
359 }
360 }
361 info!();
362}
363
364#[allow(clippy::as_conversions)]
366fn render_progress_bar(current: i64, total: i64, width: usize) -> String {
367 if total == 0 {
368 return format!(
369 "[{}]",
370 "░"
371 .repeat(width)
372 .if_supports_color(Stream::Stderr, |t| t.dimmed())
373 );
374 }
375
376 let filled = ((current as f64 / total as f64) * width as f64) as usize;
377 let empty = width.saturating_sub(filled);
378
379 format!(
380 "[{}{}]",
381 "█"
382 .repeat(filled)
383 .if_supports_color(Stream::Stderr, |t| t.cyan()),
384 "░"
385 .repeat(empty)
386 .if_supports_color(Stream::Stderr, |t| t.dimmed())
387 )
388}
389
390fn print_summary(statuses: &[ClusterStatusContext]) {
392 let mut ready = 0;
393 let mut hydrating = 0;
394 let mut lagging = 0;
395 let mut failing = 0;
396
397 for ctx in statuses {
398 match ctx.status {
399 ClusterDeploymentStatus::Ready => ready += 1,
400 ClusterDeploymentStatus::Hydrating { .. } => hydrating += 1,
401 ClusterDeploymentStatus::Lagging { .. } => lagging += 1,
402 ClusterDeploymentStatus::Failing { .. } => failing += 1,
403 }
404 }
405
406 info_nonl!(" ");
407 let mut parts = Vec::new();
408 if ready > 0 {
409 parts.push(
410 format!("{} ready", ready)
411 .if_supports_color(Stream::Stderr, |t| t.green())
412 .to_string(),
413 );
414 }
415 if hydrating > 0 {
416 parts.push(
417 format!("{} hydrating", hydrating)
418 .if_supports_color(Stream::Stderr, |t| t.yellow())
419 .to_string(),
420 );
421 }
422 if lagging > 0 {
423 parts.push(
424 format!("{} lagging", lagging)
425 .if_supports_color(Stream::Stderr, |t| t.yellow())
426 .to_string(),
427 );
428 }
429 if failing > 0 {
430 parts.push(
431 format!("{} failing", failing)
432 .if_supports_color(Stream::Stderr, |t| t.red())
433 .to_string(),
434 );
435 }
436 info!("{}", parts.join(" · "));
437}
438
439async fn run_continuous(
441 deploy_id: &str,
442 client: &mut Client,
443 timeout: Option<u64>,
444 allowed_lag_secs: i64,
445) -> Result<(), CliError> {
446 let initial_statuses = client
448 .deployments()
449 .get_deployment_hydration_status_with_lag(deploy_id, allowed_lag_secs)
450 .await?;
451
452 if initial_statuses.is_empty() {
453 info!("No clusters found in staging deployment '{}'", deploy_id);
454 return Ok(());
455 }
456
457 let start_time = Instant::now();
458
459 let monitor_future = monitor_hydration_live(
461 deploy_id,
462 client,
463 initial_statuses,
464 start_time,
465 allowed_lag_secs,
466 );
467
468 if let Some(secs) = timeout {
469 match tokio::time::timeout(Duration::from_secs(secs), monitor_future).await {
470 Ok(result) => result,
471 Err(_) => Err(CliError::ReadyTimeout {
472 name: deploy_id.to_string(),
473 seconds: secs,
474 }),
475 }
476 } else {
477 monitor_future.await
478 }
479}
480
481async fn monitor_hydration_live(
483 deploy_id: &str,
484 client: &mut Client,
485 initial_statuses: Vec<ClusterStatusContext>,
486 start_time: Instant,
487 allowed_lag_secs: i64,
488) -> Result<(), CliError> {
489 let mut stdout = io::stdout();
490 let num_clusters = initial_statuses.len();
491
492 let mut cluster_states: BTreeMap<String, ClusterStatusContext> = initial_statuses
494 .into_iter()
495 .map(|ctx| (ctx.cluster_name.clone(), ctx))
496 .collect();
497
498 let lines_per_render = 3 + (num_clusters * 4) + 2;
501
502 render_dashboard(
504 &mut stdout,
505 deploy_id,
506 &cluster_states,
507 start_time,
508 allowed_lag_secs,
509 )?;
510
511 let mut deployments = client.deployments_mut();
512 let stream = deployments.subscribe_deployment_hydration(deploy_id, allowed_lag_secs);
513 let mut stream = pin!(stream);
514
515 let _ = execute!(stdout, Hide);
517
518 while let Some(result) = stream.next().await {
519 let update = result.map_err(CliError::Connection)?;
520
521 let status = update_to_status(&update);
522
523 cluster_states.insert(
524 update.cluster_name.clone(),
525 ClusterStatusContext {
526 cluster_name: update.cluster_name,
527 cluster_id: update.cluster_id,
528 status,
529 hydrated_count: update.hydrated_count,
530 total_count: update.total_count,
531 max_lag_secs: update.max_lag_secs,
532 total_replicas: update.total_replicas,
533 problematic_replicas: update.problematic_replicas,
534 },
535 );
536
537 #[allow(clippy::as_conversions)]
539 let lines = lines_per_render as u16;
540 let _ = execute!(stdout, MoveUp(lines), MoveToColumn(0));
541 for _ in 0..lines_per_render {
542 let _ = execute!(stdout, Clear(ClearType::CurrentLine));
543 info!();
544 }
545 let _ = execute!(stdout, MoveUp(lines), MoveToColumn(0));
546
547 render_dashboard(
548 &mut stdout,
549 deploy_id,
550 &cluster_states,
551 start_time,
552 allowed_lag_secs,
553 )?;
554
555 let all_ready = cluster_states
556 .values()
557 .all(|ctx| matches!(ctx.status, ClusterDeploymentStatus::Ready));
558
559 if all_ready {
560 let _ = execute!(stdout, Show);
561 info!();
562 let style = Style::new().green().bold();
563 info!(
564 "{}",
565 " All clusters are ready!".if_supports_color(Stream::Stderr, |t| style.style(t))
566 );
567 return Ok(());
568 }
569 }
570
571 let _ = execute!(stdout, Show);
572 Ok(())
573}
574
575fn update_to_status(update: &HydrationStatusUpdate) -> ClusterDeploymentStatus {
577 match update.status {
578 ClusterDeploymentStatus::Ready => ClusterDeploymentStatus::Ready,
579 ClusterDeploymentStatus::Hydrating { .. } => ClusterDeploymentStatus::Hydrating {
580 hydrated: update.hydrated_count,
581 total: update.total_count,
582 },
583 ClusterDeploymentStatus::Lagging { .. } => ClusterDeploymentStatus::Lagging {
584 max_lag_secs: update.max_lag_secs,
585 },
586 ClusterDeploymentStatus::Failing { .. } => {
587 let reason = match update.failure_reason {
588 Some(FailureReason::NoReplicas) => FailureReason::NoReplicas,
589 Some(FailureReason::AllReplicasProblematic { .. }) => {
590 FailureReason::AllReplicasProblematic {
591 problematic: update.problematic_replicas,
592 total: update.total_replicas,
593 }
594 }
595 None => FailureReason::NoReplicas,
596 };
597 ClusterDeploymentStatus::Failing { reason }
598 }
599 }
600}
601
602fn render_dashboard(
604 stdout: &mut io::Stdout,
605 deploy_id: &str,
606 cluster_states: &BTreeMap<String, ClusterStatusContext>,
607 start_time: Instant,
608 allowed_lag_secs: i64,
609) -> Result<(), CliError> {
610 let elapsed = start_time.elapsed();
611 let elapsed_str = format_duration(elapsed);
612
613 info!();
615 let header_style = Style::new().cyan().bold();
616 info!(
617 "{}",
618 format!(" mz-deploy wait · deployment: {}", deploy_id)
619 .if_supports_color(Stream::Stderr, |t| header_style.style(t))
620 );
621 info!(
622 " {} {}",
623 "elapsed:".if_supports_color(Stream::Stderr, |t| t.dimmed()),
624 elapsed_str.if_supports_color(Stream::Stderr, |t| t.dimmed())
625 );
626 info!();
627
628 let mut cluster_names: Vec<_> = cluster_states.keys().collect();
630 cluster_names.sort();
631
632 for name in cluster_names {
634 if let Some(ctx) = cluster_states.get(name) {
635 print_cluster_status(ctx, allowed_lag_secs);
636 }
637 }
638
639 let statuses: Vec<_> = cluster_states.values().cloned().collect();
641 print_summary(&statuses);
642
643 let _ = stdout.flush();
644 Ok(())
645}
646
/// Formats a duration compactly using its whole seconds.
///
/// * under one minute: `"42s"`
/// * under one hour: `"3m 7s"`
/// * one hour or more: `"2h 15m"` (seconds are dropped)
///
/// Sub-second precision is discarded.
fn format_duration(duration: Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;

    match duration.as_secs() {
        total if total < MINUTE => format!("{}s", total),
        total if total < HOUR => format!("{}m {}s", total / MINUTE, total % MINUTE),
        total => format!("{}h {}m", total / HOUR, (total % HOUR) / MINUTE),
    }
}