use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

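/// Context needed to check whether clusters/collections are caught up.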
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// Trigger that is fired once the caught-up check passes.
    pub trigger: Trigger,
    /// Collections to exclude from the caught-up check.
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
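    /// Checks whether all clusters/collections have caught up to the live
    /// frontiers recorded in `mz_cluster_replica_frontiers` and, if so, fires
    /// the [`Trigger`] of the installed [`CaughtUpCheckContext`]. This is a
    /// no-op if no check is currently installed.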
    pub async fn maybe_check_caught_up(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // A NULL upper is interpreted as the empty frontier. Otherwise
                // the (totally ordered) timestamp is the single element of the
                // upper frontier.
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

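        // Collapse the per-replica frontiers into a single frontier per
        // collection by joining (taking the maximum of) the uppers reported by
        // the individual replicas.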
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let compute_caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%compute_caught_up, "checked caught-up status of collections");

        if compute_caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

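    /// Returns whether all clusters are caught up, by checking each cluster's
    /// collections against the given live frontiers, allowed lag, and cutoff.
    /// A cluster that fails the check (or errors) makes the overall result
    /// `false`, but the remaining clusters are still checked so that every
    /// lagging cluster gets logged.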
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // Keep iterating so that all clusters that are not yet caught
                // up get logged.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

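    /// Returns whether all non-transient, non-excluded storage and compute
    /// collections installed on `cluster` are caught up: hydrated and within
    /// `allowed_lag` of the corresponding frontier in `live_frontiers`. A
    /// cluster without replicas is trivially considered caught up, as is a
    /// cluster whose replicas are all problematic and that has a collection
    /// whose live frontier is more than `cutoff` behind `now`.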
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

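        // Whether every replica of this cluster has recently been flagged as
        // problematic (e.g. OOM-looping). Used below to exempt clusters that
        // have no realistic chance of catching up.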
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

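        // Gather the write frontiers of all storage (ingestion export) and
        // compute collections installed on this cluster, skipping transient
        // and explicitly excluded collections.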
        let storage_frontiers = self
            .controller
            .storage
            .active_ingestion_exports(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    // A collection that is missing from the live frontiers
                    // only counts as behind if its own write frontier has not
                    // advanced past the minimum timestamp yet.
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // Check whether the live write frontier, even when bumped by the
            // cutoff, is still behind `now`, i.e. whether the collection is
            // too far behind `now`.
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one collection that is too far behind 'now', ignoring cluster for caught-up checks",
                    cluster.id
                );
                return Ok(true);
            }

            // The collection is within the allowed lag if its live write
            // frontier does not exceed the current write frontier bumped by
            // `allowed_lag`.
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // A collection is caught up if it is both hydrated and within the
            // allowed lag, or if its live write frontier is already empty.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up"
                );
            } else {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

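    /// Scans the last 24 hours of `mz_cluster_replica_status_history` and
    /// returns the IDs of replicas that look like they are crash/OOM-looping,
    /// that is, replicas with repeated problematic status events within that
    /// window.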
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
        // Only look at the last 24 hours of status history.
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            // Without a read hold we cannot read the status history, so report
            // no problematic replicas.
            return BTreeSet::new();
        };

        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            if diff <= 0 {
                // Only look at additions, not retractions.
                continue;
            }

            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

            // Only count events that fall within the lookback window.
            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc() {
                if Self::is_problematic_status(&status, reason.as_deref()) {
                    *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
                }
            }
        }

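        // Replicas with repeated problematic events (three or more) within the
        // lookback window are reported as problematic.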
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "Detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

        // The read hold must stay alive while we read the snapshot stream
        // above, so only drop it now.
        drop(read_hold);

        result
    }

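    /// Returns whether a replica status event counts as problematic for the
    /// caught-up check.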
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        // Currently only the offline reason is inspected: an OOM kill counts
        // as problematic, while the status string itself is ignored.
        if let Some(reason) = reason {
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}