mz_adapter/coord/caught_up.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for checking whether clusters/collections are caught up during a 0dt
//! deployment.

use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

/// Context needed to check whether clusters/collections are caught up.
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// A trigger that signals that all clusters/collections have been caught
    /// up.
    pub trigger: Trigger,
    /// Collections to exclude from the caught up check.
    ///
    /// When a caught up check is performed as part of a 0dt upgrade, it makes sense to exclude
    /// collections of newly added builtin objects, as these might not hydrate in read-only mode.
    pub exclude_collections: BTreeSet<GlobalId>,
}
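
// A minimal construction sketch (hypothetical usage; in reality the context
// is wired up by the coordinator during bootstrap):
//
//     let ctx = CaughtUpCheckContext {
//         trigger,
//         exclude_collections: BTreeSet::new(),
//     };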

impl Coordinator {
    /// Checks that all clusters/collections are caught up. If so, this will
    /// fire `self.caught_up_check.trigger`.
    ///
    /// This method is a no-op when the trigger has already been fired.
    pub async fn maybe_check_caught_up(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        // `snapshot_latest` requires that the collection consolidates to a
        // set. `mz_cluster_replica_frontiers` is a controller-managed builtin
        // written with ±1 diffs, so it satisfies that invariant.
        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

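        // Each row of `mz_cluster_replica_frontiers` is expected to have the
        // shape `(object_id, replica_id, write_frontier)`, which is what the
        // parsing below relies on. An illustrative (made-up) example:
        //
        //     ("u42", "u7", 1700000000000) -- frontier at a timestamp
        //     ("u43", "u7", NULL)          -- the empty frontier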
        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // Timestamps are totally ordered, so the upper frontier can
                // contain at most one element: this timestamp. NULL encodes
                // the empty upper frontier.
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

        // We care about each collection being hydrated on _some_
        // replica. We don't check that at least one replica has all
        // collections of that cluster hydrated.
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();
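
        // For totally ordered timestamps the lattice join is simply `max`,
        // with the empty antichain as the top element. Illustrative example:
        // frontiers {5} and {8} reported by two replicas join to {8}, and
        // {8} joined with the empty frontier is the empty frontier.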

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        // Something might go wrong with querying the status collection, so we
        // have an emergency flag for disabling it.
        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        // Analyze replica statuses to detect crash-looping or OOM-looping replicas.
        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%caught_up, "checked caught-up status of collections");

        if caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Returns whether all clusters are considered caught-up.
    ///
    /// Informally, a cluster is considered caught-up if it is at least as healthy as its
    /// counterpart in the leader environment. To determine that, we use the following rules:
    ///
    ///  (1) A cluster is caught-up if all non-transient, non-excluded collections installed on it
    ///      are either caught-up or ignored.
    ///  (2) A collection is caught-up when it is (a) hydrated and (b) its write frontier is within
    ///      `allowed_lag` of the "live" frontier, i.e., the collection's frontier as reported by
    ///      the leader environment.
    ///  (3) A collection is ignored if its "live" frontier is behind `now` by more than `cutoff`.
    ///      Such a collection is unhealthy in the leader environment, so we don't care about its
    ///      health in the read-only environment either.
    ///  (4) On a cluster that is crash-looping, all collections are ignored.
    ///
    /// For this check, zero-replica clusters are always considered caught up. Their collections
    /// would never normally be considered caught up, but it's clearly intentional that they have
    /// no replicas.
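    ///
    /// An illustrative example (values made up): with `allowed_lag = 1s` and
    /// `cutoff = 1h`, a hydrated collection whose write frontier trails the
    /// live frontier by 500ms is caught-up by rule (2), while a collection
    /// whose live frontier trails `now` by a day is ignored by rule (3) and
    /// doesn't count against its cluster.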
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up clusters.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

    /// Returns whether the given cluster is considered caught-up.
    ///
    /// See [`Coordinator::clusters_caught_up`] for details.
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

        // Check if all replicas in this cluster are crash/OOM-looping. As long
        // as there is at least one healthy replica, the cluster is okay-ish.
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

        let storage_frontiers = self
            .controller
            .storage
            .active_ingestion_exports(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    // The collection didn't previously exist, so consider it
                    // caught up as long as its write frontier has advanced
                    // past the minimum timestamp.
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // Frontiers don't support subtraction, so instead we bump the
            // live write frontier up by the cutoff and then compare the
            // result against `now`.
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);
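
            // Worked example (illustrative values): with live frontier
            // {1_000}, cutoff 500_000, and now = 2_000_000, the bumped
            // frontier is {501_000}; since 501_000 <= 2_000_000, the
            // collection counts as beyond all hope.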

            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one \
                     collection that is too far behind 'now'; ignoring cluster for caught-up \
                     checks",
                    cluster.id
                );
                return Ok(true);
            } else if beyond_all_hope {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'; \
                     ignoring for caught-up checks"
                );
                continue;
            }

            // Frontiers don't support easy comparisons and subtractions, so
            // we bump the write frontier up by the allowed lag and then
            // compare the result against the live write frontier.
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);
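
            // Worked example (illustrative values): with write frontier
            // {900}, allowed lag 100, and live frontier {1_000}, the bumped
            // frontier is {1_000}; since {1_000} <= {1_000}, the collection
            // is within the allowed lag.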

            // This call is on the expensive side, because we have to do a call
            // across a task/channel boundary, and our work competes with other
            // things the compute/instance controller might be doing. But it's
            // okay because we only do these hydration checks when in read-only
            // mode, and only rarely.
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // We don't expect collections to get hydrated, ingestions to be
            // started, etc. when they are already at the empty write frontier.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                // This is a bit spammy, but we log caught-up collections while
                // we investigate why environments cut over even though a lot
                // of compute collections are _not_ in fact hydrated on
                // clusters.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up"
                );
            } else {
                // We are not within the allowed lag, or not hydrated!
                //
                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up collections.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

    /// Analyzes replica status history to detect replicas that are
    /// crash-looping or OOM-looping.
    ///
    /// A replica is considered problematic if it has had three or more
    /// problematic events (currently, OOM kills) within the last day.
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
        // Look back 1 day for patterns.
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
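        // 24 * 60 * 60 seconds is 86_400_000 milliseconds, which comfortably
        // fits in a u64.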
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        // Get the GlobalId of the replica status collection.
        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

        // Acquire a read hold to determine the as_of timestamp for
        // `snapshot_and_stream`.
        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            // The collection is not readable anymore, but we return an empty
            // set instead of panicking.
            return BTreeSet::new();
        };

        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");
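
        // Timestamps are totally ordered, so the `since` frontier has at most
        // one element; reading at that element gives the earliest snapshot
        // that is still readable under the hold.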

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            // Only process inserts (positive diffs).
            if diff <= 0 {
                continue;
            }

            // Extract the Row from SourceData.
            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    // This builtin collection shouldn't have errors, so we at
                    // least log an error so that tests or sentry will notice.
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

            // Only count problematic events that occurred within the lookback
            // window.
            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc()
                && Self::is_problematic_status(&status, reason.as_deref())
            {
                *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
            }
        }

        // Filter to replicas with 3 or more problematic events.
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "Detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

        // Explicitly keep the read hold alive until this point.
        drop(read_hold);

        result
    }

    /// Determines whether a replica status indicates a problematic state that
    /// could point to looping.
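    ///
    /// For example (illustrative), an event whose `reason` matches
    /// `OfflineReason::OomKilled` counts as problematic, regardless of the
    /// status string itself.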
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        // For now, we only look at the reason, but we could change/expand this
        // if/when needed.
        if let Some(reason) = reason {
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}