use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

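/// Context for checking whether clusters/collections are caught up with the
/// live environment, as part of a 0dt deployment.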
#[derive(Debug)]
pub struct CaughtUpCheckContext {
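    /// Trigger to fire once all clusters/collections are considered caught
    /// up.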
    pub trigger: Trigger,
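    /// Collections to exclude from the caught-up check, e.g. collections of
    /// newly added builtin objects, which can't be expected to hydrate in
    /// read-only mode.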
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
    pub async fn maybe_check_caught_up(&mut self) {
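        // Nothing to do if no caught-up check is currently pending.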
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
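                // A NULL upper means the upper frontier is the empty
                // antichain, i.e. the collection is closed and will not
                // produce further updates.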
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

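        // Collapse the per-replica frontiers into a single frontier per
        // collection, by joining (taking the least upper bound of) the
        // frontiers of all replicas.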
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

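        // When enabled, identify replicas that appear to be crash-looping.
        // Clusters where all replicas are looping may be exempted from the
        // caught-up check below.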
        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%caught_up, "checked caught-up status of collections");

        if caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

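    /// Returns `true` if all clusters are considered caught up, i.e. all of
    /// their relevant collections are hydrated and have write frontiers
    /// within `allowed_lag` of the live environment's frontiers.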
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                tracing::info!("cluster {} is not caught up", cluster.id);
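                // Keep iterating instead of breaking out early, so that we
                // log the caught-up status of every cluster.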
            }
        }

        result
    }

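    /// Returns whether all storage and compute collections installed on the
    /// given cluster are caught up with the live environment's frontiers.
    ///
    /// A cluster without replicas is trivially caught up.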
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

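        // If every replica of this cluster is known to be crash-looping, we
        // may decide below not to wait for the cluster at all.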
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

        let storage_frontiers = self
            .controller
            .storage
            .active_ingestion_exports(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
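                    // We can't compare against the live environment here.
                    // Only count the collection as not caught up if it hasn't
                    // made any progress at all yet.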
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

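            // Determine whether the live write frontier is more than `cutoff`
            // behind `now`; such a collection is too far behind to be worth
            // waiting for.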
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one \
                     collection that is too far behind 'now'; ignoring cluster for caught-up \
                     checks",
                    cluster.id
                );
                return Ok(true);
            } else if beyond_all_hope {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'; \
                     ignoring for caught-up checks"
                );
                continue;
            }

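            // The collection counts as within the allowed lag if the live
            // write frontier is at most `allowed_lag` ahead of our own write
            // frontier.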
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

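            // Additionally require the collection to be hydrated on this
            // cluster.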
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

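            // An empty live write frontier means the collection is closed in
            // the live environment, so we consider it caught up regardless of
            // our own frontier.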
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up"
                );
            } else {
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

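    /// Analyzes the replica status history to find replicas that keep
    /// becoming unhealthy, e.g. because they are OOM-looping.
    ///
    /// Returns the IDs of all replicas that had at least three problematic
    /// status events within the last 24 hours.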
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
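        // Consider only status events from the last 24 hours.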
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

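        // Acquire a read hold to pin an `as_of` for the snapshot and keep the
        // collection readable while we stream it.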
        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            // Defensive: without a read hold we cannot read the status
            // history, so report no problematic replicas.
            return BTreeSet::new();
        };

        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            if diff <= 0 {
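                // Skip retractions; we only count status events that are
                // present in the history.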
                continue;
            }

            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    // Errors in a builtin collection are unexpected; log and
                    // skip the entry.
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

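            // Count only problematic events that occurred within the
            // lookback window.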
            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc()
                && Self::is_problematic_status(&status, reason.as_deref())
            {
                *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
            }
        }

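        // Replicas with at least three problematic status events within the
        // lookback window are considered to be crash-looping.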
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "Detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

        // Release the read hold only once we are done reading the snapshot.
        drop(read_hold);

        result
    }

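    /// Returns whether a replica status event counts as problematic for the
    /// purposes of the looping analysis. Currently only OOM kills are
    /// considered problematic.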
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        if let Some(reason) = reason {
            // The reason is stored as a rendered string, so compare against
            // the rendered `OfflineReason`.
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}