mz_adapter/coord/caught_up.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for checking whether clusters/collections are caught up during a 0dt
//! deployment.

use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

/// Context needed to check whether clusters/collections are caught up.
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// A trigger that signals that all clusters/collections have been caught
    /// up.
    pub trigger: Trigger,
    /// Collections to exclude from the caught up check.
    ///
    /// When a caught up check is performed as part of a 0dt upgrade, it makes sense to exclude
    /// collections of newly added builtin objects, as these might not hydrate in read-only mode.
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
    /// Checks that all clusters/collections are caught up. If so, this will
    /// trigger `self.caught_up_check.trigger`.
    ///
    /// This method is a no-op when the trigger has already been fired.
    pub async fn maybe_check_caught_up(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

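        // Decode the snapshot rows. Per the decoding below, each row of
        // `mz_cluster_replica_frontiers` is (object_id, replica_id, write_frontier),
        // where a NULL write frontier encodes the empty frontier.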
        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // The timestamp has a total order, so there can be at
                // most one entry in the upper frontier, which is this
                // timestamp here. And NULL encodes the empty upper
                // frontier.
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

        // We care about each collection being hydrated on _some_
        // replica. We don't check that at least one replica has all
        // collections of that cluster hydrated.
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();
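        // Illustrative example (made-up values): if replica r1 reports an upper
        // of [5] and replica r2 reports an upper of [9] for the same collection,
        // the joined live frontier for that collection is [9].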

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        // Something might go wrong with querying the status collection, so we
        // have an emergency flag for disabling it.
        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        // Analyze replica statuses to detect crash-looping or OOM-looping replicas.
        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%caught_up, "checked caught-up status of collections");

        if caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Returns whether all clusters are considered caught-up.
    ///
    /// Informally, a cluster is considered caught-up if it is at least as healthy as its
    /// counterpart in the leader environment. To determine that, we use the following rules:
    ///
    ///  (1) A cluster is caught-up if all non-transient, non-excluded collections installed on it
    ///      are either caught-up or ignored.
    ///  (2) A collection is caught-up when it is (a) hydrated and (b) its write frontier is within
    ///      `allowed_lag` of the "live" frontier, the collection's frontier reported by the leader
    ///      environment.
    ///  (3) A collection is ignored if its "live" frontier is behind `now` by more than `cutoff`.
    ///      Such a collection is unhealthy in the leader environment, so we don't care about its
    ///      health in the read-only environment either.
    ///  (4) On a cluster whose replicas are all crash- or OOM-looping, all collections are
    ///      ignored as soon as one of them is behind `now` by more than `cutoff`.
    ///
    /// For this check, zero-replica clusters are always considered caught up. Their collections
    /// would never normally be considered caught up, but it's clearly intentional that they have
    /// no replicas.
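    ///
    /// As an illustrative example (made-up values): with an `allowed_lag` of 1 second and a
    /// `cutoff` of 2 hours, a hydrated collection whose write frontier trails its live frontier
    /// by at most 1 second is caught-up (rule 2), while a collection whose live frontier is
    /// 3 hours behind `now` is ignored (rule 3).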
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} is caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up clusters.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

    /// Returns whether the given cluster is considered caught-up.
    ///
    /// See [`Coordinator::clusters_caught_up`] for details.
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

        // Check if all replicas in this cluster are crash/OOM-looping. As long
        // as there is at least one healthy replica, the cluster is okay-ish.
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

        let storage_frontiers = self
            .controller
            .storage
            .active_ingestion_exports(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    // The collection didn't previously exist, so consider it
                    // caught up as long as its write frontier has advanced past
                    // the minimum timestamp.
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // We can't subtract durations from frontiers directly, so we bump
            // the live write frontier up by the cutoff and then compare that
            // against `now`.
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

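            // Illustrative example (made-up numbers): with a live write frontier
            // of [1_000] and a cutoff of 10_000, the bumped frontier is [11_000].
            // If `now` is 20_000, the bumped frontier is less than or equal to
            // `now`, so the collection is considered too far behind.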
            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one \
                     collection that is too far behind 'now'; ignoring cluster for caught-up \
                     checks",
                    cluster.id
                );
                return Ok(true);
            } else if beyond_all_hope {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'; \
                     ignoring for caught-up checks"
                );
                continue;
            }

            // We can't do easy comparisons and subtractions, so we bump up the
            // write frontier by the allowed lag, and then compare that against
            // the live write frontier.
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);
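            // Illustrative example (made-up numbers): with a local write frontier
            // of [900] and an allowed lag of 200, the bumped frontier is [1_100].
            // A live write frontier of [1_000] is less than or equal to that, so
            // the collection is within the allowed lag.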

            // This call is on the expensive side, because we have to do a call
            // across a task/channel boundary, and our work competes with other
            // things the compute/instance controller might be doing. But it's
            // okay because we only do these hydration checks when in read-only
            // mode, and only rarely.
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // We don't expect collections to get hydrated, ingestions to be
            // started, etc. when they are already at the empty write frontier.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                // This is a bit spammy, but we log caught-up collections while we
                // investigate why environments cut over even though a lot of
                // compute collections are not in fact hydrated on their clusters.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up"
                );
            } else {
                // We are not within the allowed lag, or not hydrated!
                //
                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up collections.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

    /// Analyzes replica status history to detect replicas that are
    /// crash-looping or OOM-looping.
    ///
    /// A replica is considered problematic if it has had three or more
    /// problematic status events (currently, OOM kills) within the last day.
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
        // Look back 1 day for patterns.
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        // Get the replica status collection GlobalId.
        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

        // Acquire a read hold to determine the as_of timestamp for snapshot_and_stream.
        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            // The collection is not readable anymore, but we return an empty set
            // instead of panicking.
            return BTreeSet::new();
        };

        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            // Only process inserts (positive diffs).
            if diff <= 0 {
                continue;
            }

            // Extract the Row from SourceData.
            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    // This builtin collection shouldn't have errors, so we at
                    // least log an error so that tests or Sentry will notice.
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

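            // Decode the row. Per the decoding below, each row of
            // `mz_cluster_replica_status_history` is
            // (replica_id, process_id, status, reason, occurred_at).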
            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

            // Only count events that are within the time window and problematic.
            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc()
                && Self::is_problematic_status(&status, reason.as_deref())
            {
                *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
            }
        }

        // Filter to replicas with 3 or more problematic events.
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "Detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

        // Explicitly keep the read hold alive until this point.
        drop(read_hold);

        result
    }

    /// Determines whether a replica status indicates a problematic state that
    /// could point to crash- or OOM-looping.
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        // For now, we only look at the reason, but we could change/expand this
        // if/when needed.
        if let Some(reason) = reason {
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}