mz_adapter/coord/caught_up.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for checking whether clusters/collections are caught up during a 0dt
//! deployment.

use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_CHECK, ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK,
    WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG, WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

/// Context needed to check whether clusters/collections are caught up.
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// A trigger that signals that all clusters/collections have been caught
    /// up.
    pub trigger: Trigger,
    /// Collections to exclude from the caught up check.
    ///
    /// When a caught up check is performed as part of a 0dt upgrade, it makes sense to exclude
    /// collections of newly added builtin objects, as these might not hydrate in read-only mode.
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
    /// Checks that all clusters/collections are caught up. If so, this will
    /// fire `self.caught_up_check.trigger`.
    ///
    /// This method is a no-op when the trigger has already been fired.
    pub async fn maybe_check_caught_up(&mut self) {
        let enable_caught_up_check =
            ENABLE_0DT_CAUGHT_UP_CHECK.get(self.catalog().system_config().dyncfgs());

        if enable_caught_up_check {
            self.maybe_check_caught_up_new().await
        } else {
            self.maybe_check_caught_up_legacy().await
        }
    }

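    /// New-style caught-up check: reads the frontiers reported by the live
    /// deployment from `mz_cluster_replica_frontiers` and fires
    /// `self.caught_up_check.trigger` once all clusters/collections are within
    /// the configured allowed lag of those live frontiers.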
    async fn maybe_check_caught_up_new(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

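        // Read the frontiers that the live deployment has reported in the
        // builtin `mz_cluster_replica_frontiers` collection.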
        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // Timestamps are totally ordered, so the upper frontier has at
                // most one element, which is this timestamp. NULL encodes the
                // empty upper frontier.
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

        // We care about each collection being hydrated on _some_
        // replica. We don't check that at least one replica has all
        // collections of that cluster hydrated.
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

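        // Both knobs are dyncfgs: `allowed_lag` bounds how far our write
        // frontiers may trail the live frontiers, and `cutoff` bounds how far
        // a live write frontier may trail `now` before we give up on the
        // collection (see `beyond_all_hope` below).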
        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        // Something might go wrong with querying the status collection, so we
        // have an emergency flag for disabling it.
        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        // Analyze replica statuses to detect crash-looping or OOM-looping replicas.
        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let compute_caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%compute_caught_up, "checked caught-up status of collections");

        if compute_caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Returns `true` if all non-transient, non-excluded collections have their
    /// write frontier (aka. upper) within `allowed_lag` of the "live" frontier
    /// reported in `live_frontiers`. The "live" frontiers are the frontiers
    /// reported by a currently running `environmentd` deployment during a 0dt
    /// upgrade.
    ///
    /// Collections whose live write frontier is behind `now` by more than the
    /// cutoff are ignored.
    ///
    /// For this check, zero-replica clusters are always considered caught up:
    /// their collections would never normally be considered caught up, but it
    /// is clearly intentional that they have no replicas.
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} is caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up clusters.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

    /// Returns `true` if all non-transient, non-excluded collections have their
    /// write frontier (aka. upper) within `allowed_lag` of the "live" frontier
    /// reported in `live_frontiers`. The "live" frontiers are the frontiers
    /// reported by a currently running `environmentd` deployment during a 0dt
    /// upgrade.
    ///
    /// Collections whose live write frontier is behind `now` by more than the
    /// cutoff are ignored.
    ///
    /// This also returns `true` if the cluster does not have any replicas.
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

        // Check if all replicas in this cluster are crash/OOM-looping. As long
        // as there is at least one healthy replica, the cluster is okay-ish.
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

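        // Distinguishes storage from compute collections, so we can ask the
        // matching controller for hydration status below.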
        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

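        // Gather the write frontiers of all (non-transient, non-excluded)
        // storage collections that are actively ingesting on this cluster.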
        let storage_frontiers = self
            .controller
            .storage
            .active_ingestions(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

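        // Same for the compute collections installed on this cluster.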
        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

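        // Compare each collection's write frontier against the corresponding
        // frontier reported by the live deployment.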
        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    // The collection didn't previously exist, so consider
                    // ourselves hydrated as long as our write_ts is > 0.
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // We can't easily do comparisons and subtractions on frontiers, so
            // we bump up the live write frontier by the cutoff and then compare
            // that against `now`.
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one collection that is too far behind 'now', ignoring cluster for caught-up checks",
                    cluster.id
                );
                return Ok(true);
            }

            // We can't easily do comparisons and subtractions on frontiers, so
            // we bump up our write frontier by the allowed lag and then compare
            // that against the live write frontier.
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

            // This call is on the expensive side, because we have to do a call
            // across a task/channel boundary, and our work competes with other
            // things the compute/instance controller might be doing. But it's
            // okay because we only do these hydration checks when in read-only
            // mode, and only rarely.
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // We don't expect collections to get hydrated, ingestions to be
            // started, etc. when they are already at the empty write frontier.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                // This is a bit spammy, but log caught-up collections while we
                // investigate why environments are cutting over but then a lot
                // of compute collections are _not_ in fact hydrated on
                // clusters.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up"
                );
            } else {
                // We are not within the allowed lag, or not hydrated!
                //
                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up collections.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

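    /// Legacy caught-up check: only asks the compute controller whether all
    /// clusters are hydrated, without comparing frontiers against the live
    /// deployment.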
    async fn maybe_check_caught_up_legacy(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let compute_hydrated = self
            .controller
            .compute
            .clusters_hydrated(&ctx.exclude_collections)
            .await;
        tracing::info!(%compute_hydrated, "checked hydration status of clusters");

        if compute_hydrated {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Analyzes replica status history to detect replicas that are
    /// crash-looping or OOM-looping.
    ///
    /// A replica is considered problematic if it has three or more OOM kills
    /// within the lookback window (currently one day).
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
        // Look back 1 day for patterns.
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        // Get the replica status collection GlobalId.
        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

        // Acquire a read hold to determine the as_of timestamp for
        // `snapshot_and_stream`.
        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            // The collection is not readable anymore, but we return an empty
            // set instead of panicking.
            return BTreeSet::new();
        };

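        // Read at the earliest readable time, that is at the since of our
        // read hold.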
        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

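        // Count problematic status events per replica within the lookback
        // window.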
        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            // Only process inserts (positive diffs).
            if diff <= 0 {
                continue;
            }

            // Extract the Row from SourceData.
            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    // This builtin collection shouldn't have errors, so we at
                    // least log an error so that tests or Sentry will notice.
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

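            // Decode the columns of `mz_cluster_replica_status_history`:
            // replica_id, process_id, status, reason, occurred_at.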
            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

            // Only count events that fall within the time window and that are
            // problematic.
            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc()
                && Self::is_problematic_status(&status, reason.as_deref())
            {
                *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
            }
        }

        // Filter to replicas with 3 or more problematic events.
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

        // Explicitly keep the read hold alive until this point.
        drop(read_hold);

        result
    }

    /// Determines whether a replica status indicates a problematic state that
    /// could point to crash- or OOM-looping.
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        // For now, we only look at the reason, but we could change/expand this
        // if/when needed.
        if let Some(reason) = reason {
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}