mz_adapter/coord/caught_up.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for checking whether clusters/collections are caught up during a 0dt
//! deployment.

use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use differential_dataflow::lattice::Lattice as _;
use futures::StreamExt;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::{MZ_CLUSTER_REPLICA_FRONTIERS, MZ_CLUSTER_REPLICA_STATUS_HISTORY};
use mz_catalog::memory::objects::Cluster;
use mz_controller_types::ReplicaId;
use mz_orchestrator::OfflineReason;
use mz_ore::channel::trigger::Trigger;
use mz_ore::now::EpochMillis;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

/// Context needed to check whether clusters/collections are caught up.
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// A trigger that signals that all clusters/collections have been caught
    /// up.
    pub trigger: Trigger,
    /// Collections to exclude from the caught up check.
    ///
    /// When a caught up check is performed as part of a 0dt upgrade, it makes sense to exclude
    /// collections of newly added builtin objects, as these might not hydrate in read-only mode.
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
    /// Checks that all clusters/collections are caught up. If so, this will
    /// fire `self.caught_up_check.trigger`.
    ///
    /// This method is a no-op when the trigger has already been fired.
    pub async fn maybe_check_caught_up(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // Timestamps are totally ordered, so the upper frontier
                // contains at most one element, which is this timestamp.
                // NULL encodes the empty upper frontier.
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

        // We care about each collection being hydrated on _some_
        // replica. We don't check that at least one replica has all
        // collections of that cluster hydrated.
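        // Joining the per-replica uppers keeps, for each collection, the most
        // advanced write frontier reported by any replica; for totally ordered
        // timestamps, the join is simply the maximum.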
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

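        // `allowed_lag` bounds how far our write frontiers may trail the live
        // deployment's frontiers, while `cutoff` bounds how far a live
        // frontier may itself trail `now` before we consider it too far
        // behind to wait for (see `beyond_all_hope` below).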
        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        // Something might go wrong with querying the status collection, so we
        // have an emergency flag for disabling it.
        let replica_status_check_enabled =
            ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK.get(self.catalog().system_config().dyncfgs());

        // Analyze replica statuses to detect crash-looping or OOM-looping replicas.
        let problematic_replicas = if replica_status_check_enabled {
            self.analyze_replica_looping(now).await
        } else {
            BTreeSet::new()
        };

        let compute_caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
                &problematic_replicas,
            )
            .await;

        tracing::info!(%compute_caught_up, "checked caught-up status of collections");

        if compute_caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Returns `true` if all non-transient, non-excluded collections have their write
    /// frontier (a.k.a. upper) within `allowed_lag` of the "live" frontier
    /// reported in `live_frontiers`. The "live" frontiers are the frontiers
    /// reported by the currently running `environmentd` deployment during a
    /// 0dt upgrade.
    ///
    /// Clusters in which all replicas are crash/OOM-looping and that have at
    /// least one collection whose live write frontier is behind `now` by more
    /// than `cutoff` are treated as caught up.
    ///
    /// For this check, zero-replica clusters are always considered caught up:
    /// their collections could never catch up, but having no replicas is
    /// clearly intentional.
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                    problematic_replicas,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} is caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up clusters.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

    /// Returns `true` if all non-transient, non-excluded collections of the
    /// given cluster have their write frontier (a.k.a. upper) within
    /// `allowed_lag` of the "live" frontier reported in `live_frontiers`. The
    /// "live" frontiers are the frontiers reported by the currently running
    /// `environmentd` deployment during a 0dt upgrade.
    ///
    /// If all replicas of the cluster are crash/OOM-looping and at least one
    /// collection's live write frontier is behind `now` by more than `cutoff`,
    /// the cluster is treated as caught up.
    ///
    /// This also returns `true` if the cluster does not have any replicas.
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
        problematic_replicas: &BTreeSet<ReplicaId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

        // Check if all replicas in this cluster are crash/OOM-looping. As long
        // as there is at least one healthy replica, the cluster is okay-ish.
        let cluster_has_only_problematic_replicas = cluster
            .replicas()
            .all(|replica| problematic_replicas.contains(&replica.replica_id));

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

        let storage_frontiers = self
            .controller
            .storage
            .active_ingestion_exports(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    // The collection didn't previously exist, so consider it
                    // caught up as long as its write frontier has advanced
                    // past the minimum timestamp.
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // We can't easily compare or subtract frontiers, so we bump the
            // live write frontier by the cutoff and compare that against
            // `now`.
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

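            // `beyond_all_hope` means the live write frontier, even bumped by
            // `cutoff`, has not advanced past `now`: the previous deployment
            // was already lagging by more than the cutoff for this collection.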
            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope && cluster_has_only_problematic_replicas {
                tracing::info!(
                    ?live_write_frontier,
                    ?cutoff,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now'"
                );
                tracing::info!(
                    "ALL replicas of cluster {} are crash/OOM-looping and it has at least one collection that is too far behind 'now', ignoring cluster for caught-up checks",
                    cluster.id
                );
                return Ok(true);
            }

            // Again, we can't easily compare or subtract frontiers, so we bump
            // our write frontier by the allowed lag and compare that against
            // the live write frontier.
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

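            // The collection counts as within the allowed lag if the live
            // write frontier is less than or equal to our own write frontier
            // bumped by `allowed_lag`, i.e. we trail the live deployment by at
            // most `allowed_lag`.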
            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

            // This call is on the expensive side, because we have to do a call
            // across a task/channel boundary, and our work competes with other
            // things the compute/instance controller might be doing. But it's
            // okay because we only do these hydration checks when in read-only
            // mode, and only rarely.
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // We don't expect collections to get hydrated, ingestions to be
            // started, etc. when they are already at the empty write frontier.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                // This is a bit spammy, but we log caught-up collections while
                // we investigate why environments cut over even though a lot
                // of compute collections are not, in fact, hydrated on
                // clusters.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up");
            } else {
                // We are not within the allowed lag, or not hydrated!
                //
                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up collections.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

    /// Analyzes replica status history to detect replicas that are
    /// crash-looping or OOM-looping.
    ///
    /// A replica is considered problematic if it has had several OOM kills
    /// (currently three or more) within the lookback window (currently one
    /// day).
    async fn analyze_replica_looping(&self, now: EpochMillis) -> BTreeSet<ReplicaId> {
        // Look back 1 day for patterns.
        let lookback_window: u64 = Duration::from_secs(24 * 60 * 60)
            .as_millis()
            .try_into()
            .expect("fits into u64");
        let min_timestamp = now.saturating_sub(lookback_window);
        let min_timestamp_dt = mz_ore::now::to_datetime(min_timestamp);

        // Get the GlobalId of the replica status collection.
        let replica_status_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_STATUS_HISTORY);
        let replica_status_gid = self
            .catalog()
            .get_entry(&replica_status_item_id)
            .latest_global_id();

        // Acquire a read hold to determine the `as_of` timestamp for `snapshot_and_stream`.
        let read_holds = self
            .controller
            .storage_collections
            .acquire_read_holds(vec![replica_status_gid])
            .expect("can't acquire read hold for mz_cluster_replica_status_history");
        let read_hold = if let Some(read_hold) = read_holds.into_iter().next() {
            read_hold
        } else {
            // The collection is not readable anymore, but we return an empty
            // set instead of panicking.
            return BTreeSet::new();
        };

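        // Read the snapshot at the earliest time at which the collection is
        // still readable, so that we see as much of the status history as
        // possible.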
        let as_of = read_hold
            .since()
            .iter()
            .next()
            .cloned()
            .expect("since should not be empty");

        let mut replica_statuses_stream = self
            .controller
            .storage_collections
            .snapshot_and_stream(replica_status_gid, as_of)
            .await
            .expect("can't read mz_cluster_replica_status_history");

        let mut replica_problem_counts: BTreeMap<ReplicaId, u32> = BTreeMap::new();

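        // The stream yields `(SourceData, time, diff)` updates; we fold the
        // inserts into per-replica counts of problematic status events.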
        while let Some((source_data, _ts, diff)) = replica_statuses_stream.next().await {
            // Only process inserts (positive diffs).
            if diff <= 0 {
                continue;
            }

            // Extract the Row from the SourceData.
            let row = match source_data.0 {
                Ok(row) => row,
                Err(err) => {
                    // This builtin collection shouldn't contain errors, so we
                    // at least log an error so that tests or Sentry will
                    // notice.
                    tracing::error!(
                        collection = MZ_CLUSTER_REPLICA_STATUS_HISTORY.name,
                        ?err,
                        "unexpected error in builtin collection"
                    );
                    continue;
                }
            };

            let mut iter = row.into_iter();

            let replica_id: ReplicaId = iter
                .next()
                .expect("missing replica_id")
                .unwrap_str()
                .parse()
                .expect("must parse as replica ID");
            let _process_id = iter.next().expect("missing process_id").unwrap_uint64();
            let status = iter
                .next()
                .expect("missing status")
                .unwrap_str()
                .to_string();
            let reason_datum = iter.next().expect("missing reason");
            let reason = if reason_datum.is_null() {
                None
            } else {
                Some(reason_datum.unwrap_str().to_string())
            };
            let occurred_at = iter
                .next()
                .expect("missing occurred_at")
                .unwrap_timestamptz();

            // Only count events that occurred within the lookback window and
            // are problematic.
            if occurred_at.naive_utc() >= min_timestamp_dt.naive_utc() {
                if Self::is_problematic_status(&status, reason.as_deref()) {
                    *replica_problem_counts.entry(replica_id).or_insert(0) += 1;
                }
            }
        }

        // Filter to replicas with 3 or more problematic events.
        let result = replica_problem_counts
            .into_iter()
            .filter_map(|(replica_id, count)| {
                if count >= 3 {
                    tracing::info!(
                        "detected problematic cluster replica {}: {} problematic events in last {:?}",
                        replica_id,
                        count,
                        Duration::from_millis(lookback_window)
                    );
                    Some(replica_id)
                } else {
                    None
                }
            })
            .collect();

        // Explicitly keep the read hold alive until this point.
        drop(read_hold);

        result
    }
    /// Determines whether a replica status indicates a problematic state that
    /// could point to looping.
    fn is_problematic_status(_status: &str, reason: Option<&str>) -> bool {
        // For now, we only look at the reason, but we could change/expand this
        // if/when needed.
        if let Some(reason) = reason {
            return reason == OfflineReason::OomKilled.to_string();
        }

        false
    }
}