mz_adapter/coord/caught_up.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for checking whether clusters/collections are caught up during a 0dt
//! deployment.

use std::collections::{BTreeMap, BTreeSet};

use differential_dataflow::lattice::Lattice as _;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::MZ_CLUSTER_REPLICA_FRONTIERS;
use mz_catalog::memory::objects::Cluster;
use mz_ore::channel::trigger::Trigger;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

/// Context needed to check whether clusters/collections are caught up.
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// A trigger that signals that all clusters/collections have been caught
    /// up.
    pub trigger: Trigger,
    /// Collections to exclude from the caught-up check.
    ///
    /// When a caught-up check is performed as part of a 0dt upgrade, it makes sense to exclude
    /// collections of newly added builtin objects, as these might not hydrate in read-only mode.
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
    /// Checks whether all clusters/collections are caught up. If so, this
    /// fires the trigger in `self.caught_up_check`.
    ///
    /// This method is a no-op when the trigger has already been fired.
    pub async fn maybe_check_caught_up(&mut self) {
        let enable_caught_up_check =
            ENABLE_0DT_CAUGHT_UP_CHECK.get(self.catalog().system_config().dyncfgs());

        if enable_caught_up_check {
            self.maybe_check_caught_up_new().await
        } else {
            self.maybe_check_caught_up_legacy().await
        }
    }

    async fn maybe_check_caught_up_new(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();

        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");

        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // Timestamps are totally ordered, so the upper frontier has
                // at most one entry, which is this timestamp. NULL encodes
                // the empty upper frontier.
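                // For example (illustrative values only): an upper of {5}
                // means all updates at times < 5 are final, while an empty
                // upper means the collection will not advance any further.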
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

        // We care about each collection being hydrated on _some_
        // replica. We don't check that at least one replica has all
        // collections of that cluster hydrated.
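        // A hypothetical example: if replica r1 reports collection u1 at
        // upper {5} and replica r2 reports u1 at upper {7}, the fold below
        // records u1 -> {7}, i.e. the most advanced upper across replicas.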
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");

        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        let compute_caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
            )
            .await;

        tracing::info!(%compute_caught_up, "checked caught-up status of collections");

        if compute_caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Returns `true` if all non-transient, non-excluded collections have their write
    /// frontier (also known as the upper) within `allowed_lag` of the "live" frontier
    /// reported in `live_frontiers`. The "live" frontiers are the frontiers
    /// reported by the currently running `environmentd` deployment during a 0dt
    /// upgrade.
    ///
    /// Collections whose live write frontier is behind `now` by more than the
    /// `cutoff` are ignored.
    ///
    /// For this check, zero-replica clusters are always considered caught up.
    /// Their collections would never normally be considered caught up, but it
    /// is clearly intentional that they have no replicas.
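    ///
    /// Roughly, the per-collection condition (a sketch of the logic in
    /// `collections_caught_up` below) is:
    ///
    /// ```text
    /// ignored   := live_write_frontier + cutoff <= now
    /// caught_up := live_write_frontier.is_empty()
    ///              || (live_write_frontier <= write_frontier + allowed_lag
    ///                  && collection_is_hydrated)
    /// ```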
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} is caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up clusters.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

    /// Returns `true` if all non-transient, non-excluded collections have their write
    /// frontier (also known as the upper) within `allowed_lag` of the "live" frontier
    /// reported in `live_frontiers`. The "live" frontiers are the frontiers
    /// reported by the currently running `environmentd` deployment during a 0dt
    /// upgrade.
    ///
    /// Collections whose live write frontier is behind `now` by more than the
    /// `cutoff` are ignored.
    ///
    /// This also returns `true` if the cluster does not have any replicas.
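    ///
    /// See the sketch on [`Self::clusters_caught_up`] for the per-collection
    /// condition in formula form.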
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }

        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;

        let storage_frontiers = self
            .controller
            .storage
            .active_ingestions(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    // The collection didn't previously exist, so consider
                    // ourselves hydrated as long as our write_ts is > 0.
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // We can't easily do comparisons and subtractions on frontiers, so
            // we bump up the live write frontier by the cutoff and then compare
            // that against `now`.
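            // A hypothetical example: with live_write_frontier = {1_000},
            // cutoff = 3_600_000 (one hour), and now = 5_000_000, we compare
            // {3_601_000} <= 5_000_000, which holds, so the collection is
            // ignored for the caught-up check.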
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope {
                tracing::info!(
                    ?live_write_frontier,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now', ignoring for caught-up checks"
                );
                continue;
            }

            // We can't easily do comparisons and subtractions, so we bump up
            // the write frontier by the allowed lag and then compare that
            // against the live write frontier.
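            // A hypothetical example: with write_frontier = {990},
            // allowed_lag = 1_000, and live_write_frontier = {1_500}, we
            // compare {1_500} <= {1_990}, which holds, so the collection is
            // within the allowed lag.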
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

            // This call is on the expensive side, because we have to do a call
            // across a task/channel boundary, and our work competes with other
            // things the compute/instance controller might be doing. But it's
            // okay because we only do these hydration checks when in read-only
            // mode, and only rarely.
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // We don't expect collections to get hydrated, ingestions to be
            // started, etc. when they are already at the empty write frontier.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                // This is a bit spammy, but we log caught-up collections while
                // we investigate why environments cut over even though a lot
                // of compute collections are _not_ in fact hydrated on their
                // clusters.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up");
            } else {
                // We are not within the allowed lag, or not hydrated!
                //
                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up replicas.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }

    async fn maybe_check_caught_up_legacy(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let compute_hydrated = self
            .controller
            .compute
            .clusters_hydrated(&ctx.exclude_collections)
            .await;
        tracing::info!(%compute_hydrated, "checked hydration status of clusters");

        if compute_hydrated {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }
}