use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

/// Context needed for checking whether clusters and their collections have
/// caught up during a zero-downtime (0dt) deployment.
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// Trigger to fire once all clusters are confirmed to have caught up.
    pub trigger: Trigger,
    /// Collections to exclude from the caught-up check.
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
    /// If a caught-up check is pending, determines whether all clusters have
    /// caught up to the live frontiers reported in
    /// `mz_cluster_replica_frontiers` and, if so, fires the check's trigger.
    pub async fn maybe_check_caught_up(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        // Snapshot the per-replica write frontiers reported through
        // mz_cluster_replica_frontiers.
        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // A NULL upper means the frontier is empty, that is, the
                // collection is closed and will not advance any further.
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

        // Multiple replicas can report frontiers for the same collection, so
        // fold them together per collection, keeping the most advanced
        // frontier by joining.
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        // Identify replicas that have repeatedly gone offline (e.g. been
        // OOM-killed) recently; clusters consisting only of such replicas may
        // be exempted from the caught-up check further down.
        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%caught_up, "checked caught-up status of collections");

        if caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Returns whether all clusters have caught up to the given live
    /// frontiers, subject to the configured allowed lag and cutoff and the
    /// set of replicas known to be problematic.
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} is caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // Keep iterating so that we log all clusters that are not yet
                // caught up.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

    /// Returns whether all collections installed on the given cluster have
    /// caught up to the live frontiers.
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        // A cluster without replicas cannot make progress, so we consider it
        // trivially caught up.
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

        // Whether every replica of this cluster is known to be problematic
        // (e.g. continually OOM-killed).
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

        let storage_frontiers = self
            .controller
            .storage
            .active_ingestion_exports(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    // The collection has no live frontier. Only hold this
                    // against us if it hasn't made any progress at all yet.
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // If the live frontier is at least `cutoff` behind `now`, we don't
            // expect this collection to ever catch up, so we exempt it from
            // the check.
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one \
                    collection that is too far behind 'now'; ignoring cluster for caught-up \
                    checks",
                    cluster.id
                );
                return Ok(true);
            } else if beyond_all_hope {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'; \
                    ignoring for caught-up checks"
                );
                continue;
            }

            // The collection is within the allowed lag if the live write
            // frontier is not beyond our own write frontier bumped by
            // `allowed_lag`.
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

            // Frontiers alone are not enough: additionally require that the
            // collection is hydrated, i.e. has processed its initial snapshot.
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // We are caught up for this collection if we are within the
            // allowed lag and hydrated. An empty live write frontier means the
            // collection is closed, in which case we are caught up regardless
            // of our own frontier.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up"
                );
            } else {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

    /// Scans `mz_cluster_replica_status_history` for replicas that have
    /// repeatedly become unhealthy (currently: been OOM-killed) within the
    /// last 24 hours and returns their IDs.
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
        // Only consider status events from the last 24 hours.
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

        // Acquire a read hold so the collection's since doesn't advance while
        // we read it.
        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            // Without a read hold we cannot take a snapshot, so we report no
            // problematic replicas.
            return BTreeSet::new();
        };

        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

        // Count, per replica, how many problematic status events we have seen
        // within the lookback window.
        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            // Only consider additions, not retractions.
            if diff <= 0 {
                continue;
            }

            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    // Builtin collections should never contain errors; log and
                    // skip if they somehow do.
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

            // Only count problematic events that fall within the lookback
            // window.
            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc() {
                if Self::is_problematic_status(&status, reason.as_deref()) {
                    *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
                }
            }
        }

        // Replicas with at least three problematic events within the lookback
        // window are considered to be looping.
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

        // The read hold was only needed while streaming the snapshot.
        drop(read_hold);

        result
    }


    /// Returns whether a replica status event counts as problematic for the
    /// looping analysis. Currently only OOM kills qualify; the status string
    /// itself is ignored.
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        if let Some(reason) = reason {
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}
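
#[cfg(test)]
mod tests {
    // Illustrative sketches of the frontier and status semantics this module
    // relies on. These tests are additions for exposition, not part of the
    // original module, and assume the `mz_ore::test` macro is available as a
    // dev-dependency.
    use differential_dataflow::lattice::Lattice as _;

    use super::*;

    // `maybe_check_caught_up` folds per-replica frontiers with `join_assign`,
    // which keeps the most advanced frontier; an empty frontier ("closed")
    // absorbs everything.
    #[mz_ore::test]
    fn join_assign_keeps_most_advanced_frontier() {
        let mut acc = Antichain::from_elem(Timestamp::from(0u64));
        acc.join_assign(&Antichain::from_elem(Timestamp::from(10u64)));
        acc.join_assign(&Antichain::from_elem(Timestamp::from(5u64)));
        assert_eq!(acc, Antichain::from_elem(Timestamp::from(10u64)));

        acc.join_assign(&Antichain::new());
        assert!(acc.is_empty());
    }

    // `collections_caught_up` treats a collection as within the allowed lag
    // if the live write frontier is not beyond our write frontier bumped by
    // `allowed_lag`; an empty live frontier is the maximum frontier, which is
    // why the code special-cases `live_write_frontier.is_empty()`.
    #[mz_ore::test]
    fn frontier_lag_comparison() {
        let live = Antichain::from_elem(Timestamp::from(100u64));
        let ours = Antichain::from_elem(Timestamp::from(95u64));
        let allowed_lag = Timestamp::from(10u64);

        let bumped = Antichain::from_iter(ours.iter().map(|t| t.step_forward_by(&allowed_lag)));
        assert!(PartialOrder::less_equal(&live, &bumped));
        assert!(!PartialOrder::less_equal(&Antichain::<Timestamp>::new(), &bumped));
    }

    // `analyze_replica_looping` only counts events whose reason is an OOM
    // kill, via `is_problematic_status`.
    #[mz_ore::test]
    fn only_oom_kills_are_problematic() {
        let oom = OfflineReason::OomKilled.to_string();
        assert!(Coordinator::is_problematic_status("offline", Some(oom.as_str())));
        assert!(!Coordinator::is_problematic_status("offline", None));
        assert!(!Coordinator::is_problematic_status(
            "offline",
            Some("some-other-reason")
        ));
    }
}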