use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_CHECK, ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK,
    WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG, WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

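/// Context for checking whether clusters/collections have caught up, as part
/// of a 0dt deployment.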
#[derive(Debug)]
pub struct CaughtUpCheckContext {
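    /// Trigger that gets fired once all clusters/collections are considered
    /// caught up.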
    pub trigger: Trigger,
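    /// Collections to exclude from the caught-up check.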
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
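    /// Checks whether all clusters/collections are caught up and, if so, fires
    /// the trigger in `self.caught_up_check`. Depending on
    /// `ENABLE_0DT_CAUGHT_UP_CHECK`, this runs either the new frontier-based
    /// check or the legacy hydration-based check.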
    pub async fn maybe_check_caught_up(&mut self) {
        let enable_caught_up_check =
            ENABLE_0DT_CAUGHT_UP_CHECK.get(self.catalog().system_config().dyncfgs());

        if enable_caught_up_check {
            self.maybe_check_caught_up_new().await
        } else {
            self.maybe_check_caught_up_legacy().await
        }
    }

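    /// The frontier-based caught-up check: compares the write frontiers of
    /// this environment's collections against the frontiers reported in
    /// `mz_cluster_replica_frontiers` and fires the trigger once all clusters
    /// are caught up.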
    async fn maybe_check_caught_up_new(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
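                // A NULL upper means the write frontier is empty, i.e. the
                // collection has advanced to the empty frontier.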
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

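        // A collection can live on multiple replicas, so fold the per-replica
        // uppers into a single frontier per collection by joining them.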
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

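        // Optionally detect replicas that are crash-looping (e.g. repeatedly
        // OOM-killed), so that a permanently broken cluster cannot block the
        // caught-up check forever.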
        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let compute_caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%compute_caught_up, "checked caught-up status of collections");

        if compute_caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

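    /// Returns `true` if all clusters are considered caught up, given the
    /// allowed lag, the cutoff, and the live frontiers read from
    /// `mz_cluster_replica_frontiers`. Errors for individual clusters are
    /// logged and treated as "not caught up".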
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} is caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

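    /// Returns `true` if all storage and compute collections installed on the
    /// given cluster are caught up: their write frontiers are within
    /// `allowed_lag` of the live frontiers and the collections report as
    /// hydrated. Clusters without replicas are trivially caught up.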
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

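        // If every replica of this cluster has been flagged as problematic
        // (e.g. repeatedly OOM-killed), we may decide below to ignore the
        // cluster entirely.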
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

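        // Gather the write frontiers of all storage collections (ingestions)
        // and compute collections installed on this cluster, skipping
        // transient and explicitly excluded collections.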
        let storage_frontiers = self
            .controller
            .storage
            .active_ingestions(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

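        // Compare each collection's current write frontier against the
        // corresponding live frontier.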
        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
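                    // The collection has no live frontier (e.g. it only exists
                    // in this version). Only treat this as "not caught up" if
                    // the collection's write frontier is still at the minimum,
                    // i.e. it has not made any progress yet.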
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

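            // If the live write frontier is more than `cutoff` behind `now`,
            // the collection is considered too far behind to reasonably catch
            // up.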
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one collection that is too far behind 'now', ignoring cluster for caught-up checks",
                    cluster.id
                );
                return Ok(true);
            }

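            // The collection is within the allowed lag if its own write
            // frontier, advanced by `allowed_lag`, has reached the live write
            // frontier.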
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

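            // Additionally require the collection to report as hydrated by its
            // controller.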
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up");
            } else {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

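    /// The legacy caught-up check: considers the environment ready as soon as
    /// all clusters report as hydrated, without comparing frontiers.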
    async fn maybe_check_caught_up_legacy(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let compute_hydrated = self
            .controller
            .compute
            .clusters_hydrated(&ctx.exclude_collections)
            .await;
        tracing::info!(%compute_hydrated, "checked hydration status of clusters");

        if compute_hydrated {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

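    /// Scans `mz_cluster_replica_status_history` for the last 24 hours and
    /// returns the IDs of replicas that look like they are crash-looping, i.e.
    /// that have accumulated multiple problematic status events (such as being
    /// OOM-killed).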
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            return BTreeSet::new();
        };

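        // Read the status history as of the earliest time still available
        // under the read hold.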
        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

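        // Count problematic status events per replica within the lookback
        // window, skipping retractions and rows that carry errors.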
        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            if diff <= 0 {
                continue;
            }

            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc() {
                if Self::is_problematic_status(&status, reason.as_deref()) {
                    *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
                }
            }
        }

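        // Flag replicas that have seen at least three problematic events in
        // the lookback window.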
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "Detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

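        // Explicitly drop the read hold only now, after the snapshot has been
        // fully consumed.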
        drop(read_hold);

        result
    }

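    /// Returns whether a replica status event counts as "problematic".
    /// Currently only OOM kills are considered problematic.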
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        if let Some(reason) = reason {
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}