mz_adapter/coord/cluster_scheduling.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use itertools::Itertools;
use mz_audit_log::SchedulingDecisionsWithReasonsV2;
use mz_catalog::memory::objects::{CatalogItem, ClusterVariant, ClusterVariantManaged};
use mz_controller_types::ClusterId;
use mz_ore::collections::CollectionExt;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_repr::adt::interval::Interval;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{AlterClusterPlanStrategy, ClusterSchedule};
use std::time::{Duration, Instant};
use tracing::{debug, warn};

use crate::AdapterError;
use crate::coord::{Coordinator, Message};

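/// The set of all cluster scheduling policies. (Currently, the refresh policy is the only one;
/// see `check_scheduling_policies`.)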
const POLICIES: &[&str] = &[REFRESH_POLICY_NAME];

const REFRESH_POLICY_NAME: &str = "refresh";

/// A policy's decision for whether it wants a certain cluster to be On, along with its reason.
/// (Among the reasons there can be settings of the policy as well as other information about the
/// state of the system.)
#[derive(Clone, Debug)]
pub enum SchedulingDecision {
    /// The decision and reason of the refresh policy for wanting to turn a cluster On or Off.
    Refresh(RefreshDecision),
}

impl SchedulingDecision {
    /// Extract the On/Off decision from the policy-specific structs.
    pub fn cluster_on(&self) -> bool {
        match &self {
            SchedulingDecision::Refresh(RefreshDecision { cluster_on, .. }) => cluster_on.clone(),
        }
    }
}

#[derive(Clone, Debug)]
pub struct RefreshDecision {
    /// Whether the ON REFRESH policy wants a certain cluster to be On.
    cluster_on: bool,
    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
    /// time estimate), and therefore should keep the cluster On.
    objects_needing_refresh: Vec<GlobalId>,
    /// Objects for which we estimate that they currently need Persist compaction, and therefore
    /// should keep the cluster On.
    objects_needing_compaction: Vec<GlobalId>,
    /// The HYDRATION TIME ESTIMATE setting of the cluster.
    hydration_time_estimate: Duration,
}

impl SchedulingDecision {
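    /// Converts the per-policy scheduling decisions for a cluster into their audit log
    /// representation. Expects exactly one decision from each policy.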
    pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV2
    where
        I: IntoIterator<Item = &'a SchedulingDecision>,
    {
        SchedulingDecisionsWithReasonsV2 {
            on_refresh: reasons
                .into_iter()
                .filter_map(|r| match r {
                    SchedulingDecision::Refresh(RefreshDecision {
                        cluster_on,
                        objects_needing_refresh,
                        objects_needing_compaction,
                        hydration_time_estimate,
                    }) => {
                        soft_assert_or_log!(
                            !cluster_on
                                || !objects_needing_refresh.is_empty()
                                || !objects_needing_compaction.is_empty(),
                            "`cluster_on = true` should have an explanation"
                        );
                        let mut hydration_time_estimate_str = String::new();
                        mz_repr::strconv::format_interval(
                            &mut hydration_time_estimate_str,
                            Interval::from_duration(hydration_time_estimate).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        );
                        Some(mz_audit_log::RefreshDecisionWithReasonV2 {
                            decision: (*cluster_on).into(),
                            objects_needing_refresh: objects_needing_refresh
                                .iter()
                                .map(|id| id.to_string())
                                .collect(),
                            objects_needing_compaction: objects_needing_compaction
                                .iter()
                                .map(|id| id.to_string())
                                .collect(),
                            hydration_time_estimate: hydration_time_estimate_str,
                        })
                    }
                })
                .into_element(), // Each policy should have exactly one opinion on each cluster.
        }
    }
}

impl Coordinator {
    /// Call each scheduling policy.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn check_scheduling_policies(&mut self) {
        // (So far, we have only this one policy.)
        self.check_refresh_policy();
    }

    /// Runs the `SCHEDULE = ON REFRESH` cluster scheduling policy, which makes cluster On/Off
    /// decisions based on REFRESH materialized view write frontiers and the current time (the local
    /// oracle read ts), and sends `Message::SchedulingDecisions` with these decisions.
    /// (Queries the timestamp oracle on a background task.)
    fn check_refresh_policy(&self) {
        let start_time = Instant::now();

        // Collect information about REFRESH MVs:
        // - cluster
        // - hydration_time_estimate of the cluster
        // - MV's id
        // - MV's write frontier
        // - MV's refresh schedule
        let mut refresh_mv_infos = Vec::new();
        for cluster in self.catalog().clusters() {
            if let ClusterVariant::Managed(ref config) = cluster.config.variant {
                match config.schedule {
                    ClusterSchedule::Manual => {
                        // Nothing to do, user manages this cluster manually.
                    }
                    ClusterSchedule::Refresh {
                        hydration_time_estimate,
                    } => {
                        let mvs = cluster
                            .bound_objects()
                            .iter()
                            .filter_map(|id| {
                                if let CatalogItem::MaterializedView(mv) =
                                    self.catalog().get_entry(id).item()
                                {
                                    mv.refresh_schedule.clone().map(|refresh_schedule| {
                                        let (_since, write_frontier) = self
                                            .controller
                                            .storage
                                            .collection_frontiers(mv.global_id())
                                            .expect("the storage controller should know about MVs that exist in the catalog");
                                        (mv.global_id(), write_frontier, refresh_schedule)
                                    })
                                } else {
                                    None
                                }
                            })
                            .collect_vec();
                        debug!(%cluster.id, ?refresh_mv_infos, "check_refresh_policy");
                        refresh_mv_infos.push((cluster.id, hydration_time_estimate, mvs));
                    }
                }
            }
        }

        // Spawn a background task that queries the timestamp oracle for the current read timestamp,
        // compares this ts with the REFRESH MV write frontiers, thus making On/Off decisions per
        // cluster, and sends a `Message::SchedulingDecisions` with these decisions.
        let ts_oracle = self.get_local_timestamp_oracle();
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let check_scheduling_policies_seconds_cloned =
            self.metrics.check_scheduling_policies_seconds.clone();
        let compaction_estimate = self
            .catalog()
            .system_config()
            .cluster_refresh_mv_compaction_estimate()
            .try_into()
            .expect("should be configured to a reasonable value");
        mz_ore::task::spawn(|| "refresh policy get ts and make decisions", async move {
            let task_start_time = Instant::now();
            let local_read_ts = ts_oracle.read_ts().await;
            debug!(%local_read_ts, ?refresh_mv_infos, "check_refresh_policy background task");
            let decisions = refresh_mv_infos
                .into_iter()
                .map(|(cluster_id, hydration_time_estimate, refresh_mv_info)| {
                    // 1. check that
                    // write_frontier < local_read_ts + hydration_time_estimate
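                    // In other words, turn the cluster On `hydration_time_estimate` before the
                    // next refresh is due, so that the MVs have time to rehydrate by the refresh.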
                    let hydration_estimate = &hydration_time_estimate
                        .try_into()
                        .expect("checked during planning");
                    let local_read_ts_adjusted = local_read_ts.step_forward_by(hydration_estimate);
                    let mvs_needing_refresh = refresh_mv_info
                        .iter()
                        .cloned()
                        .filter_map(|(id, frontier, _refresh_schedule)| {
                            if frontier.less_than(&local_read_ts_adjusted) {
                                Some(id)
                            } else {
                                None
                            }
                        })
                        .collect_vec();

                    // 2. check that
                    // prev_refresh + compaction_estimate > local_read_ts
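                    // In other words, keep the cluster On for `compaction_estimate` after each
                    // refresh, so that Persist compaction has a chance to run after the refresh.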
                    let mvs_needing_compaction = refresh_mv_info
                        .into_iter()
                        .filter_map(|(id, frontier, refresh_schedule)| {
                            let frontier = frontier.as_option();
                            // `prev_refresh` will be None in two cases:
                            // 1. When there is no previous refresh, because we haven't yet had
                            // the first refresh. In this case, there is no need to schedule
                            // time now for compaction.
                            // 2. In the niche case where a `REFRESH EVERY` MV's write frontier
                            // is empty. In this case, it's not impossible that there would be a
                            // need for compaction. But I can't see any easy way to correctly
                            // handle this case, because we don't have any info handy about when
                            // the last refresh happened in wall clock time, because the
                            // frontiers have no relation to wall clock time. So, we'll not
                            // schedule any compaction time.
                            // (Note that `REFRESH AT` MVs with empty frontiers, which is a more
                            // common case, are fine, because `last_refresh` will return
                            // Some(...) for them.)
                            let prev_refresh = match frontier {
                                Some(frontier) => frontier.round_down_minus_1(&refresh_schedule),
                                None => refresh_schedule.last_refresh(),
                            };
                            prev_refresh
                                .map(|prev_refresh| {
                                    if prev_refresh.step_forward_by(&compaction_estimate)
                                        > local_read_ts
                                    {
                                        Some(id)
                                    } else {
                                        None
                                    }
                                })
                                .flatten()
                        })
                        .collect_vec();

                    let cluster_on =
                        !mvs_needing_refresh.is_empty() || !mvs_needing_compaction.is_empty();
                    (
                        cluster_id,
                        SchedulingDecision::Refresh(RefreshDecision {
                            cluster_on,
                            objects_needing_refresh: mvs_needing_refresh,
                            objects_needing_compaction: mvs_needing_compaction,
                            hydration_time_estimate,
                        }),
                    )
                })
                .collect();
            if let Err(e) = internal_cmd_tx.send(Message::SchedulingDecisions(vec![(
                REFRESH_POLICY_NAME,
                decisions,
            )])) {
                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
            check_scheduling_policies_seconds_cloned
                .with_label_values(&[REFRESH_POLICY_NAME, "background"])
                .observe((Instant::now() - task_start_time).as_secs_f64());
        });

        self.metrics
            .check_scheduling_policies_seconds
            .with_label_values(&[REFRESH_POLICY_NAME, "main"])
            .observe((Instant::now() - start_time).as_secs_f64());
    }

    /// Handles `SchedulingDecisions`:
    /// 1. Adds the newly made decisions to `cluster_scheduling_decisions`.
    /// 2. Cleans up old decisions that are for clusters no longer in scope of automated scheduling
    ///    decisions.
    /// 3. For each cluster, it sums up `cluster_scheduling_decisions`, checks the summed up decision
    ///    against the cluster state, and turns cluster On/Off if needed.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn handle_scheduling_decisions(
        &mut self,
        decisions: Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>,
    ) {
        let start_time = Instant::now();

        // 1. Add the received decisions to `cluster_scheduling_decisions`.
        for (policy_name, decisions) in decisions.iter() {
            for (cluster_id, decision) in decisions {
                self.cluster_scheduling_decisions
                    .entry(*cluster_id)
                    .or_insert_with(Default::default)
                    .insert(policy_name, decision.clone());
            }
        }

        // 2. Clean up those clusters from `scheduling_decisions` that
        // - have been dropped, or
        // - were switched to unmanaged, or
        // - were switched to `SCHEDULE = MANUAL`.
        for cluster_id in self
            .cluster_scheduling_decisions
            .keys()
            .cloned()
            .collect_vec()
        {
            match self.get_managed_cluster_config(cluster_id) {
                None => {
                    // Cluster has been dropped or switched to unmanaged.
                    debug!(
                        "handle_scheduling_decisions: \
                        Removing cluster {} from cluster_scheduling_decisions, \
                        because get_managed_cluster_config returned None",
                        cluster_id
                    );
                    self.cluster_scheduling_decisions.remove(&cluster_id);
                }
                Some(managed_config) => {
                    if matches!(managed_config.schedule, ClusterSchedule::Manual) {
                        debug!(
                            "handle_scheduling_decisions: \
                            Removing cluster {} from cluster_scheduling_decisions, \
                            because schedule is Manual",
                            cluster_id
                        );
                        self.cluster_scheduling_decisions.remove(&cluster_id);
                    }
                }
            }
        }

        // 3. Act on `scheduling_decisions` where needed.
        let mut altered_a_cluster = false;
        for (cluster_id, decisions) in self.cluster_scheduling_decisions.clone() {
            // We touch a cluster only when all policies have made a decision about it. This is
            // to ensure that after an envd restart all policies have a chance to run at least once
            // before we turn off a cluster, to avoid spuriously turning off a cluster and possibly
            // losing a hydrated state.
            if POLICIES.iter().all(|policy| decisions.contains_key(policy)) {
                // Check whether the cluster's state matches the needed state.
                // If any policy says On, then we need a replica.
                let needs_replica = decisions
                    .values()
                    .map(|decision| decision.cluster_on())
                    .contains(&true);
                let cluster_config = self.catalog().get_cluster(cluster_id).config.clone();
                let mut new_config = cluster_config.clone();
                let ClusterVariant::Managed(managed_config) = &mut new_config.variant else {
                    panic!("cleaned up unmanaged clusters above");
                };
                let has_replica = managed_config.replication_factor > 0; // Is it On?
                if needs_replica != has_replica {
                    // Turn the cluster On or Off.
                    altered_a_cluster = true;
                    managed_config.replication_factor = if needs_replica { 1 } else { 0 };
                    if let Err(e) = self
                        .sequence_alter_cluster_managed_to_managed(
                            None,
                            cluster_id,
                            new_config.clone(),
                            crate::catalog::ReplicaCreateDropReason::ClusterScheduling(
                                decisions.values().cloned().collect(),
                            ),
                            AlterClusterPlanStrategy::None,
                        )
                        .await
                    {
                        if let AdapterError::AlterClusterWhilePendingReplicas = e {
                            debug!(
                                "handle_scheduling_decisions tried to alter a cluster that is undergoing a graceful reconfiguration"
                            );
                        } else {
                            soft_panic_or_log!(
                                "handle_scheduling_decisions couldn't alter cluster {}. \
                                Old config: {:?}, \
                                New config: {:?}, \
                                Error: {}",
                                cluster_id,
                                cluster_config,
                                new_config,
                                e
                            );
                        }
                    }
                }
            } else {
                debug!(
                    "handle_scheduling_decisions: \
                    Not all policies have made a decision about cluster {}. decisions: {:?}",
                    cluster_id, decisions,
                );
            }
        }

        self.metrics
            .handle_scheduling_decisions_seconds
            .with_label_values(&[altered_a_cluster.to_string().as_str()])
            .observe((Instant::now() - start_time).as_secs_f64());
    }

    /// Returns the managed config for a cluster. Returns None if the cluster doesn't exist or if
    /// it's an unmanaged cluster.
    fn get_managed_cluster_config(&self, cluster_id: ClusterId) -> Option<ClusterVariantManaged> {
        let cluster = self.catalog().try_get_cluster(cluster_id)?;
        if let ClusterVariant::Managed(managed_config) = cluster.config.variant.clone() {
            Some(managed_config)
        } else {
            None
        }
    }
}