// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for checking whether clusters/collections are caught up during a 0dt
//! deployment.

use std::collections::{BTreeMap, BTreeSet};

use differential_dataflow::lattice::Lattice as _;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::{
    ENABLE_0DT_CAUGHT_UP_CHECK, WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG,
    WITH_0DT_CAUGHT_UP_CHECK_CUTOFF,
};
use mz_catalog::builtin::MZ_CLUSTER_REPLICA_FRONTIERS;
use mz_catalog::memory::objects::Cluster;
use mz_ore::channel::trigger::Trigger;
use mz_repr::{GlobalId, Timestamp};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp as _};

use crate::coord::Coordinator;

/// Context needed to check whether clusters/collections are caught up.
#[derive(Debug)]
pub struct CaughtUpCheckContext {
    /// A trigger that signals that all clusters/collections have been caught
    /// up.
    pub trigger: Trigger,
    /// Collections to exclude from the caught up check.
    ///
    /// When a caught up check is performed as part of a 0dt upgrade, it makes sense to exclude
    /// collections of newly added builtin objects, as these might not hydrate in read-only mode.
    pub exclude_collections: BTreeSet<GlobalId>,
}

impl Coordinator {
    /// Checks that all clusters/collections are caught up. If so, this will
    /// trigger `self.caught_up_check.trigger`.
    ///
    /// This method is a no-op when the trigger has already been fired.
    pub async fn maybe_check_caught_up(&mut self) {
        let enable_caught_up_check =
            ENABLE_0DT_CAUGHT_UP_CHECK.get(self.catalog().system_config().dyncfgs());

        if enable_caught_up_check {
            self.maybe_check_caught_up_new().await
        } else {
            self.maybe_check_caught_up_legacy().await
        }
    }
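
    /// Checks whether clusters/collections are caught up by comparing our
    /// write frontiers against the "live" frontiers reported by the currently
    /// running deployment in `mz_cluster_replica_frontiers`, and fires
    /// `self.caught_up_check.trigger` once everything is considered caught up.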
    async fn maybe_check_caught_up_new(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let replica_frontier_item_id = self
            .catalog()
            .resolve_builtin_storage_collection(&MZ_CLUSTER_REPLICA_FRONTIERS);
        let replica_frontier_gid = self
            .catalog()
            .get_entry(&replica_frontier_item_id)
            .latest_global_id();
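
        // Snapshot the builtin collection to learn the frontiers that the
        // currently running ("live") deployment has reported for each
        // collection, per replica.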
        let live_frontiers = self
            .controller
            .storage_collections
            .snapshot_latest(replica_frontier_gid)
            .await
            .expect("can't read mz_cluster_replica_frontiers");
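
        // Each row contains the object id, the replica id, and that replica's
        // write frontier for the object (NULL encodes the empty frontier).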
        let live_frontiers = live_frontiers
            .into_iter()
            .map(|row| {
                let mut iter = row.into_iter();

                let id: GlobalId = iter
                    .next()
                    .expect("missing object id")
                    .unwrap_str()
                    .parse()
                    .expect("cannot parse id");
                let replica_id = iter
                    .next()
                    .expect("missing replica id")
                    .unwrap_str()
                    .to_string();
                let maybe_upper_ts = iter.next().expect("missing upper_ts");
                // The timestamp has a total order, so there can be at
                // most one entry in the upper frontier, which is this
                // timestamp here. And NULL encodes the empty upper
                // frontier.
                let upper_frontier = if maybe_upper_ts.is_null() {
                    Antichain::new()
                } else {
                    let upper_ts = maybe_upper_ts.unwrap_mz_timestamp();
                    Antichain::from_elem(upper_ts)
                };

                (id, replica_id, upper_frontier)
            })
            .collect_vec();

        // We care about each collection being hydrated on _some_
        // replica. We don't check that at least one replica has all
        // collections of that cluster hydrated.
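        // Because timestamps are totally ordered, joining the per-replica
        // uppers below keeps, for each collection, the furthest-advanced
        // write frontier reported by any of its replicas.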
        let live_collection_frontiers: BTreeMap<_, _> = live_frontiers
            .into_iter()
            .map(|(oid, _replica_id, upper_ts)| (oid, upper_ts))
            .into_grouping_map()
            .fold(
                Antichain::from_elem(Timestamp::minimum()),
                |mut acc, _key, upper| {
                    acc.join_assign(&upper);
                    acc
                },
            )
            .into_iter()
            .collect();

        tracing::debug!(?live_collection_frontiers, "checking re-hydration status");

        let allowed_lag =
            WITH_0DT_CAUGHT_UP_CHECK_ALLOWED_LAG.get(self.catalog().system_config().dyncfgs());
        let allowed_lag: u64 = allowed_lag
            .as_millis()
            .try_into()
            .expect("must fit into u64");
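
        // `allowed_lag` bounds how far our write frontiers may trail the live
        // frontiers; `cutoff` bounds how far a live frontier may itself trail
        // `now` before we stop waiting for that collection.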
        let cutoff = WITH_0DT_CAUGHT_UP_CHECK_CUTOFF.get(self.catalog().system_config().dyncfgs());
        let cutoff: u64 = cutoff.as_millis().try_into().expect("must fit into u64");

        let now = self.now();

        let compute_caught_up = self
            .clusters_caught_up(
                allowed_lag.into(),
                cutoff.into(),
                now.into(),
                &live_collection_frontiers,
                &ctx.exclude_collections,
            )
            .await;

        tracing::info!(%compute_caught_up, "checked caught-up status of collections");

        if compute_caught_up {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }

    /// Returns `true` if all non-transient, non-excluded collections have their write
    /// frontier (a.k.a. the upper) within `allowed_lag` of the "live" frontier
    /// reported in `live_frontiers`. The "live" frontiers are the frontiers
    /// reported by the currently running `environmentd` deployment during a
    /// 0dt upgrade.
    ///
    /// Collections whose live write frontier is behind `now` by more than the
    /// `cutoff` are ignored.
    ///
    /// For this check, zero-replica clusters are always considered caught up:
    /// their collections could never catch up, but running with no replicas is
    /// clearly intentional, so we don't block on them.
    async fn clusters_caught_up(
        &self,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> bool {
        let mut result = true;
        for cluster in self.catalog().clusters() {
            let caught_up = self
                .collections_caught_up(
                    cluster,
                    allowed_lag.clone(),
                    cutoff.clone(),
                    now.clone(),
                    live_frontiers,
                    exclude_collections,
                )
                .await;

            let caught_up = caught_up.unwrap_or_else(|e| {
                tracing::error!(
                    "unexpected error while checking if cluster {} caught up: {e:#}",
                    cluster.id
                );
                false
            });

            if !caught_up {
                result = false;

                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught up clusters.
                tracing::info!("cluster {} is not caught up", cluster.id);
            }
        }

        result
    }

    /// Returns `true` if all of the given cluster's non-transient, non-excluded
    /// collections have their write frontier (a.k.a. the upper) within
    /// `allowed_lag` of the "live" frontier reported in `live_frontiers`. The
    /// "live" frontiers are the frontiers reported by the currently running
    /// `environmentd` deployment during a 0dt upgrade.
    ///
    /// Collections whose live write frontier is behind `now` by more than the
    /// `cutoff` are ignored.
    ///
    /// This also returns `true` if the cluster does not have any replicas.
    async fn collections_caught_up(
        &self,
        cluster: &Cluster,
        allowed_lag: Timestamp,
        cutoff: Timestamp,
        now: Timestamp,
        live_frontiers: &BTreeMap<GlobalId, Antichain<Timestamp>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, anyhow::Error> {
        if cluster.replicas().next().is_none() {
            return Ok(true);
        }
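
        // Storage and compute collections report their hydration status
        // through different controllers, so we tag each collection with where
        // it came from.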
        enum CollectionType {
            Storage,
            Compute,
        }

        let mut all_caught_up = true;
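
        // Gather the current write frontiers of all non-transient,
        // non-excluded storage ingestions and compute collections running on
        // this cluster.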
        let storage_frontiers = self
            .controller
            .storage
            .active_ingestions(cluster.id)
            .copied()
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let (_read_frontier, write_frontier) =
                    self.controller.storage.collection_frontiers(id)?;
                Ok::<_, anyhow::Error>((id, write_frontier, CollectionType::Storage))
            });

        let compute_frontiers = self
            .controller
            .compute
            .collection_ids(cluster.id)?
            .filter(|id| !id.is_transient() && !exclude_collections.contains(id))
            .map(|id| {
                let write_frontier = self
                    .controller
                    .compute
                    .collection_frontiers(id, Some(cluster.id))?
                    .write_frontier
                    .to_owned();
                Ok((id, write_frontier, CollectionType::Compute))
            });

        for res in itertools::chain(storage_frontiers, compute_frontiers) {
            let (id, write_frontier, collection_type) = res?;
            let live_write_frontier = match live_frontiers.get(&id) {
                Some(frontier) => frontier,
                None => {
                    // The collection didn't previously exist, so consider
                    // ourselves hydrated as long as our write_ts is > 0.
                    tracing::info!(?write_frontier, "collection {id} not in live frontiers");
                    if write_frontier.less_equal(&Timestamp::minimum()) {
                        all_caught_up = false;
                    }
                    continue;
                }
            };

            // We can't do easy comparisons and subtractions on frontiers, so
            // we bump up the live write frontier by the cutoff and then
            // compare that against `now`.
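            // For example (purely illustrative): with a cutoff of 60000ms, a
            // live write frontier of [1000] is bumped to [61000]; if `now` is
            // 70000 that bumped frontier is <= `now`, so the collection is
            // considered too far behind and skipped.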
            let live_write_frontier_plus_cutoff = live_write_frontier
                .iter()
                .map(|t| t.step_forward_by(&cutoff));
            let live_write_frontier_plus_cutoff =
                Antichain::from_iter(live_write_frontier_plus_cutoff);

            let beyond_all_hope = live_write_frontier_plus_cutoff.less_equal(&now);

            if beyond_all_hope {
                tracing::info!(
                    ?live_write_frontier,
                    ?now,
                    "live write frontier of collection {id} is too far behind 'now', ignoring for caught-up checks"
                );
                continue;
            }

            // We can't do easy comparisons and subtractions on frontiers, so
            // we bump up our write frontier by the allowed lag and then
            // compare that against the live write frontier.
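            // For example (purely illustrative): with an allowed lag of
            // 1000ms, our write frontier [5000] is bumped to [6000]; a live
            // write frontier of [5800] satisfies [5800] <= [6000], so the
            // collection counts as within the allowed lag.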
            let write_frontier_plus_allowed_lag = write_frontier
                .iter()
                .map(|t| t.step_forward_by(&allowed_lag));
            let bumped_write_plus_allowed_lag =
                Antichain::from_iter(write_frontier_plus_allowed_lag);

            let within_lag =
                PartialOrder::less_equal(live_write_frontier, &bumped_write_plus_allowed_lag);

            // This call is on the expensive side, because we have to do a call
            // across a task/channel boundary, and our work competes with other
            // things the compute/instance controller might be doing. But it's
            // okay because we only do these hydration checks when in read-only
            // mode, and only rarely.
            let collection_hydrated = match collection_type {
                CollectionType::Compute => {
                    self.controller
                        .compute
                        .collection_hydrated(cluster.id, id)
                        .await?
                }
                CollectionType::Storage => self.controller.storage.collection_hydrated(id)?,
            };

            // We don't expect collections to get hydrated, ingestions to be
            // started, etc. when they are already at the empty write frontier.
            if live_write_frontier.is_empty() || (within_lag && collection_hydrated) {
                // This is a bit spammy, but log caught-up collections while we
                // investigate why environments are cutting over but then a lot
                // of compute collections are _not_ in fact hydrated on
                // clusters.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is caught up");
            } else {
                // We are not within the allowed lag, or not hydrated!
                //
                // We continue with our loop instead of breaking out early, so
                // that we log all non-caught-up collections.
                tracing::info!(
                    %id,
                    %within_lag,
                    %collection_hydrated,
                    ?write_frontier,
                    ?live_write_frontier,
                    ?allowed_lag,
                    %cluster.id,
                    "collection is not caught up"
                );
                all_caught_up = false;
            }
        }

        Ok(all_caught_up)
    }
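
    /// Legacy caught-up check that only considers whether the compute
    /// collections on all clusters are hydrated; it does not compare frontiers
    /// against those of the live deployment.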
    async fn maybe_check_caught_up_legacy(&mut self) {
        let Some(ctx) = &self.caught_up_check else {
            return;
        };

        let compute_hydrated = self
            .controller
            .compute
            .clusters_hydrated(&ctx.exclude_collections)
            .await;
        tracing::info!(%compute_hydrated, "checked hydration status of clusters");

        if compute_hydrated {
            let ctx = self.caught_up_check.take().expect("known to exist");
            ctx.trigger.fire();
        }
    }
}